Automation

Last updated: January 3rd, 2020

UnSubscribed Events

These events fire when the DataBus detects a new data feed that is not already configured. When one of these events fires, the DataBus checks whether an automation script has been assigned to the DataSource.

The automation scripts are written in Python and can create all aspects of the DataBus configuration, including Dashboards and Widgets. The example script below decodes a message in JSON format, uses it to configure a processing pipeline into Azure Storage Tables, and then configures a dashboard widget to show the data.

Example:

import clr
from System import DateTime,String,Math,TimeSpan,Array
from System.IO import *
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Common.Config import *
from Indicium.DataBus.Common import DataCollectionModeEnum
from System.Collections.Generic import List
from Indicium.DataBus.Automation.Helpers import *
from Indicium.DataBus.Core.Widgets import Chart,ChartTimeSeriesDetail

clr.AddReference("NewtonSoft.Json")

from Newtonsoft.Json import *

class Automation:
    
    def __init__(self):
        self.heartbeat = 0
        
    def unSubscribed(self, api, dataSource, data):
        self.heartbeat += 1     
        
        # Read MQTT Package
        loraJson = data.Values[0].ParseJson()
        iotDeviceName = "DEV-NAME-" + loraJson.sensor.ToString()
        
        # Use API to add input item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddIntervalInputItem(iotDeviceName, TimeSpan.FromMinutes(15), "Auto Generated", "-3d:1d").AddConfig("portal", loraJson.portal).AddConfig("sensor", loraJson.sensor).Save().Enable()
        
        # Add a subscription pipeline that extracts the level value and writes it to Azure Table Storage
        pir = newInputItem.AddSubscriptionPipeline("Process Level").Enable()
        pir.AddOutputItem("JsonExtractPlugin").AddConfig("path","$.values[?(@.sensor_type=='level')].value").AddConfig("name","PIR").AddConfig("outputType","int")
        pir.AddOutputItem("TableStoragePlugin","in-azuretablestorage-storagetables").AddConfig("tableName","Dam").AddConfig("idSelect","OutputItem")
        pir.Save()
        
        chart = Chart()
        chart.Name = iotDeviceName
        chart.Period = "-7d:1d"
        chart.ShowTitle = True
        chart.LeftMin = 0
        chart.RightMin = 100
        chart.Size = 6
        chart.Height = 100
        detail = ChartTimeSeriesDetail()
        detail.Uri = "in-azuretablestorage-storagetables/in-azuretablestorage-storagetables-dev-name-" + str(loraJson.sensor) + "-process-level"
        detail.Type = "Line"
        detail.Axis = 0
        chart.TimeSeriesDetailList = Array[ChartTimeSeriesDetail]([detail])
        
        dashboard = api.GetDashboard(4)
        dashboard.AddWidget(1, chart)        
        
        EmailHelper.SendEmail("from@yourdomain.com","to@yourdomain.com","New Device Configured","New device configured: %s" % iotDeviceName)
            
        return data
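
To make the JsonExtractPlugin path in the script above more concrete, the following plain-Python sketch shows what a filter such as $.values[?(@.sensor_type=='level')].value would be expected to select. The message shape, the sample values, and the helper name select_level_values are assumptions made for illustration; the plugin's actual behaviour is defined by the DataBus, not by this code.

```python
import json

# Hypothetical incoming message; the real payload shape depends on the device.
message = json.loads("""
{
  "sensor": "42",
  "portal": "demo",
  "values": [
    {"sensor_type": "battery", "value": 3.6},
    {"sensor_type": "level", "value": 57}
  ]
}
""")


def select_level_values(msg):
    """Plain-Python equivalent of $.values[?(@.sensor_type=='level')].value."""
    return [
        item["value"]
        for item in msg.get("values", [])
        if item.get("sensor_type") == "level"
    ]


levels = select_level_values(message)
# Mirror the "outputType" = "int" setting by converting the first match.
pir = int(levels[0]) if levels else None
print(pir)
```

Running a check like this against a captured message before deploying the script can help confirm that the path matches the field you intend to chart.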

New Data Events

These events fire when data is first received by the DataBus. This allows data to be manipulated or analysed before it is passed into any processing pipelines. It is a good place for logic such as analysing JSON messages from IoT devices, where payloads need to be decoded or messages need to be decrypted.

Example:

import clr
from System import DateTime,String,Math,Convert,Globalization
from System.IO import *
from Indicium.Common.Time import *
clr.AddReference('NewtonSoft.Json')

from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0          

    def newData(self, data):
        self.heartbeat += 1
                
        loraJson = data.Point.ParseJson()
        coded_string = loraJson.data.ToString()
        payload = Convert.FromBase64String(coded_string) #decodes the Base64 string into a byte array
        fPort = int(loraJson.fPort)
        loraJson.databus_data = self.decode(fPort, payload)
        try:
            #Set a new field with the current time
            loraJson.databus_time = DateTimeExtensions.ToUnixTime(DateTime.UtcNow)
            #Get first occurrence of the time
            gwTimeVal = Automation.findTime(self,loraJson.rxInfo)
            #Parse the time from JSON Format
            gwTime = DateTimeExtensions.ParseIsoFormat(gwTimeVal)
            #Set a new field with the time
            loraJson.gateway_time = DateTimeExtensions.ToUnixTime(gwTime)
            # use the time recorded by LORA
            data.Point.TimeStamp = gwTime
        except Exception as ex:
            loraJson.newDataError = str(ex)
            
        data.Point.UpdateJson(loraJson)
        
        return data

    def decode(self, fPort, bytes):               
        decoded = Linq.JObject()
        status = Linq.JObject()

        if (fPort == 1) or (fPort == 2):
            decoded.type = ("alarm" if (fPort == 2) else "periodic")
            decoded.uptime = (( bytes[0] << 24) + (bytes[1] << 16) + (bytes[2] << 8) + (bytes[3] << 0))
            decoded.rssi =( bytes[4] )
            decoded.bat_count = ( (bytes[5] << 8) + bytes[6] )
            decoded.di0 = ( (bytes[7] & 0x80) != 0 )
            decoded.di1 = ( (bytes[7] & 0x40) != 0 )
            decoded.reserved = ( bytes[7] & 0x3f )             
            
            status.val = ( bytes[8])
            status.send_failures = ( (bytes[8] & 0x01) != 0)
            status.read_write_err = ( (bytes[8] & 0x02) != 0)
            status.radio_err = ( (bytes[8] & 0x04) != 0)
            status.erratic_float = ( (bytes[8] & 0x08) != 0)  
            status.watchdog = ( (bytes[8] & 0x10) != 0)
            status.reserved = ( (bytes[8] & 0xE0) >> 5)
            
            decoded.status = status

        return  decoded
        
    def findTime(self, jarray):
        # Return the first gateway time found, or None if no entry carries one
        for v in jarray:
            if v.time != None:
                return v.time.ToString()
        return None
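
The bit layout used by decode() above can be checked outside the DataBus with a small standalone sketch. The version below uses only the Python standard library and plain dictionaries instead of JObject; the function name decode_payload, the length check, and the sample bytes are assumptions added for illustration and are not part of the DataBus API.

```python
import base64


def decode_payload(f_port, payload):
    """Decode a 9-byte sensor payload using the same layout as decode() above."""
    if f_port not in (1, 2):
        return {}
    b = bytearray(payload)  # indexing a bytearray yields integers
    if len(b) < 9:
        # Assumption: reject short payloads instead of failing on an index error.
        raise ValueError("expected at least 9 bytes, got %d" % len(b))
    status_byte = b[8]
    return {
        "type": "alarm" if f_port == 2 else "periodic",
        "uptime": (b[0] << 24) | (b[1] << 16) | (b[2] << 8) | b[3],
        "rssi": b[4],
        "bat_count": (b[5] << 8) | b[6],
        "di0": (b[7] & 0x80) != 0,
        "di1": (b[7] & 0x40) != 0,
        "reserved": b[7] & 0x3F,
        "status": {
            "val": status_byte,
            "send_failures": (status_byte & 0x01) != 0,
            "read_write_err": (status_byte & 0x02) != 0,
            "radio_err": (status_byte & 0x04) != 0,
            "erratic_float": (status_byte & 0x08) != 0,
            "watchdog": (status_byte & 0x10) != 0,
            "reserved": (status_byte & 0xE0) >> 5,
        },
    }


# Hypothetical sample: uptime 256, rssi 60, battery count 258, di0 set, status 0x05.
sample_b64 = base64.b64encode(bytes(bytearray([0, 0, 1, 0, 60, 1, 2, 0x80, 0x05])))
decoded = decode_payload(2, base64.b64decode(sample_b64))
print(decoded["type"], decoded["uptime"], decoded["bat_count"])
```

Keeping the bit arithmetic in a function with no DataBus dependencies makes it straightforward to test against known payloads before wiring it into newData.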
    

Python Plugin

The Python Plugin can run in a processing pipeline to apply custom processing to the data. There are many possibilities here, including:

  • Analysing data for alarm conditions and sending email/SMS messages
  • Transforming data using a custom algorithm
  • Transforming data using a look-up table
  • Reading or writing data from a custom system
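
As a rough illustration of two of the ideas above, the sketch below shows a look-up table transform and an alarm check written as plain Python functions. The table values, the function names lookup_volume and is_alarm, and the high limit are all hypothetical; how such logic is hooked into the Python Plugin is not covered here and depends on the DataBus plugin interface.

```python
import bisect

# Hypothetical look-up table: (level in mm, volume in megalitres), sorted by level.
LEVEL_TO_VOLUME = [(0, 0.0), (500, 12.0), (1000, 30.0), (2000, 80.0)]


def lookup_volume(level_mm, table=LEVEL_TO_VOLUME):
    """Convert a level to a volume by linear interpolation, clamped at the table ends."""
    levels = [level for level, _ in table]
    if level_mm <= levels[0]:
        return table[0][1]
    if level_mm >= levels[-1]:
        return table[-1][1]
    i = bisect.bisect_right(levels, level_mm)
    (x0, y0), (x1, y1) = table[i - 1], table[i]
    return y0 + (y1 - y0) * (level_mm - x0) / float(x1 - x0)


def is_alarm(value, high_limit):
    """Return True when the value exceeds the configured high limit."""
    return value > high_limit


volume = lookup_volume(750)
print(volume, is_alarm(volume, 20.0))
```

In a real pipeline the alarm branch would be the natural place to call a notification helper such as the EmailHelper.SendEmail call shown in the first example.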
