Automation

Last updated: January 3rd, 2020

UnSubscribed Events

These events fire when a new data feed is detected that is not already configured in the DataBus. When one of these events fires, the DataBus checks whether an automation script has been assigned to the DataSource and, if so, runs it.

The automation scripts are written in Python and can create every aspect of the DataBus configuration, including Dashboards and Widgets. The example script below shows how a JSON message can be decoded and used to automatically configure a processing pipeline into Azure Storage Tables, and then configure a dashboard widget to display the data.

Example:

import clr
from System import DateTime,String,Math,TimeSpan,Array
from System.IO import *
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Common.Config import *
from Indicium.DataBus.Common import DataCollectionModeEnum
from System.Collections.Generic import List
from Indicium.DataBus.Automation.Helpers import *
from Indicium.DataBus.Core.Widgets import Chart,ChartTimeSeriesDetail

clr.AddReference("NewtonSoft.Json")

from Newtonsoft.Json import *

class Automation:
    
    def __init__(self):
        self.heartbeat = 0
        
    def unSubscribed(self, api, dataSource, data):
        self.heartbeat += 1     
        
        # Read MQTT Package
        loraJson = data.Values[0].ParseJson()
        iotDeviceName = "DEV-NAME-" + loraJson.sensor.ToString()
        
        # Use API to add input item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddIntervalInputItem(iotDeviceName, TimeSpan.FromMinutes(15), "Auto Generated", "-3d:1d").AddConfig("portal", loraJson.portal).AddConfig("sensor", loraJson.sensor).Enable()
        
        # Add LocalDB Subscription
        pir = newInputItem.AddSubscriptionPipeline("Process Level").Enable()
        pir.AddOutputItem("JsonExtractPlugin").AddConfig("path","$.values[?(@.sensor_type=='level')].value").AddConfig("name","PIR").AddConfig("outputType","int")
        pir.AddOutputItem("TableStoragePlugin","in-azuretablestorage-storagetables").AddConfig("tableName","Dam").AddConfig("idSelect","OutputItem")
        pir.Save()
        
        chart = Chart()
        chart.Name = iotDeviceName
        chart.Period = "-7d:1d"
        chart.ShowTitle = True
        chart.LeftMin = 0
        chart.RightMin = 100
        chart.Size = 6
        chart.Height = 100
        detail = ChartTimeSeriesDetail()
        detail.Uri = "in-azuretablestorage-storagetables/in-azuretablestorage-storagetables-dev-name-" + str(loraJson.sensor) + "-process-level"
        detail.Type = "Line"
        detail.Axis = 0
        chart.TimeSeriesDetailList = Array[ChartTimeSeriesDetail]([detail])
        
        dashboard = api.GetDashboard(4)
        dashboard.AddWidget(1, chart)        
        
        EmailHelper.SendEmail("from@yourdomain.com","to@yourdomain.com","New Device Configured","New device configured: %s" % iotDeviceName);        
            
        return data

A simpler variant is shown below. It creates a basic pipeline that writes any newly detected data feed into a local database.

class Automation:
            
    # This function is executed when the plugin detects data from an unconfigured input
    # @api - the api we can use to interact with the DataBus
    # @dataSource - the DataSource which spawned the unknown data
    # @data - the instance of the data that was spawned
    # return the data that has been updated by this script
    def unSubscribed(self, api, dataSource, data):
                
        # Use API to add Input Item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddInputItem(data.Name, "Automatically Created using API").AddConfig("InputTopic", data.Name).Enable()
        
        # Map Battery Voltage
        newInputItem.AddSubscriptionPipeline("Local DB").Enable().AddOutputItem("LocalDbPlugin").AddConfig("writeMode","AppendReplace").AddConfig("idSelect","OutputItem")
        
        return data

New Data Events

These events fire when data is first received by the DataBus, allowing the data to be manipulated or analysed before it is passed into any processing pipelines. This is the right place for logic such as parsing JSON messages from IoT devices, decoding payloads, or decrypting messages (see the Advanced Examples below for a full LoRa payload decoding script).

The example below shows a simple manipulation of the data before it is passed to any processing pipelines.

class Automation:
  
    # This function is executed when new data is received by the DataBus
    # @data - this is the object containing the value or values
    # return the data that has been updated by this script
    def newData(self, data):
        
        # if data is a single point
        if data.Point is not None:
            data.Point.Value = data.Point.Value * 10 + 3

        # if data is a series of points
        else:
            for point in data.Values:
                point.Value = point.Value * 10 + 3
        
        return data
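
The section intro also mentions decryption. The sketch below shows one way an encrypted field could be decrypted in a newData event using the standard .NET AES classes. It is only an illustration: it assumes AES-CBC with a base64-encoded data field (as in the LoRa examples further down), and the all-zero key and IV are placeholders you would replace with values from your own provisioning system.

import clr
clr.AddReference("System.Core")
from System import Array, Byte, Convert
from System.Security.Cryptography import Aes

class Automation:

    def newData(self, data):
        # parse the JSON message and pull out the encrypted field
        msgJson = data.Point.ParseJson()
        cipherBytes = Convert.FromBase64String(msgJson.data.ToString())

        # placeholder key/IV - source these from your device provisioning system
        aes = Aes.Create()
        aes.Key = Array[Byte]([0] * 16)
        aes.IV = Array[Byte]([0] * 16)

        # decrypt and write the payload back into the message
        # (still base64, so a decoding script like the one below can consume it)
        decryptor = aes.CreateDecryptor()
        plainBytes = decryptor.TransformFinalBlock(cipherBytes, 0, cipherBytes.Length)
        msgJson.data = Convert.ToBase64String(plainBytes)
        data.Point.UpdateJson(msgJson)

        return data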
                                    
                                

Python Plugin

The Python Plugin can be executed in a processing pipeline to run custom processes on the data. There are all sorts of possibilities here, including:

  • Analysing data for alarm conditions and sending Emails/SMS messages
  • Transforming data using a custom algorithm
  • Transforming data using a look-up table (see the sketch after the example below)
  • Reading or writing data from a custom system


from System import DateTime
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Automation.Helpers import *

class Plugin:
    
    def __init__(self):
        self.heartbeat = 0

    # This function is executed when the plugin runs in a pipeline
    # @instance - this is the JobInstance object containing details about what is being processed
    # @event -    this is the BaseEvent (PointEvent/SeriesEvent) object containing the data to be processed
    # return the event object with any changes you have made to it
    def execute(self, instance, event):
        # if the alarm value of 100 is exceeded - send an email
        if event.Point and event.Point.Value > 100:
            EmailHelper.SendEmail("databus@indicium.cloud","name@company.com.au","ALARM","Alarm level of 100 exceeded")
        return event

    # This function is executed when the plugin handles a read event
    # @instance - this is the JobInstance object containing details about what is being processed
    # return a PointEvent/SeriesEvent containing the data you have read
    def read(self, instance):
        # put your logic here
        self.heartbeat += 1
        # script is stateful, so return an incrementing number with each read
        return PointEvent(DateTime.Now, self.heartbeat)
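
The look-up table idea from the list above can be sketched along the same lines. The snippet below is only an illustration: the table contents and the level-code mapping are assumptions made for this example, not part of the DataBus API, and it assumes the same Point/Values shape used by the newData example above.

class LookupPlugin:

    # Map raw sensor codes to engineering values using a look-up table.
    # The table contents are illustrative; -1 marks an unknown code.
    LEVEL_TABLE = {
        0: 0.0,    # empty
        1: 25.0,   # low
        2: 50.0,   # half
        3: 75.0,   # high
        4: 100.0,  # full
    }

    def execute(self, instance, event):
        # single point
        if event.Point is not None:
            event.Point.Value = LookupPlugin.LEVEL_TABLE.get(int(event.Point.Value), -1)
        # series of points
        else:
            for point in event.Values:
                point.Value = LookupPlugin.LEVEL_TABLE.get(int(point.Value), -1)
        return event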

Advanced Examples

UnSubscribed Event

The example below sets up new processing pipelines to manage the extraction of data from a JSON payload (from LoRaServer.io) into a SCADA system via OPC HDA.


import clr
from System import DateTime,String,Math
from System.IO import *
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Common.Config import *
from Indicium.DataBus.Common import DataCollectionModeEnum
from System.Collections.Generic import List
from Indicium.DataBus.Automation.Helpers import *

clr.AddReference("NewtonSoft.Json")

from Newtonsoft.Json import *

class Automation:
    
    def __init__(self):
        self.heartbeat = 0
        
    def unSubscribed(self, api, dataSource, data):
        self.heartbeat += 1     
        
        # Read MQTT Message
        loraJson = data.Point.ParseJson()
        iotDeviceName = loraJson.deviceName.ToString()            
        
        # Use API to add Input Item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddIoTInputItem(iotDeviceName, "Automatically Created using API").AddConfig("InputTopic", data.Name).Enable()
        
        # Add LocalDB Subscription
        # Use this to track copies of the full JSON payload
        newInputItem.AddSubscriptionPipeline("LocalDB Payload Logging").Enable().AddOutputItem("DataBus.Databases.LocalDbPlugin").AddConfig("writeMode","AppendReplace").AddConfig("idSelect","OutputItem")
        
        # Add Historian Data Subscriptions
        opcServer = "opchda://localhost/SCADA.HDA"
        tagNameBase = "\\\\server\\Sewer.Network\\"
        # If you have an asset tracking system, 
        # then this might be a good spot to make a call out to that
        # system to lookup what the SCADA tag name might be.
        tagName = iotDeviceName
        
        # 1 - Extract Battery Voltage
        subsPipe = newInputItem.AddSubscriptionPipeline("Voltage").Enable()
        tag1 = tagNameBase + tagName + ".Level.Voltage.PoC"        
        subsPipe.AddOutputItem("DataBus.Plugins.Pipelines.JsonExtractPlugin").AddConfig("path","databus_data.bat_count").AddConfig("outputType","int").AddConfig("name","bat_count")
        subsPipe.AddOutputItem("DataBus.OpcDa.OpcHdaPlugin").AddConfig("serverUrl",opcServer).AddConfig("tag",tag1)
        
        # 2 - Generate DataBus Time 
        # this is the current time and can be used for performance tracking
        subsPipe2 = newInputItem.AddSubscriptionPipeline("DatabusTime")
        tag2 = tagNameBase + tagName + ".Level.Databus Time.PoC"        
        subsPipe2.AddOutputItem("DataBus.Plugins.Pipelines.JsonExtractPlugin").AddConfig("path","databus_time").AddConfig("outputType","long").AddConfig("name","databus_time")
        subsPipe2.AddOutputItem("DataBus.OpcDa.OpcHdaPlugin").AddConfig("serverUrl",opcServer).AddConfig("tag",tag2)
        
        # 3 - Extract Gateway Time
        subsPipe3 = newInputItem.AddSubscriptionPipeline("GatewayTime")
        tag3 = tagNameBase + tagName + ".Level.Gateway Time.PoC"        
        subsPipe3.AddOutputItem("DataBus.Plugins.Pipelines.JsonExtractPlugin").AddConfig("path","gateway_time").AddConfig("outputType","long").AddConfig("name","gatewayTime")
        subsPipe3.AddOutputItem("DataBus.OpcDa.OpcHdaPlugin").AddConfig("serverUrl",opcServer).AddConfig("tag",tag3)      
        
        # 4 - Extract RSSI
        subsPipe4 = newInputItem.AddSubscriptionPipeline("RSSI").Enable()
        tag4 = tagNameBase + tagName + ".Level.RSSI.PoC"        
        subsPipe4.AddOutputItem("DataBus.Plugins.Pipelines.JsonExtractPlugin").AddConfig("path","databus_data.rssi").AddConfig("outputType","int").AddConfig("name","rssi")
        subsPipe4.AddOutputItem("DataBus.OpcDa.OpcHdaPlugin").AddConfig("serverUrl",opcServer).AddConfig("tag",tag4)      
        
        # 5 - Extract State
        subsPipe5 = newInputItem.AddSubscriptionPipeline("State").Enable()
        tag5 = tagNameBase + tagName + ".Level.State.PoC"        
        subsPipe5.AddOutputItem("DataBus.Plugins.Pipelines.JsonExtractPlugin").AddConfig("path","databus_data.status.val").AddConfig("outputType","int").AddConfig("name","status")
        subsPipe5.AddOutputItem("DataBus.OpcDa.OpcHdaPlugin").AddConfig("serverUrl",opcServer).AddConfig("tag",tag5)      
        
        # send an email via MailGun
        EmailHelper.SendEmail("databus@indicium.cloud","name@company.com","New Device Configured","New device configured: %s" % iotDeviceName);        
            
        return data
    
    

New Data Example

The example below shows how a JSON message (received from LoRaServer.io) can be analysed and its payload decoded.

import clr
from System import DateTime,String,Math,Convert,Globalization
from System.IO import *
from Indicium.Common.Time import *
clr.AddReference("NewtonSoft.Json")

from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0

    # This function is executed when new data is received by the DataBus
    # @data - this is the object containing the value or values
    # return the data that has been updated by this script
    def newData(self, data):
        self.heartbeat += 1

        loraJson = data.Point.ParseJson()
        coded_string = loraJson.data.ToString()
        payload = Convert.FromBase64String(coded_string) # decodes base64 to a byte array
        fPort = int(loraJson.fPort)
        loraJson.databus_data = Automation.decodePacket(self, fPort, payload)
        try:
            # Set a new field with the current time
            loraJson.databus_time = DateTimeExtensions.ToUnixTime(DateTime.UtcNow)
            # Get the first occurrence of the time
            gwTimeVal = Automation.findTime(self, loraJson.rxInfo)
            # Parse the time from ISO format
            gwTime = DateTimeExtensions.ParseIsoFormat(gwTimeVal)
            # Set a new field with the gateway time
            loraJson.gateway_time = DateTimeExtensions.ToUnixTime(gwTime)
            # use the time recorded by the LoRa gateway
            data.Point.TimeStamp = gwTime
        except Exception as ex:
            loraJson.newDataError = str(ex)

        data.Point.UpdateJson(loraJson)

        return data

    # This function decodes the payload from raw bytes into a JSON object
    def decodePacket(self, fPort, bytes):
        decoded = Linq.JObject()
        status = Linq.JObject()

        if (fPort == 1) or (fPort == 2):
            decoded.type = ("alarm" if (fPort == 2) else "periodic")
            decoded.uptime = ((bytes[0] << 24) + (bytes[1] << 16) + (bytes[2] << 8) + (bytes[3] << 0))
            decoded.rssi = (bytes[4])
            decoded.bat_count = ((bytes[5] << 8) + bytes[6])
            decoded.di0 = ((bytes[7] & 0x80) != 0)
            decoded.di1 = ((bytes[7] & 0x40) != 0)
            decoded.reserved = (bytes[7] & 0x3f)

            status.val = (bytes[8])
            status.send_failures = ((bytes[8] & 0x01) != 0)
            status.read_write_err = ((bytes[8] & 0x02) != 0)
            status.radio_err = ((bytes[8] & 0x04) != 0)
            status.erratic_float = ((bytes[8] & 0x08) != 0)
            status.watchdog = ((bytes[8] & 0x10) != 0)
            status.reserved = ((bytes[8] & 0xE0) >> 5)

            decoded.status = status

        return decoded

    # Return the first gateway time found in the rxInfo array
    def findTime(self, jarray):
        timeVal = None
        for v in jarray:
            if v.time is not None:
                timeVal = v.time.ToString()
                break

        return timeVal

Testing

To simplify setup and testing of the scripts, the DataBus includes a dedicated testing user interface. This UI lets you supply test inputs and safely execute your scripts to confirm they behave as expected. You can add multiple test scenarios, which are saved so you can re-run them later.

screenshot

New Data Event Testing

When you test the new data event scripts, you can pass in a test value and confirm the resulting outputs. This is useful when performing complex manipulations of data, such as converting LoRa payloads into JSON representations.
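
As an illustration, a test input for the LoRa decoding script above might look like the message below. The device values are invented for this example; the base64 string encodes the nine payload bytes that decodePacket expects.

{
    "fPort": 1,
    "data": "AAAADGQAKoAD",
    "rxInfo": [ { "time": "2020-01-03T10:15:30.000Z" } ]
}

Running the script against this input should produce a databus_data field with type "periodic", uptime 12, rssi 100, bat_count 42 and di0 true, plus a status object with send_failures and read_write_err set.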

screenshot

UnSubscribed Event Testing

When you test the unsubscribed event script, you can pass in a test value and confirm that the resulting operations match your expectations. The interface outputs all of the changes that would be made to the DataBus configuration. The script is executed against mock instances of the APIs, so no actual changes are made during testing.

