How to work with an external time series database on TinyAutomator

As you know, Waylay TinyAutomator does not include a broker service, and hence has no time series storage either.
But you can still do real-time data streaming and processing using the engine REST API endpoint /api/data.
If you have access to a time series store with a REST API, such as InfluxDB or Azure Time Series Insights, you can use an interesting “hack” to store the streamed data and access it afterwards, either from the Waylay engine or from any other place that can read and visualize time series data, e.g. a Grafana dashboard, the Azure Time Series Insights explorer, etc.
Let me elaborate a little bit on that.
Waylay TinyAutomator makes use of HAProxy for SSL offloading of the HTTPS communication towards the engine REST API. But HAProxy is not just a simple proxying load balancer: it can run custom plugins on incoming requests and responses, written in the Lua scripting language.
The idea is to use that scripting capability to intercept incoming data streaming calls to the Waylay engine API endpoint and additionally push the same payload towards a time series endpoint. The payload has to be converted to the format of that time series API, but usually this is a simple conversion from one JSON format to another. Just to clarify: the engine still receives the original, unconverted data.
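For instance, a Waylay /api/data payload can be flattened into a simple JSON object of the kind a Telegraf HTTP listener accepts (a minimal sketch in Node.js; the exact target format depends on your time series API):

```javascript
// Sketch: flatten a Waylay /api/data payload into a flat JSON object,
// mirroring the conversion the HAProxy Lua script below performs.
function toTimeSeriesJson(payload) {
  const converted = { resource: payload.resource };
  converted[payload.data.parameterName] = payload.data.value;
  return converted;
}

const waylayPayload = {
  resource: 'demo',
  data: { parameterName: 'temperature', value: 26 },
};
console.log(JSON.stringify(toTimeSeriesJson(waylayPayload)));
// → {"resource":"demo","temperature":26}
```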
Let’s walk through an example use case.
I have a sensor that periodically sends a temperature reading.
I want the Waylay engine to send me a message when the value crosses a threshold, using the pushover notification sensor, and I also want that message to include the latest 5 temperature values. I have a small Telegraf/InfluxDB installation (see “Telegraf / InfluxDB / Grafana on RaspberryPi – From Scratch” on the NWMichl Blog), currently running on a Raspberry Pi too, but it could just as well run somewhere in the cloud; it only needs to be reachable over HTTP. I will store the time series data there, and I will create a Waylay sensor, following the Waylay plugins guide, that reads data from the time series database and returns the last 5 measurements stored for my resource.
Here are the steps:

  1. Create an HAProxy Lua script that reads the HTTP request body, decodes it as a JSON object, constructs a new JSON object in the format accepted by InfluxDB, and pushes it to the InfluxDB REST endpoint. Here is an example script that pushes data towards the Telegraf endpoint:
json = loadfile("/etc/haproxy/lua/json.lua")()
local function forward_data(txn, addr, port, path)

    local datab=txn.f:req_body()
    local data = json.decode(datab)
    local converted_data={}
    converted_data["resource"]=data.resource
    converted_data[data.data.parameterName]=data.data.value
    local str_data=json.encode(converted_data)
    core.Debug(str_data)
    -- Set up a request to the service
    local hdrs = {
        [1] = string.format('host: %s:%s', addr, port),
        [2] = 'accept: */*',
        [3] = 'connection: close',
        [4] = string.format('Content-Length: %s', string.len(str_data))
    }

    local req = {
        [1] = string.format('POST %s HTTP/1.1', path),
        [2] = table.concat(hdrs, '\r\n'),

    }

    req = table.concat(req,  '\r\n')
    req = req .. '\r\n\r\n' .. str_data
    -- Use core.tcp to get an instance of the Socket class
    local socket = core.tcp()
    socket:settimeout(5)

    -- Connect to the service and send the request
    if socket:connect(addr, port) then
        if socket:send(req) then
            -- Skip response headers
            while true do
                local line, _ = socket:receive('*l')

                if not line then break end
                if line == '' then break end
            end

            -- Get response body, if any
            local content = socket:receive('*a')

            if content then
                core.Debug('returned content ' .. content)
            end
        else
            core.Alert('Could not send request to server')
        end

        socket:close()
    else
        core.Alert('Could not connect to server (connect)')
    end

end

core.register_action('forwarddata', {'http-req'}, forward_data, 3)

I saved it as /etc/haproxy/lua/forwarder.lua. Since I am using a pure-Lua JSON library for the JSON manipulation, I put json.lua in the same /etc/haproxy/lua directory; you can download it from GitHub.
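For reference, the raw HTTP/1.1 request that the Lua script assembles and writes to the TCP socket looks like this (a Node.js sketch of the same string-building, using the address and port configured below):

```javascript
// Rebuilds the raw HTTP/1.1 request the Lua forwarder writes to its
// socket: request line, headers, a blank line, then the JSON body.
function buildForwardRequest(addr, port, path, body) {
  const hdrs = [
    `host: ${addr}:${port}`,
    'accept: */*',
    'connection: close',
    `Content-Length: ${Buffer.byteLength(body)}`,
  ];
  return `POST ${path} HTTP/1.1\r\n${hdrs.join('\r\n')}\r\n\r\n${body}`;
}

const req = buildForwardRequest('127.0.0.1', 9080, '/telegraf',
  '{"resource":"demo","temperature":26}');
console.log(req.split('\r\n')[0]); // → POST /telegraf HTTP/1.1
```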
2. Add configuration lines to your /etc/haproxy/haproxy.cfg file to load the Lua script on HAProxy startup and to call it when a message arrives on the /api/data path:

global
   lua-load /etc/haproxy/lua/forwarder.lua
....

frontend standalone.waylay.io
   acl api_data path -i -m beg /api/data
   http-request lua.forwarddata 127.0.0.1 9080 /telegraf if api_data
...

Those lines are just additions to the existing configuration. The http-request line does the following: it calls the lua.forwarddata plugin, which is registered by the forwarder.lua script, and passes the parameters the script expects: the IP address of the Telegraf REST endpoint, the HTTP port of that endpoint, and an HTTP path that is also used as the database name for storing the metrics. The last part of the line defines the condition under which the plugin is called: in my case, only when the api_data ACL matches. That ACL is defined as a prefix match on /api/data, which means the plugin is called ONLY when the path of the HTTP call starts with /api/data, the streaming endpoint of the Waylay engine. Do not forget to restart HAProxy after you finish the configuration: sudo systemctl restart haproxy
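The ACL condition can be illustrated in isolation: in HAProxy, "path -i -m beg /api/data" means a case-insensitive "begins with" match on the request path, roughly equivalent to:

```javascript
// Sketch of the api_data ACL: "path -i -m beg /api/data" is a
// case-insensitive prefix match on the request path.
function matchesApiData(path) {
  return path.toLowerCase().startsWith('/api/data');
}

console.log(matchesApiData('/api/data'));      // → true
console.log(matchesApiData('/API/data'));      // → true
console.log(matchesApiData('/api/resources')); // → false
```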
3. I will also create a custom plugin (see the Waylay documentation for plugin creation guidelines) that reads data from the InfluxDB storage. Here is the code of that plugin; I named it influxdbMetrics:

{
  "name": "influxdbMetrics",
  "version": "1.0.0",
  "type": "sensor",
  "script": "const axios = require('axios')\nconst {influxdbscheme,influxdbhost, influxdbport, influxdbdatabase,metricname,resourcename} = options.requiredProperties\nconst url=influxdbscheme+'://'+influxdbhost+':'+influxdbport+'/query?db='+influxdbdatabase+'&q=select '+metricname+' from '+resourcename\naxios({ url, timeout: 10000 }).then((response) => {\n    send(null, { observedState: 'Success', rawData: response.data.results[0].series[0] })\n}\n).catch((error) => {\n    send(new Error(error), { observedState: 'Failure', rawData: error })\n})\n\n",
  "metadata": {
    "author": "John Doe",
    "iconURL": "",
    "description": "",
    "rawData": [],
    "requiredProperties": [
      "influxdbhost",
      "influxdbport",
      "metricname",
      "influxdbdatabase",
      "influxdbscheme",
      "resourcename"
    ],
    "supportedStates": [
      "Success",
      "Failure"
    ]
  },
  "dependencies": {
    "axios": "0.21.1"
  }
}

That sensor has properties for the InfluxDB scheme, host, port and database name, plus the resource name and metric name. I will set the InfluxDB properties as global settings in the auth.yaml file (I am using the external authentication configuration, as explained in the TinyAutomator installation guide).
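To make the sensor script concrete, here is the same URL construction as plain Node.js, filled in with the values from my setup (scheme, host, port and database come from my global settings; demo is my test resource):

```javascript
// Sketch: the query URL the influxdbMetrics sensor builds from its
// required properties (values are the ones used in this walkthrough).
const options = {
  requiredProperties: {
    influxdbscheme: 'http',
    influxdbhost: '192.168.2.22',
    influxdbport: '8086',
    influxdbdatabase: 'telegraf',
    metricname: 'temperature',
    resourcename: 'demo',
  },
};

const { influxdbscheme, influxdbhost, influxdbport, influxdbdatabase,
        metricname, resourcename } = options.requiredProperties;
const url = influxdbscheme + '://' + influxdbhost + ':' + influxdbport +
  '/query?db=' + influxdbdatabase + '&q=select ' + metricname + ' from ' + resourcename;

console.log(url);
// → http://192.168.2.22:8086/query?db=telegraf&q=select temperature from demo
```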
4. I will also use the pushover notification sensor, which you can find on the Waylay sensors GitHub page (GitHub - waylayio/Sensors: Public sensor repository). I also created global settings for the pushover notification token/user id.
5. Now all the components are ready, so we can create a template on the engine:

{
  "sensors": [
    {
      "label": "debugDialog_1",
      "name": "debugDialog",
      "version": "1.0.0",
      "properties": {
        "message": "Current value  ${nodes.streamingDataSensor_1.rawData.parameter}  went above threshold ${nodes.streamingDataSensor_1.rawData.threshold}\nlatest temperatures ${nodes.influxdbMetrics_1.rawData.values[-5:]}"
      },
      "position": [
        824,
        329
      ],
      "dataTrigger": true,
      "tickTrigger": true
    },
    {
      "label": "influxdbMetrics_1",
      "name": "influxdbMetrics",
      "version": "1.0.0",
      "properties": {
        "influxdbscheme": "${globalSettings.influxdbscheme}",
        "metricname": "temperature",
        "influxdbhost": "${globalSettings.influxdbhost}",
        "resourcename": "${task.resource.meta.id}",
        "influxdbport": "${globalSettings.influxfbport}",
        "influxdbdatabase": "${globalSettings.influxdbdatabase}"
      },
      "resource": "$",
      "position": [
        578,
        338
      ],
      "dataTrigger": true,
      "tickTrigger": true
    },
    {
      "label": "pushovernotification_1",
      "name": "pushovernotification",
      "version": "1.0.0",
      "properties": {
        "token": "${globalSettings.pushovertoken}",
        "title": "temperature exceeded threshold",
        "message": "Current value  ${nodes.streamingDataSensor_1.rawData.parameter}  went above threshold ${nodes.streamingDataSensor_1.rawData.threshold}\nlatest temperatures ${nodes.influxdbMetrics_1.rawData.values[-5:]}",
        "user": "${globalSettings.pushoveruser}"
      },
      "sequence": 0,
      "position": [
        807,
        469
      ],
      "dataTrigger": true,
      "tickTrigger": true
    },
    {
      "label": "streamingDataSensor_1",
      "name": "streamingDataSensor",
      "version": "1.1.4",
      "properties": {
        "parameter": "temperature",
        "threshold": "21"
      },
      "resource": "$",
      "position": [
        271,
        324
      ],
      "dataTrigger": true,
      "tickTrigger": true
    }
  ],
  "triggers": [
    {
      "sourceLabel": "influxdbMetrics_1",
      "destinationLabel": "debugDialog_1",
      "statesTrigger": [
        "Success"
      ]
    },
    {
      "sourceLabel": "streamingDataSensor_1",
      "destinationLabel": "influxdbMetrics_1",
      "statesTrigger": [
        "Above"
      ]
    },
    {
      "sourceLabel": "influxdbMetrics_1",
      "destinationLabel": "pushovernotification_1",
      "statesTrigger": [
        "Success"
      ]
    }
  ],
  "name": "stream",
  "user": "test@waylay.io",
  "createTime": 1629704458353,
  "lastUpdateTime": 1630335424154,
  "discoveryTemplate": false
}

  6. Here is the relevant part of the global settings in my auth.yaml file (I removed the tokens of my pushover account :wink: ). You should also adjust influxdbhost to the IP address of your own InfluxDB instance:
  settings:
    - key: global_one
      value: value
    - key: global_influxdbhost
      value: 192.168.2.22
    - key: global_influxfbport
      value: "8086"
    - key: global_influxdbscheme
      value: http
    - key: global_influxdbdatabase
      value: telegraf
    - key: global_pushovertoken
      value: xxx
    - key: global_pushoveruser
      value: xxx
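Note how the key names map to the template references: a setting named global_influxdbhost is referenced as ${globalSettings.influxdbhost}, i.e. the global_ prefix is stripped. A sketch of that mapping (my reading of the convention, based on the names used above):

```javascript
// Sketch: how auth.yaml settings appear to map to ${globalSettings.*}
// references in the template (the "global_" prefix is stripped).
const settings = [
  { key: 'global_influxdbhost', value: '192.168.2.22' },
  { key: 'global_influxdbdatabase', value: 'telegraf' },
];

const globalSettings = Object.fromEntries(
  settings.map(({ key, value }) => [key.replace(/^global_/, ''), value]),
);

console.log(globalSettings.influxdbhost); // → 192.168.2.22
```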

And now it is time to test it out.
I have a resource named demo on my TinyAutomator setup. Let’s create a reactive task that references the stream template and the demo resource, and simulate a temperature push several times using curl:

curl -k --user apiKey:apiSecret -H "Content-Type:application/json" -X POST -v -d '{
  "resource": "demo",
  "data": {
    "parameterName": "temperature",
    "value": 26
  }
}' "https://standalone.waylay.io/api/data"

In the task logs you can check that the task has been triggered and follow the flow there, and you should receive a notification message in your pushover app.


Here is an example notification which I got on my account:

I was also able to create a Grafana dashboard to visualize the data.
I hope you can do even more with this trick.
