Message-based IoT applications
Many IoT applications are not transactional by nature and do not follow a request-response communication pattern: they do not need to return an immediate response to the emitter of the request (a device or a client application). Instead, these applications involve some kind of batch or long-running processing that would block the caller for an unacceptable amount of time. For example, a device that sends data to a back-end, which in turn feeds it into a decision-making or analytics process, should not have to wait for that process to complete. In this situation, the caller posts some data (or asks for the execution of a new task) without waiting for it to be handled. Supporting such use cases is the realm of message-oriented middleware, which decouples clients from server applications, either through message queuing or through message broadcasting.
Scriptr.io handles such scenarios natively and supports both message queuing and broadcasting (publish/subscribe). Your client applications can send messages to their scriptr.io back-end using AMQP, MQTT, WebSockets or HTTP. Queuing and broadcasting are also natively available from within your scriptr.io scripts, which allows you to build very flexible and scalable IoT applications.
Creating a queue in your scriptr.io account
Before moving forward, you need to create a “queue” in your scriptr.io account. Note that this is currently a paid feature, so the option might not be available to you. To enable it, contact scriptr.io’s support (support@scriptr.io). Also note that a “queue” will actually behave as a task queue or as a message topic, depending on how you use it (more on this below).
Sign in to your scriptr.io workspace. Click on your username in the top-right corner of the screen, then click on “Messaging Protocol”.
In the settings dialog that opens:
- Select a protocol (AMQP or MQTT). In our case, we pick MQTT
- Select a device whose credentials will be used by the client code to send messages. If you do not have any devices yet, create one by clicking on your username, then on “Device Directory”
- Select a channel to be used as a queue or a topic. If you do not have a channel yet, create one by clicking on your username, then on “Settings”, then picking the “channels” tab
Once you are done, you should have obtained a username and a password as well as two routing keys: one to publish messages (xxx/yyy/messages) and the other to queue scripts, i.e. to queue tasks (xxx/yyy/invoke), as in the screen capture below.
The difference between publishing messages and queuing tasks is the following:
- messages will be published to the channel used when configuring the queue and will be consumed by any entity (script or another client application) that is subscribed to the channel
- tasks are instances of scripts, which are put in a queue and executed in order. The caller must explicitly name the targeted script, and each queued instance of that script processes the payload sent with the call
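To make the distinction concrete, the two routing keys appear to share a prefix and differ only in their suffix. The following is a minimal sketch under that assumption; the `routingKeys` helper is hypothetical (it is not part of scriptr.io's API) and the sample values are purely illustrative.

```javascript
// Hypothetical helper: derives both routing keys from the prefix and the
// channel name, assuming the "prefix/channel/suffix" layout described above.
function routingKeys(prefix, channel) {
  const base = prefix + "/" + channel;
  return {
    publish: base + "/messages", // broadcast to every subscriber of the channel
    invoke: base + "/invoke"     // queue the execution of a script (a task)
  };
}

// Illustrative values only; use the keys shown in your own settings dialog.
const keys = routingKeys("UxJyKTmwFjc2Ml==", "my_queue");
console.log(keys.publish);
console.log(keys.invoke);
```

In practice you would simply copy the two keys from the settings dialog; the helper only illustrates how they relate to each other.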
Sample application
In this article, we create a sample application to illustrate how you can benefit from queuing in scriptr.io. Our application consists of client code (supposedly running on edge gateways) that collects simulated temperature, pressure and coordinates data from water pumps and sends it to a scriptr.io endpoint, using HTTP or MQTT. Received data is transformed into another data structure, enriched with the outside temperature obtained from a weather API, then stored in scriptr.io.
Since the devices that send data should not have to wait for the above process to finish, we design our application so that they only need to invoke an endpoint that receives the data and then queues a task to process it, as illustrated in the diagram below.
Queuing tasks
As previously mentioned, we assume that some of our simulated devices use MQTT and the others use HTTP to push data to scriptr.io:
- The devices using MQTT only have to hit the MQTT endpoint and pass the routing key obtained when creating the queue. Since we need to queue a task (i.e. a script), we use the routing key that is suffixed with “/invoke” (e.g. UxJyKTmwFjc2Ml==/my_queue/invoke)
- The devices using HTTP communicate with a script (“/blog/queues/feed”) that we expose as a public API. This script retrieves the data from the request and uses the queue module to queue a new task (a script)
The script that is queued is “blog/queues/processor”. It is in charge of implementing the aforementioned data restructuring, enrichment and storing logic.
We first start with the devices using MQTT, then move to those using HTTP.
MQTT-based devices
To simulate the MQTT-based devices, we implement a simple Node.js client application that uses the MQTT.js library. The application connects to the designated MQTT topic in scriptr.io and, every 10 seconds, publishes random data (temperature, pressure and timestamp) along with its coordinates. Data is sent as a string of the form “temperature,pressure,coordinates,timestamp”.
The snippet below, extracted from the client’s code, shows how simple it is to queue a task (script) using an MQTT endpoint.
...
var msg = {
    method: "blog/queues/processor",
    params: {
        data: temperature + "," + pressure + "," + coordinates + "," + new Date().toISOString()
    }
};
client.publish(topic, JSON.stringify(msg));
...
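For context, here is one way the surrounding client code might look. This is a minimal sketch, not the actual client: the helper names, the value ranges of the simulated readings and the coordinates format are assumptions, and the MQTT endpoint and credentials in the wiring comment are placeholders.

```javascript
// Builds the task message sent on the ".../invoke" routing key:
// "method" names the script to queue, "params" carries its parameters.
function buildTaskMessage(temperature, pressure, coordinates, date) {
  return {
    method: "blog/queues/processor",
    params: {
      data: [temperature, pressure, coordinates, date.toISOString()].join(",")
    }
  };
}

// Simulated sensor values; the ranges are arbitrary assumptions.
function randomReading() {
  return {
    temperature: Math.round(20 + Math.random() * 60),
    pressure: Math.round(100 + Math.random() * 400)
  };
}

// Publishes one reading. "client" is any object exposing
// publish(topic, payload), such as a connected MQTT.js client.
function publishReading(client, topic, coordinates) {
  const reading = randomReading();
  const msg = buildTaskMessage(reading.temperature, reading.pressure, coordinates, new Date());
  client.publish(topic, JSON.stringify(msg));
  return msg;
}

// Wiring with MQTT.js (not executed here; endpoint and credentials are placeholders):
//   const mqtt = require("mqtt");
//   const client = mqtt.connect("mqtts://YOUR_MQTT_ENDPOINT", {
//     username: "YOUR_USERNAME",
//     password: "YOUR_PASSWORD"
//   });
//   client.on("connect", function () {
//     setInterval(function () {
//       publishReading(client, "xxx/yyy/invoke", "YOUR_COORDINATES");
//     }, 10000);
//   });
```

Keeping the message construction separate from the MQTT connection makes it possible to check the payload format without a broker.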
HTTP-based devices
To simulate the HTTP-based devices, we implement another simple Node.js client application that uses the standard Node.js https library. This client also generates random data (temperature, pressure and timestamp) every 10 seconds and sends it, along with its coordinates, in JSON format through an HTTP request to the “/blog/queues/feed” API.
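A minimal sketch of such a client is shown here. The host name, the bearer-token header and the helper names are assumptions made for illustration; check your own scriptr.io account for the actual endpoint and token to use.

```javascript
const https = require("https");

// Builds the request options and JSON body for one POST to the feed API.
// Host and Authorization header format are assumptions, not confirmed values.
function buildFeedRequest(token, reading) {
  const body = JSON.stringify(reading);
  return {
    options: {
      hostname: "api.scriptr.io",
      path: "/blog/queues/feed",
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "Content-Length": Buffer.byteLength(body),
        "Authorization": "bearer " + token
      }
    },
    body: body
  };
}

// Sends one reading and resolves with the HTTP status code.
function sendReading(token, reading) {
  const req = buildFeedRequest(token, reading);
  return new Promise(function (resolve, reject) {
    const request = https.request(req.options, function (res) {
      res.resume(); // discard the response body; only the status matters here
      res.on("end", function () { resolve(res.statusCode); });
    });
    request.on("error", reject);
    request.end(req.body);
  });
}

// Usage (not executed here): send a simulated reading every 10 seconds.
//   setInterval(function () {
//     sendReading("YOUR_TOKEN", { temperature: 42, pressure: 310,
//       coordinates: "YOUR_COORDINATES", timestamp: new Date().toISOString() });
//   }, 10000);
```

The client never waits for the data to be processed: the request returns as soon as the feed script has queued the task.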
The “/blog/queues/feed” API is a back-end script that simply receives the request, retrieves the data from its body, then pushes a new task onto a queue. In the code below, notice how simple it is to queue the execution of a script from within another script.
var queueModule = require("queue"); // import the queue module
// use your own channel name to create an instance of a queue
var queueInstance = queueModule.getInstance("blog_queuing");
if (request.body) {
    // create a new task by queuing an instance of the script
    var resp = queueInstance.queue("blog/queues/processor", request.body);
    // ...
}
// ...
Queued logic
Below, we implement the “/blog/queues/processor” script, which contains the back-end logic that is executed asynchronously (it is automatically retrieved from the queue). This script takes the data from the initial request (MQTT message or HTTP request), transforms it if needed, enriches it with the outside temperature based on the provided coordinates, then persists it into scriptr.io’s storage.
var log = require("log");
log.setLevel("info");
var http = require("http");
var document = require("document");

const APIKEY = "YOUR_WEATHER_API_APIKEY"; // replace with your API key from https://openweathermap.org
const WEATHER_API = "http://api.openweathermap.org/data/2.5/weather";

var data = request.body ? request.body : request.parameters.data;

// if no data was received, log it and return an empty object
if (!data) {
    log.warn("/blog/queues/processor: received empty request body");
    return {};
}

// if the incoming data is a string (MQTT message), transform it into an object
var dataObj = {};
if (typeof data == "string") {
    dataObj = transformData(data);
} else {
    dataObj = data;
}

// get the outside temperature from the weather API
var outsideTemperature = getOutideTemperature(dataObj.coordinates);
if (outsideTemperature) {
    dataObj.outsideTemperature = outsideTemperature;
}

// persist the data object into scriptr.io's persistent storage
saveToDataStore(dataObj);

// return the data object
return dataObj;

function transformData(data) {
    //... details in the script on Github
}

function getOutideTemperature(coordinates) {
    //... details in the script on Github
}

function saveToDataStore(dataObj) {
    //... details in the script on Github
}
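The helper bodies are elided above and live in the script on Github. As an illustration only, the sketch below shows what the parsing and weather-lookup steps might involve. The function names are hypothetical stand-ins (not the actual helpers), and it assumes that coordinates are sent as a "lat,lon" pair and that the weather API is queried with OpenWeatherMap's `lat`, `lon`, `appid` and `units` parameters.

```javascript
// Hypothetical stand-in for the transformation step: parses
// "temperature,pressure,coordinates,timestamp" into an object.
// The first two fields and the last one are fixed; whatever sits in
// between is kept as the coordinates, so a "lat,lon" pair survives the split.
function parseReading(data) {
  const parts = data.split(",");
  if (parts.length < 4) {
    throw new Error("expected at least 4 comma-separated fields, got " + parts.length);
  }
  return {
    temperature: Number(parts[0]),
    pressure: Number(parts[1]),
    coordinates: parts.slice(2, -1).join(","),
    timestamp: parts[parts.length - 1]
  };
}

// Builds the query parameters for a current-weather lookup,
// assuming coordinates of the form "lat,lon".
function buildWeatherParams(coordinates, apiKey) {
  const latLon = coordinates.split(",");
  return {
    lat: latLon[0].trim(),
    lon: latLon[1].trim(),
    appid: apiKey,
    units: "metric"
  };
}

// Extracts the temperature from a parsed current-weather response,
// or returns undefined when the expected field is missing.
function extractTemperature(weather) {
  return weather && weather.main ? weather.main.temp : undefined;
}

const sample = parseReading("21.5,300,33.89,35.50,2020-01-01T00:00:00.000Z");
console.log(sample.coordinates);
```

Inside the scriptr.io script, the parameters built this way would be passed to the weather API through the http module, and the parsed response handed to the extraction step before the object is saved.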
Conclusion
Using scriptr.io’s queuing system, you can create very flexible and scalable IoT applications. In this article, we described how to queue tasks (executions of scripts). Processing and broadcasting messages is just as simple and relies on the same concepts. We will cover these in another blog post. Stay tuned.