ACI – WebSocket Subscription for Push Notification

Introduction

Cisco ACI provides the capability to subscribe via Web Socket.  Once you subscribe to particular objects you can get push notifications for those objects. You can subscribe to many objects and each object will get a subscription ID.  As an example if you subscribe to fvBD every time a BD is created/modified/deleted you will get a push notification. 

Cisco APIC REST API Configuration Guide

Brief Description of Web Sockets

rfc6455

WebSockets provide a persistent connection between a client and server that both parties can use to start sending data at any time. The client establishes a WebSocket connection through a process known as the WebSocket handshake. This process starts with the client sending a regular HTTP request to the server. An Upgrade header is included in this request that informs the server that the client wishes to establish a WebSocket connection.

From Good Blog Post On Web Sockets

Websockets use ws or wss protocol (much like http and https).  Persistent tcp connections are formed on port 80 or 443.

Requirements

a) ACI Fabric up and Running
b) Pyrthon 3 virtual Environment (preferably) setup

Following python modules are required:

import requests
requests.packages.urllib3.disable_warnings()
import json import ssl
import websocket
import threading
import schedule
import time
import sys

* No Cobra Required

Brief Description of Script:

  • The script sends a json encapsulated API call to APIC and fetches the token. The Token is later used during the Initial WebSocket connection establishment call.
  • We then construct the cookie dictionary using the Key / Value pair {“APIC-cookie”: token}. The Cookie is used during the get call for subscription (requests.get)
  • We then start the initial connect to the APIC Web Socket. A code of 101 is returned if successful.
  • We then subscribe to various objects in ACI (you can add more as you wish in the code below). The default refresh-timeout is kept to the default of 60 seconds. For every subscription that we make we receive a subscription-ID (in form of dictionary). We save each subscription-id as a variable.
  • We print on the screen any inputs (push Notifications) that are received from the subscriptions.
  • Every 30 seconds we send a refresh to keep the subscriptions alive. We use the subscription-ID for each subscription to do this.
  • Note: that python threading module is needed for the print and refresh functions since they are endless loops (while True:).

Download Python Script

download from GIT

__author__ = 'soumukhe'
#!/usr/bin/env python

# Kindly make sure that you have the modules threading and websocket client installed also.  "pip install threading"    "pip install websocket-client"
# Please use python 3

import requests
requests.packages.urllib3.disable_warnings()
import json
import ssl
import websocket
import threading
import schedule
import time
import sys

# Ensure using python3
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

#Please put in IP for APIC and Credentials here:
APIC = "APIC IP"
user = "Username"
password = "Password"

URL = "https://" + APIC + "/api/aaaLogin.json"
BODY = {"aaaUser": {"attributes": {"name": user, "pwd": password}}}

# websocket_url = "wss://" + APIC + "/socket{}".format(token)


def getCookie():
    global cookie
    global token
    login_response = requests.post(URL, json=BODY, verify=False)
    response_body = login_response.content
    # convert response_body to a dict
    response_body_dictionary = json.loads(response_body)
    #print(response_body_dictionary)
    #collect token for authentication
    token = response_body_dictionary["imdata"][0]["aaaLogin"]["attributes"]["token"]
    #print (token)
    cookie = {"APIC-cookie": token}
    #print (cookie)
    return cookie




def WSocket():
    # This module starts the initial connect to the APIC Web Socket
    global ws
    websocket_url = "wss://" + APIC + "/socket{}".format(token)
    ws = websocket.create_connection(websocket_url, sslopt={"cert_reqs": ssl.CERT_NONE})
    print( "WebSocket Subscription Status Code: ", ws.status)
    #print (type(ws))
    #return ws


