These events fire when a new data feed is detected that is not already configured in the DataBus. When one of these events fires, the DataBus checks whether an automation script has been assigned to the DataSource.
The automation scripts are written in Python and can create every aspect of the DataBus configuration, including Dashboards and Widgets. The example script below shows how a JSON message can be decoded and used to automatically configure a processing pipeline into Azure Storage Tables, and then configure a dashboard widget to show the data.
Example:
import clr
from System import DateTime,String,Math,TimeSpan,Array
from System.IO import *
from Indicium.DataBus.Common.Data import *
from Indicium.DataBus.Common.Config import *
from Indicium.DataBus.Common import DataCollectionModeEnum
from System.Collections.Generic import List
from Indicium.DataBus.Automation.Helpers import *
from Indicium.DataBus.Core.Widgets import Chart,ChartTimeSeriesDetail
clr.AddReference("NewtonSoft.Json")
from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0

    def unSubscribed(self, api, dataSource, data):
        self.heartbeat += 1
        # Read MQTT Package
        loraJson = data.Values[0].ParseJson()
        iotDeviceName = "DEV-NAME-" + loraJson.sensor.ToString()
        # Use API to add input item
        dsApi = api.GetDataSourceByUri(dataSource.Uri)
        newInputItem = dsApi.AddIntervalInputItem(iotDeviceName, TimeSpan.FromMinutes(15), "Auto Generated", "-3d:1d").AddConfig("portal", loraJson.portal).AddConfig("sensor", loraJson.sensor).Save().Enable()
        # Add a subscription pipeline: extract the level value from the JSON,
        # then write it to Azure Table Storage
        pir = newInputItem.AddSubscriptionPipeline("Process Level").Enable()
        pir.AddOutputItem("JsonExtractPlugin").AddConfig("path","$.values[?(@.sensor_type=='level')].value").AddConfig("name","PIR").AddConfig("outputType","int")
        pir.AddOutputItem("TableStoragePlugin","in-azuretablestorage-storagetables").AddConfig("tableName","Dam").AddConfig("idSelect","OutputItem")
        pir.Save()
        # Build a chart widget for the new device
        chart = Chart()
        chart.Name = iotDeviceName
        chart.Period = "-7d:1d"
        chart.ShowTitle = True
        chart.LeftMin = 0
        chart.RightMin = 100
        chart.Size = 6
        chart.Height = 100
        # Point the chart at the time series written by the pipeline above
        detail = ChartTimeSeriesDetail()
        detail.Uri = "in-azuretablestorage-storagetables/in-azuretablestorage-storagetables-dev-name-" + str(loraJson.sensor) + "-process-level"
        detail.Type = "Line"
        detail.Axis = 0
        chart.TimeSeriesDetailList = Array[ChartTimeSeriesDetail]([detail])
        # Add the chart to an existing dashboard
        dashboard = api.GetDashboard(4)
        dashboard.AddWidget(1, chart)
        # Notify someone that a new device was configured
        EmailHelper.SendEmail("from@yourdomain.com","to@yourdomain.com","New Device Configured","New device configured: %s" % iotDeviceName)
        return data
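To make the JsonExtractPlugin path in the script above easier to read, here is a minimal plain-Python sketch of what a filter such as `$.values[?(@.sensor_type=='level')].value` is intended to select. The message shape, the field values, and the `extract_by_sensor_type` helper are all assumptions made for illustration; they are not part of the DataBus API, and the plugin itself may behave differently in edge cases.

```python
import json

# Hypothetical message: the fields are assumptions chosen to match the
# JSONPath used in the script above, not a documented device format.
SAMPLE_MESSAGE = """
{
  "sensor": 42,
  "portal": "demo",
  "values": [
    {"sensor_type": "battery", "value": 3.6},
    {"sensor_type": "level", "value": 87}
  ]
}
"""


def extract_by_sensor_type(message_text, sensor_type):
    """Return every `value` whose entry in `values` has the given sensor_type.

    Mirrors the intent of $.values[?(@.sensor_type=='<type>')].value.
    """
    message = json.loads(message_text)
    return [
        entry["value"]
        for entry in message.get("values", [])
        if entry.get("sensor_type") == sensor_type
    ]


levels = extract_by_sensor_type(SAMPLE_MESSAGE, "level")
print(levels)
```

A filter expression can match more than one entry, which is why the sketch returns a list rather than a single value.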
These events fire when data is first received by the DataBus. This allows data to be manipulated or analysed before it is passed into any processing pipelines. It is a good place for logic such as parsing JSON messages from IoT devices, where payloads need to be decoded or messages need to be decrypted.
Example:
import clr
from System import DateTime,String,Math,Convert,Globalization
from System.IO import *
from Indicium.Common.Time import *
clr.AddReference('NewtonSoft.Json')
from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0

    def newData(self, data):
        self.heartbeat += 1
        loraJson = data.Point.ParseJson()
        coded_string = loraJson.data.ToString()
        # Decode the Base64 payload into a byte array
        payload = Convert.FromBase64String(coded_string)
        fPort = int(loraJson.fPort)
        loraJson.databus_data = self.decode(fPort, payload)
        try:
            # Set a new field with the current time
            loraJson.databus_time = DateTimeExtensions.ToUnixTime(DateTime.UtcNow)
            # Get the first occurrence of the time
            gwTimeVal = self.findTime(loraJson.rxInfo)
            # Parse the time from JSON format
            gwTime = DateTimeExtensions.ParseIsoFormat(gwTimeVal)
            # Set a new field with the time
            loraJson.gateway_time = DateTimeExtensions.ToUnixTime(gwTime)
            # Use the time recorded by LoRa
            data.Point.TimeStamp = gwTime
        except Exception as ex:
            # Record the failure in the message instead of dropping the data
            loraJson.newDataError = str(ex)
        data.Point.UpdateJson(loraJson)
        return data

    def decode(self, fPort, payload):
        decoded = Linq.JObject()
        status = Linq.JObject()
        # Only ports 1 (periodic) and 2 (alarm) carry this payload layout
        if (fPort == 1) or (fPort == 2):
            decoded.type = ("alarm" if (fPort == 2) else "periodic")
            # Bytes 0-3: uptime as a big-endian 32-bit value
            decoded.uptime = ((payload[0] << 24) + (payload[1] << 16) + (payload[2] << 8) + (payload[3] << 0))
            decoded.rssi = payload[4]
            decoded.bat_count = ((payload[5] << 8) + payload[6])
            # Byte 7: digital inputs in the top two bits
            decoded.di0 = ((payload[7] & 0x80) != 0)
            decoded.di1 = ((payload[7] & 0x40) != 0)
            decoded.reserved = (payload[7] & 0x3f)
            # Byte 8: status flags
            status.val = payload[8]
            status.send_failures = ((payload[8] & 0x01) != 0)
            status.read_write_err = ((payload[8] & 0x02) != 0)
            status.radio_err = ((payload[8] & 0x04) != 0)
            status.erratic_float = ((payload[8] & 0x08) != 0)
            status.watchdog = ((payload[8] & 0x10) != 0)
            status.reserved = ((payload[8] & 0xE0) >> 5)
            decoded.status = status
        return decoded

    def findTime(self, jarray):
        # Return the first time value found, or None if there is none
        for v in jarray:
            if v.time != None:
                timeVal = v.time.ToString()
                return timeVal
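The byte-level decoding in the script above can also be tried outside the DataBus. The following is a minimal sketch in standard Python (no .NET types) that assumes the same 9-byte layout: a 32-bit uptime, an RSSI byte, a 16-bit battery count, a digital-input byte and a status byte, all big-endian. The `decode_payload` name and the sample frame are hypothetical and exist only for this illustration.

```python
import base64
import struct


def decode_payload(f_port, payload):
    """Decode a 9-byte uplink using the same field layout as decode() above."""
    if f_port not in (1, 2):
        return {}  # other ports are not decoded, as in the original script
    if len(payload) < 9:
        raise ValueError("expected at least 9 bytes, got %d" % len(payload))

    # ">IBHBB": big-endian uint32 uptime, uint8 rssi, uint16 battery count,
    # uint8 digital-input flags, uint8 status flags (9 bytes in total).
    uptime, rssi, bat_count, inputs, status = struct.unpack(">IBHBB", payload[:9])

    return {
        "type": "alarm" if f_port == 2 else "periodic",
        "uptime": uptime,
        "rssi": rssi,
        "bat_count": bat_count,
        "di0": bool(inputs & 0x80),
        "di1": bool(inputs & 0x40),
        "reserved": inputs & 0x3F,
        "status": {
            "val": status,
            "send_failures": bool(status & 0x01),
            "read_write_err": bool(status & 0x02),
            "radio_err": bool(status & 0x04),
            "erratic_float": bool(status & 0x08),
            "watchdog": bool(status & 0x10),
            "reserved": (status & 0xE0) >> 5,
        },
    }


# Hypothetical sample frame, Base64-encoded here so the text is not hand-typed.
raw = bytes([0x00, 0x00, 0x0E, 0x10, 0x2D, 0x01, 0x02, 0x80, 0x05])
encoded = base64.b64encode(raw).decode("ascii")

decoded = decode_payload(1, base64.b64decode(encoded))
print(decoded["uptime"], decoded["bat_count"], decoded["status"]["radio_err"])
```

Using `struct.unpack` keeps the field widths in one format string, which is an alternative to the explicit shifts in the script; both approaches read the same bytes.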
The Python Plugin can be run in a processing pipeline to apply custom processing to the data. There are all sorts of possibilities here.
LoRaWAN Decoders are an easy way to decode data from a LoRaWAN device. If the device you are trying to decode is one of the already supported devices, then the DataBus will attempt to find that decoder and produce a JSON message of your data for you. If a decoder cannot be found, then you can build your own in Python.
The example below shows the decoders in use, and how you can add your own decoder logic if required.
Example:
import clr
import binascii
from System import DateTime,String,Math,Convert,Globalization,BitConverter
from System.IO import *
from Indicium.Common.Time import *
from Indicium.DataBus.Automation.Helpers import ByteHelper
from Indicium.DataBus.Automation.Decoders import DecoderFactory
clr.AddReference("NewtonSoft.Json")
from Newtonsoft.Json import *

class Automation:
    def __init__(self):
        self.heartbeat = 0

    def newData(self, data):
        self.heartbeat += 1
        fPort = 0
        decoded = None
        try:
            loraJson = data.Point.ParseJson()
            # keep the payload in its own variable so `data` is not overwritten
            payload = loraJson.data.ToString()
            fPort = loraJson.port
            eui = loraJson.EUI
            # use the DecoderFactory to find a potential LoRaWAN Decoder to use
            factory = DecoderFactory()
            decoder = factory.FindDecoder(eui, payload)
            if decoder is not None:
                decoded = decoder.DecodeFromHex(payload, fPort)
            # if no decoder found, or decoded data is null
            # then you will need to decode yourself
            if decoder is None or decoded is None:
                decoded = self.decodePacketPir(payload, fPort)
            data.Point.UpdateJson(decoded)
        except:
            data.Point.UpdateJson("Error Executing Decoder Process")
            # raise the exception if you want the DataBus to track failures
            raise
        return data

    def decodePacketPir(self, hexPayload, fPort):
        # assumes the payload is a hex string, as passed to DecodeFromHex above
        raw = bytearray(binascii.unhexlify(hexPayload))
        decoded = Linq.JObject()
        decoded.byteLength = len(raw)
        decoded.batteryLevel = float(raw[3])/10
        decoded.temperature = float(raw[4] << 8 | raw[5])/100
        decoded.lux = raw[6] << 8 | raw[7]
        decoded.pir = raw[8]
        return decoded
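The "try a known decoder, then fall back to your own" flow from the script above can be sketched in standard Python as follows. This is an illustration only: `known_decoders` is a hypothetical dictionary standing in for the decoder lookup, the EUI strings and the sample frame are made up, and the payload is assumed to arrive as a hex string with the same byte positions as the custom decoder above.

```python
def decode_pir_packet(hex_payload):
    """Custom fallback decoder: same byte positions as decodePacketPir above."""
    raw = bytes.fromhex(hex_payload)
    if len(raw) < 9:
        raise ValueError("expected at least 9 bytes, got %d" % len(raw))
    return {
        "byteLength": len(raw),
        "batteryLevel": raw[3] / 10.0,
        "temperature": ((raw[4] << 8) | raw[5]) / 100.0,
        "lux": (raw[6] << 8) | raw[7],
        "pir": raw[8],
    }


def decode_with_fallback(eui, hex_payload, known_decoders):
    """Use a registered decoder when one exists, otherwise the custom one.

    `known_decoders` maps a device EUI to a decoder function. It is a
    hypothetical stand-in for a decoder lookup, not the DataBus API.
    """
    decoder = known_decoders.get(eui)
    decoded = decoder(hex_payload) if decoder is not None else None
    if decoded is None:
        # No decoder found, or it produced nothing: decode it ourselves.
        decoded = decode_pir_packet(hex_payload)
    return decoded


# Hypothetical frame: battery 3.6, temperature 23.45, lux 500, pir 1.
sample = "00000024092901F401"
result = decode_with_fallback("UNKNOWN-EUI", sample, known_decoders={})
print(result)
```

Treating a `None` result the same as a missing decoder means a supported device that sends an unexpected frame still goes through the custom logic instead of being dropped.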
Testing of any automated process is important, so the DataBus features the ability to run tests on all scripts within the system.
The screenshot below shows how you can build a number of test cases, setting the input data for each and then generating the output data. Test cases can be saved and run again later, which makes it easier to catch regressions. This example shows the results of decoding a LoRaWAN message from LORIOT into the raw data and associated types.
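The idea of saved test cases (a fixed input paired with the output that was recorded for it) can also be sketched outside the DataBus UI. The example below is a minimal, hypothetical illustration in standard Python: `decode_lux`, the case names and the `run_test_cases` helper are all invented for this sketch and do not describe how the DataBus stores or runs its tests.

```python
def decode_lux(hex_payload):
    """Toy function under test: read a big-endian 16-bit lux value from hex."""
    raw = bytes.fromhex(hex_payload)
    if len(raw) != 2:
        raise ValueError("expected exactly 2 bytes")
    return {"lux": (raw[0] << 8) | raw[1]}


# Each saved case pairs an input with the output recorded when it last passed.
TEST_CASES = [
    {"name": "dark room", "input": "0000", "expected": {"lux": 0}},
    {"name": "office light", "input": "01F4", "expected": {"lux": 500}},
    {"name": "full scale", "input": "FFFF", "expected": {"lux": 65535}},
]


def run_test_cases(func, cases):
    """Run every case and return the names of the ones that no longer match."""
    failures = []
    for case in cases:
        try:
            actual = func(case["input"])
        except Exception as ex:
            # A crash counts as a failed case rather than stopping the run.
            actual = {"error": str(ex)}
        if actual != case["expected"]:
            failures.append(case["name"])
    return failures


failed = run_test_cases(decode_lux, TEST_CASES)
print("failed cases:", failed)
```

Re-running the same saved cases after every script change is what turns them into a regression check: any case name that appears in the result points at behaviour that has changed.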