Cisco Nexus Dashboard Insights (NDI), from release 5.0.1x and running on Nexus Dashboard (ND), can act as a Kafka producer and publish to a topic that has been created on a Kafka service. You can then have a Kafka consumer subscribe to that topic and receive all the messages. The messages that can be obtained from NDI in this way are anomalies, advisories, faults, audit logs, and statistics. You can be selective about what you export (based on your requirements) and then send the consumed messages to another application such as Elasticsearch/Kibana for custom queries and visualizations.
Previously, I had written an article: Subscribing Nexus Dashboard Insights Kafka Producer to a Kafka topic and streaming events to a Kafka Consumer
In this article, I will go one step further and show you how to pull the data obtained from NDI into Elasticsearch/Kibana for data analysis and visualizations, as depicted in the figure below.

What is Kafka:
Applications frequently need to talk to other applications. As an example, Application A gathers some data and Application B needs to do some custom processing on the output of Application A. When applications that are split in functionality need to exchange data with each other, there are generally two ways to do this.
- The legacy way is synchronous communication from application to application. This method is not very reliable, because data can be lost when the receiving application goes down for a period of time. Further, if Application A needs to send data to multiple other applications, say Application A (buying service) needs to send data to Application B (shipping service) and Application C (inventory service), then Application A has to send the messages to two different applications, increasing the load on Application A. Synchronous messaging between applications can also be problematic when there are sudden spikes of traffic.
- The way most applications handle messaging between them today is through some sort of middleware service. Middleware services can be broadly categorized into three models:
- Queue model. Producers send messages to a queue service, which stores them. Consumers poll the queue service at given intervals, pick up the messages, and then tell the queue to delete them so that other consumers don't also pick them up, which would duplicate processing. This works well when there is a cluster of consumers sharing the processing load. The queue model is basically a pull model. In AWS, the equivalent of this service is SQS (Simple Queue Service).
- Pub/sub model. In this model, the producer application publishes (pushes) messages to a topic hosted by the middleware service. Consumer applications then subscribe to that topic and receive the messages. In AWS, the equivalent of this service is SNS (Simple Notification Service).
- Real-time pub/sub (streaming) model, where messages are streamed from the publisher to the middleware service. RabbitMQ and Apache Kafka fall into this category. In AWS, the equivalent of this service is Kinesis. See the short sketch after this list for what publishing and consuming look like in practice.
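To make the streaming pub/sub model concrete, below is a minimal sketch using the kcat (formerly kafkacat) command-line client. The broker address localhost:9092 and the topic name demo are illustrative assumptions only and are not part of the NDI setup used later in this article.
# Producer side: publish two JSON messages to the "demo" topic
# (localhost:9092 and the "demo" topic are assumptions for illustration)
echo '{"severity":"major","desc":"example message 1"}' | kcat -b localhost:9092 -t demo -P
echo '{"severity":"minor","desc":"example message 2"}' | kcat -b localhost:9092 -t demo -P
# Consumer side: subscribe to the same topic and read from the beginning.
# Any number of consumers can subscribe independently; the producer does not
# need to know about them, which is the decoupling that pub/sub provides.
kcat -b localhost:9092 -t demo -C -o beginning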
What is Elasticsearch/Kibana:
The ELK Stack is a set of three open-source products: Elasticsearch, Logstash and Kibana, all developed and maintained by Elastic. Elasticsearch is a NoSQL database that uses the Lucene search engine. Logstash is a data processing and transportation pipeline used to populate Elasticsearch with data (though it also supports other destinations, including Graphite, Kafka, Nagios and RabbitMQ). Kibana is a dashboard that works on top of Elasticsearch and facilitates data analysis using visualizations and dashboards.
source: https://devops.com/splunk-elk-stack-side-side-comparison/
Elasticsearch vs. Splunk:
According to Google Trends, the ELK Stack has now overtaken Splunk in terms of the proportion of Google searches. But ELK’s traction does not stop there. As mentioned earlier, Splunk self-reports 12,000 total users. Elasticsearch is reportedly downloaded 500,000 times every single month. In IT departments, then, it is far more likely to meet people who are familiar with ELK than with Splunk, meaning that the adoption rate of the ELK Stack could “snowball” and increase even more in the future whenever ELK users join new companies or teams. People tend to use whatever software they already know or is already being used.
source: https://devops.com/splunk-elk-stack-side-side-comparison/
A quick primer on Elasticsearch/Kibana:
There is a ton of documentation and there are many videos online where you can learn about using Elasticsearch/Kibana (just do a Google and YouTube search). It is quite an elaborate topic and is beyond the scope of this write-up. You can also visit: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
Here, I will highlight some of the main items:
- Elasticsearch is based on the Lucene search library (originally written by Doug Cutting in 1999)
- It uses an inverted index, which maps words (terms) to the documents and locations where they occur
- The process of taking the data and converting it into an inverted index takes time
- During indexing the data goes through an analyzer. The analyzer transforms (tokenizes and filters) the data to remove unnecessary information, for example removing stop words, lowercasing, stemming, and matching synonyms. These are language-based text-analysis steps.
- Once indexing is done, searching is lightning fast (searching data is the main purpose of Elasticsearch). This is why it is used for big data
- Elasticsearch is a distributed document database (JSON based). You can insert, delete, retrieve, analyze and search documents
- Documents are split up into shards; shards contain segments, which are the inverted indices
- Elasticsearch can be distributed across many nodes, and nodes can hold primary and replica shards, which gives it high availability
- Elasticsearch queries are written in a DSL (Domain Specific Language)
- Searches can be done with a query, a filter, or a combination of both (see the sketch after this list)
- Queries produce a relevance score; the higher the score, the better a particular document matches
- Filters are very close to queries but do not compute a relevance score, so they are blazing fast for very large data sets
- The aggregation DSL is an extremely powerful tool. An aggregation summarizes your data as metrics, statistics, or other analytics.
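To tie the query and filter ideas together, here is a minimal sketch of a bool search that combines a scored match query with a non-scored filter. It is written against the anomalies index that we build later in this article (run it from the Ubuntu shell once the stack is up); the value my-fabric is a placeholder, so substitute whatever fabricName values exist in your own data.
# bool search: "must" clauses run in query context (scored), "filter" clauses run in filter context (not scored)
# "my-fabric" is a placeholder value; mnemonicDescription and fabricName come from the anomalies index built later
curl -s http://localhost:9200/anomalies/_search \
 -H 'content-type: application/json' \
 -d '{ "query": { "bool": {
        "must":   [ { "match": { "mnemonicDescription": "bridge" } } ],
        "filter": [ { "term":  { "fabricName.keyword": "my-fabric" } } ]
      } } }' | jq -c '.hits.hits[] | {score: ._score, title: ._source.mnemonicTitle}'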
Let’s now start implementing the POC for NDI to Elasticsearch/Kibana
To start off, go to Nexus Dashboard Insights / Site Group Configuration, as shown below.

Click on Export Data and create 3 Exports:
- eventz
- audit
- anomalies

Details of “eventz” are shown below


For Audit topic, just enable audit logs as shown below:

For Anomalies, enable all Anomalies other than Info, as shown below

Now that the NDI export data has been configured, let's bring up the ELK stack and configure the topics.
First, we need to install the ELK stack. For this, you can use the procedure below to get it going in minutes.
1) Install ubuntu, docker and docker compose. If you need help, please see: https://github.com/soumukhe/ndi-kafka-elasticsearch-kibana/blob/master/README.md
2) ssh to ubuntu box and clone this repo: git clone https://github.com/soumukhe/ndi-kafka-elasticsearch-kibana.git
3) cd ndi-kafka-elasticsearch-kibana
4) run the fixIP script with the IP of the VM. For example ./fixIP 10.10.140.10
5) docker-compose up -d
6) make sure to do a docker ps to verify that all 8 associated containers are up and running
You should have 8 containers running as shown below:
aciadmin@DMZ-Ubuntu-Jump-User:~/ndi-kafka-elasticsearch-kibana$ docker ps --format '{{ .Names }}' | sort |nl
1 broker
2 elasticsearch
3 kafkacat
4 kafka-connect
5 kibana
6 ksqldb
7 schema-registry
8 zookeeper
After the containers come up, repeat the command "docker ps --format '{{ .Names }}' | sort |nl" after a few minutes.
What I noticed is that ksqldb might have crashed. If that is the case, just run "docker-compose up -d" again and ksqldb will then be stable.
You can always check the logs for ksqldb with: docker logs -f ksqldb
Next, let’s create the 3 topics:
Open a session to the ksqldb container: docker exec -it ksqldb ksql http://ksqldb:8088

Create Schema for eventz topic:
CREATE STREAM events (type VARCHAR,eventDN VARCHAR,changeSet VARCHAR,createTime VARCHAR KEY,modType VARCHAR,severity VARCHAR, desc VARCHAR, trigger VARCHAR,cause VARCHAR, eventDnClass VARCHAR,nodeName VARCHAR,fabricName VARCHAR,nodeType VARCHAR,vendor VARCHAR) WITH (KAFKA_TOPIC='eventz', PARTITIONS=1, FORMAT='JSON');

Create Schema for anomalies topic:
CREATE STREAM anomalies (alertType VARCHAR,anomalyId VARCHAR,category VARCHAR,changeSet VARCHAR,startTs VARCHAR KEY,endTs VARCHAR,fabricName VARCHAR, cleared VARCHAR, comment VARCHAR,cause VARCHAR, mnemonicTitle VARCHAR,nodeNames VARCHAR,vendor VARCHAR,offline VARCHAR,newAlert VARCHAR ) WITH (KAFKA_TOPIC='anomalies', PARTITIONS=1, FORMAT='JSON');

Create Schema for audit topic:
CREATE STREAM audit (className VARCHAR,configDn VARCHAR,changeSet VARCHAR,modType VARCHAR,createTime VARCHAR KEY,severity INT,txId VARCHAR, user VARCHAR, auditId VARCHAR,descr VARCHAR, auditCode VARCHAR,type VARCHAR,nodeName VARCHAR,fabricName VARCHAR,nodeType VARCHAR,vendor VARCHAR ) WITH (KAFKA_TOPIC='audit', PARTITIONS=1, FORMAT='JSON');

Verify that topics are created properly:
show topics;

You can also see details of a topic, as an example to see more details on audit topic:
describe audit;

To make sure that you are getting NDI kafka messages, for example in the eventz topic, do the below:
PRINT eventz FROM BEGINNING;
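As an alternative check, you can also consume the topic directly with kcat from the kafkacat container that ships with this stack. This is a sketch that assumes the broker's listener inside the Docker network is broker:29092 and that the container's binary is named kcat; check docker-compose.yml in the repo for the actual listener, and use kafkacat instead of kcat if your image is an older one.
# Consume the eventz topic from the beginning and exit when the end of the topic is reached
# (broker:29092 is an assumption; verify the advertised listener in docker-compose.yml)
docker exec kafkacat kcat -b broker:29092 -t eventz -C -o beginning -e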

In case you want to delete a topic, you can use the commands as shown below:
drop stream audit delete TOPIC;
drop stream events delete TOPIC;
drop stream anomalies delete TOPIC;
Now that we have verified that we are receiving the messages in our Kafka consumer, the next step is to transport these messages to Elasticsearch. This is generally done with a pipeline. Logstash is ideal for this; however, in this case we will use Kafka Connect, because it already comes packaged with Kafka and can be set up really easily. Kafka Connect is really handy for sending (exporting) data from Kafka to other applications, or for bringing data into Kafka from other sources (even CSV files). Logstash can likewise be used to send data to Elasticsearch from Kafka, from other apps, or even from CSV files.
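Before creating the connectors, you can confirm that the Elasticsearch sink connector plugin is actually installed on the Kafka Connect worker by querying its REST API. This sketch assumes the worker's default REST port 8083 is reachable from the Ubuntu shell; adjust the host and port to match the kafka-connect service in docker-compose.yml.
# List the connector plugins on the Kafka Connect worker and look for the Elasticsearch sink
# (localhost:8083 is an assumption; check the kafka-connect port mapping in docker-compose.yml)
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -i elasticsearch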

Using Kafka Connect to send the data from Kafka to Elasticsearch/Kibana:
For eventz topic:
CREATE SINK CONNECTOR EVENTZ WITH (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='http://elasticsearch:9200',
'key.converter'='org.apache.kafka.connect.storage.StringConverter',
'value.converter'='org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'='false',
'type.name'='_doc',
'topics'='eventz',
'key.ignore'='true',
'schema.ignore'='true',
'transforms.setTimestampType0.field'='createTime',
'transforms.setTimestampType0.target.type'='Timestamp'
);

For anomalies topic:
CREATE SINK CONNECTOR ANOMALIES WITH (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='http://elasticsearch:9200',
'key.converter'='org.apache.kafka.connect.storage.StringConverter',
'value.converter'='org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'='false',
'type.name'='_doc',
'topics'='anomalies',
'key.ignore'='true',
'schema.ignore'='true',
'transforms.setTimestampType0.field'='startTs',
'transforms.setTimestampType0.target.type'='Timestamp'
);

For audit topic:
CREATE SINK CONNECTOR AUDIT WITH (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='http://elasticsearch:9200',
'key.converter'='org.apache.kafka.connect.storage.StringConverter',
'value.converter'='org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'='false',
'type.name'='_doc',
'topics'='audit',
'key.ignore'='true',
'schema.ignore'='true',
'transforms.setTimestampType0.field'='createTime',
'transforms.setTimestampType0.target.type'='Timestamp'
);

Verify all the connectors are good:
show connectors;
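In addition to show connectors; in ksqlDB, you can check a connector's running state and its tasks through the Kafka Connect REST API from the Ubuntu shell. As before, localhost:8083 is an assumption; adjust it to your environment.
# Show the state of the EVENTZ connector and its tasks (repeat for ANOMALIES and AUDIT)
# (localhost:8083 is an assumption; check the kafka-connect port mapping in docker-compose.yml)
curl -s http://localhost:8083/connectors/EVENTZ/status | jq '.'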

Now that you have created the connectors, verify that Elasticsearch is receiving the data from the Kafka topics:
Do the following three curls, one each for eventz, audit and anomalies. Make sure you run the curls from your Ubuntu shell, not from ksqldb.
If you don't have jq installed, please install it with:
sudo apt install jq
curl -s http://localhost:9200/eventz/_search \
-H 'content-type: application/json'\
-d '{ "size": 42 }'|jq -c '.hits.hits[]'
curl -s http://localhost:9200/audit/_search \
-H 'content-type: application/json'\
-d '{ "size": 42 }'|jq -c '.hits.hits[]'
curl -s http://localhost:9200/anomalies/_search \
-H 'content-type: application/json'\
-d '{ "size": 42 }'|jq -c '.hits.hits[]'

Also, check the mappings with the below curl commands from the Ubuntu shell:
curl -s http://localhost:9200/eventz/_mapping | jq '.'
curl -s http://localhost:9200/audit/_mapping | jq '.'
curl -s http://localhost:9200/anomalies/_mapping | jq '.'
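As one more quick sanity check, you can ask Elasticsearch for the document count in each index; the counts should keep growing as NDI streams more messages:
curl -s http://localhost:9200/eventz/_count | jq '.count'
curl -s http://localhost:9200/audit/_count | jq '.count'
curl -s http://localhost:9200/anomalies/_count | jq '.count'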

In case you need to drop the connectors, do the below:
From ksqldb:
DROP CONNECTOR ANOMALIES;
DROP CONNECTOR EVENTZ;
DROP CONNECTOR AUDIT;
From Ubuntu bash:
Delete the existing indices in Elasticsearch:
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/anomalies"
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/eventz"
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/audit"
We are now all done with the configuration.
Next, browse to Kibana over HTTP using the IP of the Ubuntu box and port 5601.
In my case, I browsed to http://10.1.100.11:5601

In the search bar type in “index pattern”

click on Create Index Pattern

You will now see the three indices show up.

For this POC, create three index patterns: one for anomalies, one for audit and one for eventz.
Below I show the example for anomalies: type anomalies in the “Index pattern name” field, then click “Next Step”.

In the Time Field, choose “startTs” and then click “Create Index pattern”

Repeat the above steps to create index patterns for audit and eventz:
for audit, choose the primary time field as: createTime
for eventz, choose the primary time field as: createTime
Now, click on the three parallel bars at the top left of the Kibana screen and click on “Discover”.

Make sure to set the time range to “Last 6 days”. Also note that you can switch between the anomalies, eventz and audit views.

📗 You can expand any of the records and look at more details as shown below

📗 You can click on any field of interest and hit the “+” button to add a filter.
As an example, when I click on entityName, I see that I have some issue with e1/25. I want to filter on e1/25 to view all anomalies for that interface. This puts a filter on entityName: e1/25, so I can see all issues with e1/25, as shown below.

Querying with DSL scripts:
Elasticsearch has a very powerful query DSL (Domain Specific Language). You can query, filter and aggregate with these scripts. Please look at the Elasticsearch documentation for more information on usage. To start using DSL scripts, type “dev” in the search box, as shown below:

On the left-hand pane of Dev Tools, copy and paste the below DSL scripts:
# match_all: returns everything
#------------------------------
GET /eventz/_search
{
  "query": {
    "match_all": {}
  }
}

# match
#------
GET /anomalies/_search
{
  "query": {
    "match": { "mnemonicDescription": "bridge" }
  }
}

# The below will not return anything, because "BRIDGE_DOMAIN_HAS_INVALID_VRF" is indexed as one word
#---------------------------------------------------------------------------------------------------
GET /anomalies/_search
{
  "query": {
    "match": { "anomalyType": "bridge" }
  }
}

# match_phrase_prefix
#--------------------
GET /anomalies/_search
{
  "query": {
    "match_phrase_prefix": { "mnemonicTitle": "BRIDGE_DOMAIN_HA" }
  }
}

# return only the "mnemonicDescription" field from up to 200 docs
#----------------------------------------------------------------
GET /anomalies/_search
{
  "size": 200,
  "_source": {
    "includes": "mnemonicDescription"
  }
}

# aggregation: count of each kind of anomaly
#-------------------------------------------
GET /anomalies/_search
{
  "size": 0,
  "aggs": {
    "anomalies_summary": {
      "terms": {
        "field": "mnemonicDescription.keyword"
      }
    }
  }
}

# aggregation: count of each kind of event
#-----------------------------------------
GET /eventz/_search
{
  "size": 0,
  "aggs": {
    "eventz_summary": {
      "terms": {
        "field": "description.keyword"
      }
    }
  }
}

# aggregation: anomaly count per mnemonicTitle, with the max severity for each
#------------------------------------------------------------------------------
GET /anomalies/_search
{
  "size": 0,
  "aggs": {
    "anomalies_summary": {
      "terms": {
        "field": "mnemonicTitle.keyword"
      },
      "aggs": {
        "max_severity": {
          "max": { "field": "severity" }
        }
      }
    }
  }
}
# ***********************
Now run the DSL Queries as shown below:

The last item I would like to demonstrate is the Kibana dashboard.
Click on the three bars and then Visualize, as shown below. Then click on “Create new visualization”.

Next, click on Lens. Aggregation Based is very powerful, but for the sake of simplicity we'll use the Lens view.

- Choose anomalies
- Drag and drop mnemonicDescription.keyword onto the display pane
- Choose Donut
- Save as “anomalies view”

Next, repeat the procedure and create a view for mnemonicTitle.keyword. Choose Data Table and save as “anomaly type with count”.

Next, repeat but this time choose:
- eventz
- drag and drop descr.keyword in display pane
- choose Data Table
- save as “events list”

Next, repeat but this time choose:
- audit
- drag and drop descr.keyword to view pane
- choose donut
- save as audit view

Now that we have created the four views, let's put them in a Kibana dashboard.
- Click on 3 bars
- Create New Dashboard

Next,
- Choose “Add from Library”
- Add the views in this order:
- anomalies view
- anomaly type with count
- events list
- audit view
- Save as myDashboard1

Final Kibana Dashboard View:

References:
https://www.cisco.com/c/en/us/support/data-center-analytics/nexus-insights/series.html
https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part1
https://docs.confluent.io/platform/current/platform.html
https://devops.com/splunk-elk-stack-side-side-comparison/
Kafka Connect in Action: Elasticsearch
https://talks.rmoff.net/QZ5nsS/from-zero-to-hero-with-kafka-connect
https://rmoff.net/2019/10/07/kafka-connect-and-elasticsearch/
An introduction to ksqlDB
Kafka Connect in Action: Loading a CSV file into Kafka
https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html