def Subscribe():
    # This module subscribes to interested objects in ACI
    global tenant_subscription_id
    global bd_subscription_id
    global epg_subscription_id
    global app_profile_subscription_id
    global vrf_subscription_id
    global aaalogin_subscription_id
    global aaalogout_subscription_id




    # subscribe to fvTenant
    tenant_url = "https://" + APIC + "/api/class/fvTenant.json?subscription=yes&refresh-timeout=60?query-target=subtree"
    #print (tenant_url)
    tenant_subscription = requests.get(tenant_url, verify=False, cookies=cookie)
    json_dict = json.loads(tenant_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    tenant_subscription_id = json_dict["subscriptionId"]
    print ("Tenant-Subscription ID: ", tenant_subscription_id)
    # return tenant_subscription_id

    # subscribe to fvBD
    bd_url = "https://" + APIC + "/api/class/fvBD.json?subscription=yes&refresh-timeout=3600?query-target=subtree"
    bd_subscription = requests.get(bd_url, verify=False, cookies=cookie)
    json_dict = json.loads(bd_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    bd_subscription_id = json_dict["subscriptionId"]
    print("BD-Subscription ID: ", bd_subscription_id)

    # subscribe to fvAEPg
    epg_url = "https://" + APIC + "/api/class/fvAEPg.json?subscription=yes&refresh-timeout=60?query-target=subtree"
    epg_subscription = requests.get(epg_url, verify=False, cookies=cookie)
    json_dict = json.loads(epg_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    epg_subscription_id = json_dict["subscriptionId"]
    print("epg-Subscription ID: ", epg_subscription_id)

    # subscribe to fvAp
    APP_Profile_url = "https://" + APIC + "/api/class/fvAp.json?subscription=yes&refresh-timeout=60?query-target=subtree"
    APP_Profile_subscription = requests.get(APP_Profile_url, verify=False, cookies=cookie)
    json_dict = json.loads(APP_Profile_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    app_profile_subscription_id = json_dict["subscriptionId"]
    print("APP_Profile-Subscription ID: ", app_profile_subscription_id)

    # subscribe to fvCtx
    APP_Profile_url = "https://" + APIC + "/api/class/fvCtx.json?subscription=yes&refresh-timeout=60?query-target=subtree"
    APP_Profile_subscription = requests.get(APP_Profile_url, verify=False, cookies=cookie)
    json_dict = json.loads(APP_Profile_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    vrf_subscription_id = json_dict["subscriptionId"]
    print("VRF Subscription ID: ", vrf_subscription_id)

    # subscribe to aaaActiveUserSession - created
    AAALogin_url = "https://"  + APIC + "/api/class/aaaActiveUserSession.json?subscription=yes&refresh-timeout=60?query-target=self&query-target-filter=and(wcard(aaaActiveUserSession.status,\"created\"))"
    AAALogin_subscription = requests.get(AAALogin_url, verify=False, cookies=cookie)
    json_dict = json.loads(AAALogin_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    #print (nice_output)
    aaalogin_subscription_id = json_dict["subscriptionId"]
    print("AAALogin-Subscription ID: ", aaalogin_subscription_id)
    
    # subscribe to aaaActiveUserSession - deleted
    AAALogout_url = "https://" + APIC + "/api/class/aaaActiveUserSession.json?subscription=yes&refresh-timeout=60?query-target=self&query-target-filter=and(wcard(aaaActiveUserSession.status,\"deleted\"))"
    AAALogout_subscription = requests.get(AAALogout_url, verify=False, cookies=cookie)
    json_dict = json.loads(AAALogout_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    #print (nice_output)
    aaalogout_subscription_id = json_dict["subscriptionId"]
    print("AAALogout-Subscription ID: ", aaalogout_subscription_id)
    print ("\n" * 2 )

def printws():
    while True:
        print(ws.recv())




def refresh():
    # This module refreshes the subscription.  Default Timeout for refresh is 60 seconds as also hardcoded in the subscription module "refresh-timeout=60"
    while True:
        time.sleep(30)
        # refresh subscription  -- fvTenant
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(tenant_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        #print(tenant_refresh_response.content)
        #
        # refresh subscription  -- fvBD
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(bd_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        #print(tenant_refresh_response.content)
        #
        # refresh subscription  -- fvAEPg
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(epg_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        #print(tenant_refresh_response.content)
        #
        # refresh subscription  -- fvAp
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(app_profile_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        #print(tenant_refresh_response.content)
        #
        # refresh subscription  -- fvCtx
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(vrf_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        # print(tenant_refresh_response.content)
        #
        # refresh subscription  -- aaaActiveUserSession - Status Created
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(aaalogin_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        #print(tenant_refresh_response.content)
        #
        # refresh subscription  -- aaaActiveUserSession - Status Deleted
        tenant_refresh_url = "https://" + APIC + "/api/subscriptionRefresh.json?id={}".format(aaalogout_subscription_id)
        tenant_refresh_response = requests.get(tenant_refresh_url, verify=False, cookies=cookie)
        # print(tenant_refresh_response.content)

def startThreading():
    th = threading.Thread(target=printws)
    th1 = threading.Thread(target=refresh)
    th.start()
    th1.start()

if __name__ == "__main__":
    cookie = getCookie()
    #print (cookie)
    token =  (cookie["APIC-cookie"])
    #print (token)
    print("\n" * 2)
    print ("*" * 10, "WebSocket Subscription Status & Messages", "*" * 10)
    WSocket()
    Subscribe()
    startThreading()


Output:

(truncated for readability)

Updates and Feedback Worth Noting:

From the basic idea above, you can make the script more fancy/functional, for instance you can write the output to a file, parse it and pull it into a web front-end using flask.  You can also post it to webex teams.   

Below is feedback that I got from a colleague at Cisco (Eric Saren):

Hi Soumitra,

I wrote a library where you pass a function as argument to the subscription and whenever something is sent on the websocket the function is run. For my testing I created a function that posts anything to webex teams, but in theory any function and logic could be passed.

Br,

Erik


8 thoughts on “ACI – WebSocket Subscription for Push Notification

  1. Great post! I was able to get this to work, but had to make a couple changes:

    1.) On the getCookie() function, I had to modify the line that retrieves the response body to decode it to utf-8 format:
    response_body = login_response.content.decode(‘utf-8’)

    2.) On our APICs we have TLS 1.0 and 1.1 disabled on the mgmt interfaces for obvious security reasons. With this setup, I would get a connection reset when the script tried to establish the web socket. It turns out, it is trying to do this with TLS 1.0. After re-enabling 1.0 on the APICs, everything worked. It appears that the SSL library being used by the websocket module only supports TLS 1.0 (the https request to get the auth token seems to be fine as it uses TLS 1.2)? I’m running Python 3.5.2 and here are the modules I have installed:
    asn1crypto==0.24.0
    certifi==2019.9.11
    cffi==1.12.3
    chardet==3.0.4
    cryptography==2.7
    gevent==1.4.0
    greenlet==0.4.15
    idna==2.8
    pycparser==2.19
    pyOpenSSL==19.0.0
    requests==2.22.0
    schedule==0.6.0
    simplejson==3.16.0
    six==1.12.0
    urllib3==1.25.5
    websocket==0.2.1
    websocket-client==0.56.0

    I haven’t done too much research on this yet to see if there is a way to utilize a more up to date SSL library for websocket connection, but at least in my case, this would be a requirement for me to use this in my production environment.

  2. Thank You so much for sharing this ! This is the power of collaboration, so we all live and learn.

    You are right. I’m having the same problem when I disable TLS 1.0 and 1.1. Let me do some research and look into it.

    Regards
    Soumitra

  3. update.

    I tried again on 2 different fabrics and now it seems to be working for me. When I initially disabled TLS 1.0 and 1.1 earlier it had not worked, but I tried again after a while and it worked. (tried on 2 different fabrics)

    I fired up a new 3.5 and a new 3.6.5 python Virtual Env and here were the results:

    * With 3.5, I hit your issue #1 ( with your fix of utf-8 format, it worked)
    * With 3.6.5 it ran with no issues, not even issue#1

    Can you try with 3.6.5 and see if you get similar results:

    I have minimal set of packages installed on these virtual environments.

    certifi==2019.9.11
    chardet==3.0.4
    idna==2.8
    requests==2.22.0
    schedule==0.6.0
    six==1.12.0
    urllib3==1.25.6
    websocket-client==0.56.0

  4. Hello Mr. Baumann 🙂

    Great post Soumitra !!!

    How does this different from the module in ACI Toolkit ? I’ve been using ACI Toolkit ACI Session module and worked well. I am curious to know if there are any major differences/advantages/disadvantages.

    from applications.acisession import Session

    session = Session(“https://”, “”, “”)
    session.login()

    sub_url = ‘/api/class/acllogDropL3Pkt.json?&subscription=yes&page-size=1’
    sub_url2 = ‘/api/class/fvAEPg.json?&subscription=yes’
    sub_url3 = ‘/api/class/fvIp.json?&subscription=yes’
    sub_url4 = ‘/api/class/fvCEp.json?&subscription=yes&page-size=1’
    sub_url5 = ‘/api/class/infraAccPortP.json?subscription=yes’

    session.subscribe(url=sub_url, only_new=True)
    session.subscribe(url=sub_url2, only_new=True)
    session.subscribe(url=sub_url3, only_new=True)
    session.subscribe(url=sub_url4, only_new=True)
    session.subscribe(url=sub_url5, only_new=True)

  5. Hi, thank you, great work.
    Now i am trying to get actual RIB data from ACI but without success.. any advice please?

    # subscribe to RIBV4
    ribv4_url = “https://” + APIC + “/api/node/mo/topology/pod-1/node-111/sys/uribv4/dom-lab:pbr-vrf/db-rt.json?subscription=yes&refresh-timeout=60?query-target=subtree”
    ribv4_subscription = requests.get(ribv4_url, verify=False, cookies=cookie)
    json_dict = json.loads(ribv4_subscription.text)
    nice_output = json.dumps(json_dict, indent=4)
    # print (nice_output)
    ribv4_subscription_id = json_dict[“subscriptionId”]
    print(“ribv4-Subscription ID: “, ribv4_subscription_id)

    1. Hi Pavel, I don’t know off hand. Will try this out soon as I get some spare time. I have a lot going on, so maybe a while. Meanwhile if anyone else has already done this, please reply back.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.