Message based IoT applications

Many applications in the IoT are not transactional by nature and do not adopt a request-response communication pattern, i.e. they do not require returning an immediate response to the emitter of the request (a device or a client application). Rather, these applications imply some kind of batch or long-running processing that would block the caller for an unacceptable amount of time. For example, devices that send data to the back-end that in turn injects it into a decision making or analytics process, should not have to wait for the process to complete. In that situation, a caller would post some data (or ask for the execution of a new task) but not wait for it to be handled. Supporting such use cases is the realm of message oriented middleware that introduce decoupling between the clients and the server applications, either by providing message queuing or through message broadcasting.

Scriptr.io intrinsically tackles such scenarios and supports both message queuing and broadcasting (publish/subscribe). Your client applications can send their messages to their scriptr.io back-end using the AMQP, MQTT protocols, Web Sockets or HTTP. Queuing and broadcasting is also natively available from within your scriptr.io scripts, which allows you to build very flexible and scalable IoT applications.

Creating a queue in your scriptr.io account

Before moving forward, you need to create a “queue” in your scriptr.io account. Note that this is currently a paid feature and this option might not be available to you. To enable it, kindly contact scriptr.io’s support (support@scriptr.io). Also note that a “queue” will actually behave as a task queue or a message topic, depending on how you will use it (more on this in the below).

Sign-in to your scriptrio.io workspace. Click on your username on the top-right corner of the screen then click on “Messaging Protocol”.

select-messaging-protocols

In the settings dialog that opens:

  • Select a protocol (AMQP or MQTT). In our case, we will pick MQTT
  • Select a device, credentials of which will be used by the client code to send messages. If you do not have any devices yet, make sure to create one by clicking on your username, then on “Device Directory”
  • Select a channel, to be used as a queue or a topic. If you do not have a channel yet, make sure to create one by clicking on your username, then on “Settings”, then picking the “channels” tab

Once you are done, you should have obtained a username and a password as well as two routing keys: one to publish messages (xxx/yyy/messages) and the other to queue scripts – i.e. queue tasks – (xxx/yyy/invoke), similarly to the below screen capture.

messaging-mqtt

The difference between publishing messages and queuing tasks is the following:

  • messages will be published to the channel used when configuring the queue and will be consumed by any entity (script or another client application) that is subscribed to the channel
  • tasks are instances of scripts, which are put in a queue and executed in order. The caller needs to clearly specify the targeted script, instances of which are in charge of processing the payload sent along the call

Sample application

In this article, we will create a sample application to illustrate how you can benefit from queuing in scriptr.io. Our application consist of client code – supposedly running on edge gateways – collecting temperature, pressure and coordinates data (simulated) from water pumps and sending it to a scriptr.io endpoint, using HTTP or MQTT. Received data will be transformed into another data structure, enriched with outside temperature obtained from a weather API, then stored in scriptr.io.

Since the devices that are sending data should not have to wait for the above process to be done, we design our application such as they only need to invoke an endpoint that receives the data, then queues a task to process it, as illustrated in the below diagram.

use-case

Queuing tasks

As previously mentioned, we assume that our simulated devices respectively use MQTT and HTTP to push data to scriptr.io:

  • The devices using MQTT only have to hit the MQTT endpoint and pass the routing key obtained when creating the queue. Since we need to queue a task (i.e. a script), we use the routing key that is suffixed with “/invoke” (e.g. UxJyKTmwFjc2Ml==/my_queue/invoke)
  • As for the devices using HTTP, they will communicate with a script (“/blog/queues/feed”) that we expose as a public API. This latter will retrieve the data from the request and use the queue module to queue a new task (a script)

The script that is queued is “blog/queues/processor”. It is in charge of implementing the aforementioned data restructuring, enrichment and storing logic.

principle

We first start with the devices using MQTT, then move to those using HTTP.

MQTT-based devices

To simulate the MQTT-based devices, we implement a simple Node.js client application that uses the MQTT.js library. The application connects to the designated MQTT topic in scriptr.io and publishes random data (temperature, pressure and timestamp) along with its coordinates every 10 seconds. Data is sent as a string “temperature,pressure,coordinates,timestamp”.

Check the code

The below snippet, extracted from the client’s code, shows how simple it is to queue a task (script) using an MQTT endpoint.

...
var msg = {
    method: "blog/queues/processor",
    params:{
	data: temperature + "," + pressure + "," + coordinates + "," + new Date().toISOString()
    }
};
client.publish(topic, JSON.stringify(msg));
...

HTTP-based devices

To simulate the HTTP-based devices, we implement another simple Node.js client application that uses the standard node.js https library. The client also generates random data (temperature, pressure and timestamp) every 10 seconds and sends them, along with is coordinates, in JSON format trough an HTTP request sent to the “/blog/queues/feed” API.

Check the code

The “/blog/queues/feed” API is a back-end script that simply receives the requests, retrieves the data from it’s body then pushes a new task in a queue. In the below code, notice how simple it is to queue the execution of a script from within another script

var queueModule = require("queue"); // import the queue module
// use your own channel name to create an instance of a queue
var queueInstance = queueModule.getInstance("blog_queuing"); 

if (request.body) {
    // create a new task by queuing an instance of the script
    var resp = queueInstance.queue("blog/queues/processor", request.body);
    // ...
}
// ...

Get the code

Queued logic

In the below, we implement the “/blog/queues/processor” script that contains the back-end logic that is executed asynchronously (automatically retrieved from the queue). This script uses the data from the initial request (MQTT message or HTTP request), transforms it if needed, enrich it with the outside temperature based on the provided coordinates then persist the data into scriptr.io’s storage.

var log = require("log"); log.setLevel("info");
var http = require("http");
var document = require("document");

const APIKEY = "YOUR_WEATHER_API_APIKEY"; // replace with you API key from https://openweathermap.org
const WEATHER_API = "http://api.openweathermap.org/data/2.5/weather";

var data = request.body ? request.body : request.parameters.data;

// if no data received, log it and return an empty object
if (!data){
    log.warn("/blog/queues/processor: received empty request body");
    return {};
}

//if the incoming data structure is of type string (mqtt message), transform it
var dataObj = {};
if (typeof(data) == "string") {
    dataObj = transformData(data);
}else{
    dataObj = data;
}

// get the outside temperate from the weather API
var outsideTemperature = getOutideTemperature(dataObj.coordinates);
if (outsideTemperature) {
    dataObj.outsideTemperature = outsideTemperature;
}

// persist the data object into scriptr.io's persistent storage
saveToDataStore(dataObj);

// return the data object
return dataObj;

function transformData(data) {
   //... details in the script on Github
}

function getOutideTemperature(coordinates) {
   //... details in the script on Github
}

function saveToDataStore(dataObj) {
    //... details in the script on Github
}

Check the complete script

Conclusion

Using scriptr.io’s queuing system, you can create very flexible and scalable IoT applications. In this article, we described how to queue tasks (execution of scripts). Processing and broadcasting messages is as simple and uses the same concepts. We will cover these in another blog post. Stay tuned.

Get all the code from Github