Automation

Last updated: November 11th, 2020

UnSubscribed Events

These events fire when a new data feed is detected that is not already configured in the DataBus. When one fires, the DataBus checks whether an automation script has been assigned to the DataSource.

The automation scripts are written in Python and can create every aspect of the DataBus configuration, including Dashboards and Widgets. The example script below shows how a JSON message can be decoded and used to automatically configure a processing pipeline that writes to Azure Storage Tables, and then configure a dashboard widget to show the data.

Example:

import clr
from System import DateTime,String,Math,TimeSpan,Array
from System.IO import *
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Common.Config import *
from Indicium.DataBus.Common import DataCollectionModeEnum
from System.Collections.Generic import List
from Indicium.DataBus.Automation.Helpers import *
from Indicium.DataBus.Core.Widgets import Chart,ChartTimeSeriesDetail

clr.AddReference("NewtonSoft.Json")

from Newtonsoft.Json import *

class Automation:
    
    def __init__(self):
        self.heartbeat = 0
        
    def unSubscribed(self, api, dataSource, data):
        self.heartbeat += 1     
        
        # Read MQTT Package
        loraJson = data.Values[0].ParseJson()
        iotDeviceName = "DEV-NAME-" + loraJson.sensor.ToString()
        
        # Use API to add input item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddIntervalInputItem(iotDeviceName, TimeSpan.FromMinutes(15), "Auto Generated", "-3d:1d").AddConfig("portal", loraJson.portal).AddConfig("sensor", loraJson.sensor).Save().Enable()
        
        # Add LocalDB Subscription
        pir = newInputItem.AddSubscriptionPipeline("Process Level").Enable()
        pir.AddOutputItem("JsonExtractPlugin").AddConfig("path","$.values[?(@.sensor_type=='level')].value").AddConfig("name","PIR").AddConfig("outputType","int")
        pir.AddOutputItem("TableStoragePlugin","in-azuretablestorage-storagetables").AddConfig("tableName","Dam").AddConfig("idSelect","OutputItem")
        pir.Save()
        
        chart = Chart()
        chart.Name = iotDeviceName
        chart.Period = "-7d:1d"
        chart.ShowTitle = True
        chart.LeftMin = 0
        chart.RightMin = 100
        chart.Size = 6
        chart.Height = 100
        detail = ChartTimeSeriesDetail()
        detail.Uri = "in-azuretablestorage-storagetables/in-azuretablestorage-storagetables-dev-name-" + str(loraJson.sensor) + "-process-level"
        detail.Type = "Line"
        detail.Axis = 0
        chart.TimeSeriesDetailList = Array[ChartTimeSeriesDetail]([detail])
        
        dashboard = api.GetDashboard(4)
        dashboard.AddWidget(1, chart)        
        
        EmailHelper.SendEmail("from@yourdomain.com","to@yourdomain.com","New Device Configured","New device configured: %s" % iotDeviceName)
            
        return data
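
The JsonExtractPlugin path used in the script above, $.values[?(@.sensor_type=='level')].value, selects the value of every entry in values whose sensor_type is level. The plain-Python sketch below mimics that selection outside the DataBus. The sample message layout and the helper names extract_level and device_name are assumptions made for illustration; they are not part of the DataBus API.

```python
import json

# Hypothetical message: only the fields the script above reads
# (sensor, portal, values) are assumed to exist.
SAMPLE_MESSAGE = """
{
  "sensor": 17,
  "portal": "north",
  "values": [
    {"sensor_type": "battery", "value": 3.6},
    {"sensor_type": "level", "value": 42}
  ]
}
"""

def extract_level(message_text):
    """Return the values matched by $.values[?(@.sensor_type=='level')].value."""
    message = json.loads(message_text)
    return [entry["value"]
            for entry in message.get("values", [])
            if entry.get("sensor_type") == "level"]

def device_name(message_text):
    """Build the input item name the same way the script above does."""
    message = json.loads(message_text)
    return "DEV-NAME-" + str(message["sensor"])

if __name__ == "__main__":
    print(device_name(SAMPLE_MESSAGE), extract_level(SAMPLE_MESSAGE))
```

Checking the path against a sample message like this before deploying the script can help confirm that the pipeline will extract the field you expect.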

New Data Events

These events fire when data is first received by the DataBus, which allows the data to be manipulated or analysed before it is passed into any processing pipelines. This is a good place for logic such as analysing JSON messages from IoT devices, where payloads need to be decoded or messages decrypted.

Example:

import clr
from System import DateTime,String,Math,Convert,Globalization
from System.IO import *
from Indicium.Common.Time import *
clr.AddReference('NewtonSoft.Json')

from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0          

    def newData(self, data):
        self.heartbeat += 1
                
        loraJson = data.Point.ParseJson()
        coded_string = loraJson.data.ToString()
        payload = Convert.FromBase64String(coded_string) #decodes base64 to a byte array
        fPort = int(loraJson.fPort)
        loraJson.databus_data = Automation.decode(self,fPort,payload)
        try:
            #Set a new field with the current time
            loraJson.databus_time = DateTimeExtensions.ToUnixTime(DateTime.UtcNow)
            #Get first occurrence of the time
            gwTimeVal = Automation.findTime(self,loraJson.rxInfo)
            #Parse the time from JSON Format
            gwTime = DateTimeExtensions.ParseIsoFormat(gwTimeVal)
            #Set a new field with the time
            loraJson.gateway_time = DateTimeExtensions.ToUnixTime(gwTime)
            # use the time recorded by LORA
            data.Point.TimeStamp = gwTime
        except Exception as ex:
            loraJson.newDataError = str(ex)
            
        data.Point.UpdateJson(loraJson)
        
        return data

    def decode(self, fPort, bytes):               
        decoded = Linq.JObject()
        status = Linq.JObject()

        if (fPort == 1) or (fPort == 2):
            decoded.type = ("alarm" if (fPort == 2) else "periodic")
            decoded.uptime = (( bytes[0] << 24) + (bytes[1] << 16) + (bytes[2] << 8) + (bytes[3] << 0))
            decoded.rssi =( bytes[4] )
            decoded.bat_count = ( (bytes[5] << 8) + bytes[6] )
            decoded.di0 = ( (bytes[7] & 0x80) != 0 )
            decoded.di1 = ( (bytes[7] & 0x40) != 0 )
            decoded.reserved = ( bytes[7] & 0x3f )             
            
            status.val = ( bytes[8])
            status.send_failures = ( (bytes[8] & 0x01) != 0)
            status.read_write_err = ( (bytes[8] & 0x02) != 0)
            status.radio_err = ( (bytes[8] & 0x04) != 0)
            status.erratic_float = ( (bytes[8] & 0x08) != 0)  
            status.watchdog = ( (bytes[8] & 0x10) != 0)
            status.reserved = ( (bytes[8] & 0xE0) >> 5)
            
            decoded.status = status

        return  decoded
        
    def findTime(self, jarray):
        # return the first time value found, or None if no entry has one
        for v in jarray:
            if v.time != None:
                return v.time.ToString()
        return None
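
The byte layout handled by the decode method above can be checked outside the DataBus with standard Python. The sketch below is an approximation under stated assumptions: it uses struct instead of manual shifts, returns plain dictionaries instead of JObject instances, and returns an empty result for frames shorter than 9 bytes (the original does not guard against short frames). The sample frame is made up for illustration.

```python
import base64
import struct

def decode_payload(f_port, payload):
    """Decode the 9-byte periodic (fPort 1) or alarm (fPort 2) frame."""
    if f_port not in (1, 2) or len(payload) < 9:
        return {}
    # Big-endian: uint32 uptime, uint8 rssi, uint16 bat_count,
    # uint8 digital inputs, uint8 status flags.
    uptime, rssi, bat_count, inputs, status = struct.unpack(
        ">IBHBB", bytes(payload[:9]))
    return {
        "type": "alarm" if f_port == 2 else "periodic",
        "uptime": uptime,
        "rssi": rssi,
        "bat_count": bat_count,
        "di0": bool(inputs & 0x80),
        "di1": bool(inputs & 0x40),
        "reserved": inputs & 0x3F,
        "status": {
            "val": status,
            "send_failures": bool(status & 0x01),
            "read_write_err": bool(status & 0x02),
            "radio_err": bool(status & 0x04),
            "erratic_float": bool(status & 0x08),
            "watchdog": bool(status & 0x10),
            "reserved": (status & 0xE0) >> 5,
        },
    }

def decode_message(f_port, data_base64):
    """Base64-decode the message data field, then decode the frame."""
    return decode_payload(f_port, base64.b64decode(data_base64))

# Hypothetical frame: uptime 3600, rssi 90, bat_count 258,
# di0 set, radio error flag set.
SAMPLE_FRAME = bytes([0x00, 0x00, 0x0E, 0x10, 0x5A, 0x01, 0x02, 0x80, 0x04])
SAMPLE_BASE64 = base64.b64encode(SAMPLE_FRAME).decode("ascii")

if __name__ == "__main__":
    print(decode_message(2, SAMPLE_BASE64))
```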
    

Python Plugin

The Python Plugin can be executed in a processing pipeline to run custom processes on the data. There are all sorts of possibilities here, including:

  • Analysing data for alarm conditions and sending Email/SMS messages
  • Transform data using a custom algorithm
  • Transform data using a look-up table
  • Read or write data from a custom system

LoRaWAN Decoders

LoRaWAN Decoders are an easy way to decode data from a LoRaWAN device. If the device you are trying to decode is one of the already supported devices, then the DataBus will attempt to find that decoder and produce a JSON message of your data for you. If a decoder cannot be found, then you can build your own in Python.

The example below shows the decoders in use, and how you can add your own decoder logic if required.

Example:

import clr
import binascii
from System import DateTime,String,Math,Convert,Globalization,BitConverter
from System.IO import *
from Indicium.Common.Time import *
from Indicium.DataBus.Automation.Helpers import ByteHelper
from Indicium.DataBus.Automation.Decoders import DecoderFactory

clr.AddReference("NewtonSoft.Json")
from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0          

    def newData(self, data):
        self.heartbeat += 1
        fPort=0
        
        try:
            loraJson = data.Point.ParseJson()
            # keep the payload in its own variable so that `data` is not overwritten
            payloadHex = loraJson.data.ToString()
            fPort = loraJson.port
            eui = loraJson.EUI
            
            # use the DecoderFactory to find a potential LoRaWAN Decoder to use
            factory = DecoderFactory()
            decoder = factory.FindDecoder(eui, payloadHex)
            decoded = None
            if decoder is not None:
                decoded = decoder.DecodeFromHex(payloadHex, fPort)
            
            # if no decoder found, or decoded data is null 
            # then you will need to decode yourself
            if decoded is None:
                # convert the hex string to bytes (assumes the payload is hex encoded)
                payloadBytes = bytearray(binascii.unhexlify(payloadHex))
                decoded = self.decodePacketPir(payloadBytes, fPort)
                
            data.Point.UpdateJson(decoded)    
        except:
            data.Point.UpdateJson("Error Executing Decoder Process")
            # raise the exception if you want the DataBus to track failures
            raise
        
        return data

    def decodePacketPir(self, bytes, fPort):               
        decoded = Linq.JObject()
        decoded.byteLength = len(bytes)
        
        decoded.batteryLevel = float((bytes[3]))/10
        decoded.temperature = float((bytes[4] << 8 | bytes[5] ))/100
        decoded.lux = bytes[6] << 8 | bytes[7]
        decoded.pir = bytes[8]
        
        return  decoded
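
The field layout used by decodePacketPir above can be exercised with a standalone sketch. This version assumes the payload arrives as a hex string, returns a plain dictionary instead of a JObject, and rejects frames shorter than 9 bytes; the sample payload is invented for illustration and does not come from a real device.

```python
import binascii

def decode_packet_pir(payload_hex):
    """Decode battery, temperature, lux and PIR fields from a hex payload."""
    payload = bytearray(binascii.unhexlify(payload_hex))
    if len(payload) < 9:
        raise ValueError("expected at least 9 bytes, got %d" % len(payload))
    return {
        "byteLength": len(payload),
        "batteryLevel": payload[3] / 10.0,                       # tenths of a volt
        "temperature": (payload[4] << 8 | payload[5]) / 100.0,   # hundredths of a degree
        "lux": payload[6] << 8 | payload[7],
        "pir": payload[8],
    }

# Hypothetical payload: 3 header bytes, battery 3.6, temperature 23.45,
# lux 300, PIR triggered.
SAMPLE_HEX = "000000240929012C01"

if __name__ == "__main__":
    print(decode_packet_pir(SAMPLE_HEX))
```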
    

Testing

Testing of any automated process is important, so the DataBus features the ability to run tests on all scripts within the system.

The screenshot below shows how you can build a number of test cases, setting the input data for each and then generating the output data. Test cases can be saved so you can run them again later, making it easier to catch regressions. This example shows the results of decoding a LoRaWAN message from LORIOT into the raw data and associated types.
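
The same idea of saved input and expected output pairs can also be applied to decoder logic outside the DataBus, for example with Python's unittest module. The sketch below is not the DataBus test feature: decode_level is a hypothetical decoder and the saved cases are invented for illustration.

```python
import unittest

def decode_level(payload_hex):
    """Hypothetical decoder under test: the first two bytes hold a
    big-endian level in millimetres."""
    raw = bytearray.fromhex(payload_hex)
    return {"level_mm": raw[0] << 8 | raw[1]}

# Saved test cases: (input payload, expected decoded output).
SAVED_CASES = [
    ("0000", {"level_mm": 0}),
    ("01F4", {"level_mm": 500}),
    ("FFFF", {"level_mm": 65535}),
]

class DecoderRegressionTest(unittest.TestCase):
    def test_saved_cases(self):
        for payload_hex, expected in SAVED_CASES:
            with self.subTest(payload=payload_hex):
                self.assertEqual(decode_level(payload_hex), expected)

def run_saved_cases():
    """Run every saved case and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(DecoderRegressionTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)

if __name__ == "__main__":
    run_saved_cases()
```

Re-running the saved cases after each change to a decoder gives a quick signal that previously working payloads still decode to the same values.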
