Create an MQTT to SQL Data Pipeline
At the end of this tutorial, we will see data starting from an MQTT broker and ending in a PostgreSQL table.
We'll use 2 connectors:
- Inbound MQTT connector
- Outbound SQL connector
- There will be an example of combining multiple SmartModules, known as SmartModule chaining
 
The Outbound connector will be using a PostgreSQL database. It will listen to the topic for new records and insert them into a table.
You can use your own PostgreSQL instance, if it can be reached over the internet. But you can still follow along by creating a PostgreSQL database at a hosting service, such as ElephantSQL.
Setup
Start MQTT Connector
This connector expects to take json input from the MQTT broker, from an MQTT topic named ag-mqtt-topic. These parameters will be reflected in the final JSON payload that gets produced to the fluvio topic mqtt-topic
MQTT connector config: mqtt.yml
All versions are marked with x.y.z. To find the latest version, run:
- fluvio hub connector list
- fluvio hub smartmodule list
# mqtt.yml
apiVersion: 0.1.0
meta:
 version: x.y.z
 name: fluvio-mqtt-connector
 type: mqtt-source
 topic: mqtt-topic
 direction: source
mqtt:
 url: "mqtt://test.mosquitto.org/"
 topic: "ag-mqtt-topic"
 timeout:
   secs: 30
   nanos: 0
 payload_output_type: json
Create MQTT connector
$ fluvio cloud connector create --config mqtt.yml
Install mosquito - MQTT client
First install mosquito to follow later steps for sending JSON to our test MQTT broker
-> On MacOS, you can install mosquitto with homebrew with the following command: 
    brew install mosquitto
Start SQL connector(s)
You can start one of both of the following connectors
- Connector with no transformation
- Download SmartModule for example
- Example connector config
- Start connector
 
- Connector with extra JSON to JSON transformation
- Download SmartModules for example
- Example connector config
- Start connector
 
SQL Connector with no transformation
Download json-sql SmartModule
Example output
$ fluvio hub sm download infinyon/json-sql@x.y.z
downloading infinyon/json-sql@x.y.z to infinyon-json-sql-x.y.z.ipkg
... downloading complete
... checking package
trying connection to fluvio router.dev.infinyon.cloud:9003
... cluster smartmodule install complete
SQL Connector with no transformation config
# sql.yml
apiVersion: 0.1.0
meta:
 name: fluvio-sql-connector
 type: sql-sink
 version: x.y.z
 topic: mqtt-topic
sql:
 url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
 - uses: infinyon/json-sql@x.y.z
   with:
     invoke: insert
     mapping:
       table: "topic_message"
       map-columns:
         "device_id":
           json-key: "payload.device.device_id"
           value:
             type: "int"
             default: "0"
             required: true
         "record":
           json-key: "$"
           value:
             type: "jsonb"
             required: true
Start No transformation connector SQL connector
$ fluvio cloud connector create --config sql.yml
Connector with JSON to JSON transformation
Download the Jolt and Json-Sql SmartModules used by this example connector
Example output
$ fluvio hub sm download infinyon/json-sql@x.y.z
downloading infinyon/json-sql@x.y.z to infinyon-json-sql-x.y.z.ipkg
... downloading complete
... checking package
trying connection to fluvio router.infinyon.cloud:9003
... cluster smartmodule install complete
$ fluvio hub sm download infinyon/jolt@x.y.z
downloading infinyon/jolt@x.y.z to infinyon-jolt-x.y.z.ipkg
... downloading complete
... checking package
trying connection to fluvio router.infinyon.cloud:9003
... cluster smartmodule install complete
Connector with JSON to JSON transformation config
# sql-chain.yml
apiVersion: 0.1.0
meta:
 name: fluvio-sql-connector-chain
 type: sql-sink
 version: x.y.z
 topic: mqtt-topic
sql:
 url: "postgres://user:password@db.postgreshost.example/dbname"
 rust_log: "sql_sink=INFO,sqlx=WARN"
transforms:
 - uses: infinyon/jolt@x.y.z
   with:
     spec:
       - operation: shift
         spec:
           payload:
             device: "device"
       - operation: default
         spec:
           device:
             type: "mobile"
 - uses: infinyon/json-sql@x.y.z
   with:
     invoke: insert
     mapping:
       table: "topic_message"
       map-columns:
         "device_id":
           json-key: "device.device_id"
           value:
             type: "int"
             default: "0"
             required: true
         "record":
           json-key: "$"
           value:
             type: "jsonb"
             required: true
Start SQL connector with JSON transformation
$ fluvio cloud connector create --config sql-chain.yml
Install pgcli - PostgreSQL client
Install pgcli to follow the later DB validation steps
https://www.pgcli.com
On MacOS, you can install pgcli with homebrew with the following command:
$ brew install pgcli
The actual test
📋 Using example JSON, this is the sequence of events that will occur
- (user) [Publish JSON to MQTT broker][#publish-json-to-mqtt-broker"]
- (Inbound MQTT connector) Produce data to fluvio topic mqtt-topic- Produce a transformed JSON object with config parameter data with the name of the MQTT topic embedded
 
- (Outbound SQL connector) Consume the inbound record from topic mqtt-topic- Apply transformations to record (JSON to JSON connector only)
- Insert record into DB
 
- (user) [Validate JSON record in PostgreSQL database]
If you are starting with a new database, you will need to create the table before sending messages to MQTT. It is not created automatically.
Table create query
create table topic_message(device_id int, record jsonb);
This is what our input JSON to MQTT looks like
example JSON (formatted)
{
   "device": {
       "device_id": 17,
       "name": "device17"
   }
}
Publish JSON to MQTT broker
Run the following to send a test JSON message to the demo MQTT broker with mosquito ([Installation steps])
Command:
$ mosquitto_pub -h test.mosquitto.org -t ag-mqtt-topic -m '{"device": {"device_id":17, "name":"device17"}}'
Produced data in topic:
$ fluvio consume mqtt-topic -B
Consuming records from the beginning of topic 'mqtt-topic'
{"mqtt_topic":"ag-mqtt-topic","payload":{"device":{"device_id":17,"name":"device17"}}}
Produced data in topic:
Run the following to connect to PostgreSQL DB with pgcli ([Installation steps]
View output in PostgreSQL
Use pgcli to examine the database.
$ pgcli -U user -h db.postgreshost.example -p 5432 dbname
Check that the JSON from MQTT has been inserted into table
select * from topic_message;
Example output from both connectors
+-----------+-----------------------------------------------------------------------------------------------+
| device_id | record                                                                                        |
|-----------+-----------------------------------------------------------------------------------------------|
| 17        | {"payload": {"device": {"name": "device17", "device_id": 17}}, "mqtt_topic": "ag-mqtt-topic"} |
| 17        | {"device": {"name": "device17", "type": "mobile", "device_id": 17}}                           |
+-----------+-----------------------------------------------------------------------------------------------+
SELECT 2
Time: 0.080s
Output explanation:
In both cases, we’ve used the device_id key in the MQTT JSON as the value in the column of the same name. The first row is from our No Transformation connector. The record data appears unchanged from what we saw in the topic.
Resulting record
{
   "payload": {
       "device": {
           "name": "device17",
           "device_id": 17
       }
   },
   "mqtt_topic": "ag-mqtt-topic"
}
The second row is from our JSON to JSON transformation connector
We’ve shifted the topic JSON data, so it more closely resembles the original JSON.
Then we enrich the payload by adding the .device.type key with the value mobile before inserting into the DB
{
   "device": {
       "name": "device17",
       "type": "mobile",
       "device_id": 17
   }
}
Move transformation to MQTT Connector
- Transformations in the transformssection of SQL Connector config are deliberately decoupled from connectors. We can move a SmartModule from an Inbound to an Outbound connector and accomplish the same result. The decision depends on the shape of the data you want to store in a topic. For Inbound connectors, the data is transformed before sending to Fluvio topic, while for Outbound, it happens after the data is sent to Fluvio topic but before it is sent to the connector.
Let's try it.
Modify our mqtt.yml config with one transformation that we are moving from the SQL Connector:
# mqtt.yml
apiVersion: 0.1.0
meta:
 version: x.y.z
 name: fluvio-mqtt-connector
 type: mqtt-source
 topic: mqtt-topic
 direction: source
mqtt:
 url: "mqtt://test.mosquitto.org/"
 topic: "ag-mqtt-topic"
 timeout:
   secs: 30
   nanos: 0
 payload_output_type: json
transforms:
 - uses: infinyon/jolt@x.y.z
   with:
     spec:
       - operation: shift
         spec:
           payload:
             device: "device"
       - operation: default
         spec:
           device:
             type: "mobile"
We don’t need this transformation on SQL Connector anymore, remove it from sql-chain.yml file:
# sql-chain.yml
apiVersion: 0.1.0
meta:
 name: fluvio-sql-connector-chain
 type: sql-sink
 version: x.y.z
 topic: mqtt-topic
sql:
 url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
 - uses: infinyon/json-sql@x.y.z
   with:
     invoke: insert
     mapping:
       table: "topic_message"
       map-columns:
         "device_id":
           json-key: "device.device_id"
           value:
             type: "int"
             default: "0"
             required: true
         "record":
           json-key: "$"
           value:
             type: "jsonb"
             required: true
We need to re-create connectors:
$ fluvio cloud connector delete fluvio-mqtt-connector
$ fluvio cloud connector create --config mqtt.yml
also, we delete one now obsolete SQL connector and re-create another without the transformation that we moved to MQTT:
$ fluvio cloud connector delete fluvio-sql-connector-chain
$ fluvio cloud connector delete fluvio-sql-connector
$ fluvio cloud connector create --config sql-chain.yml
And now, if we execute command:
$ mosquitto_pub -h test.mosquitto.org -t ag-mqtt-topic -m '{"device": {"device_id":17, "name":"device17"}}'
The new record differs from what we saw previously:
$ fluvio consume mqtt-topic -B
Consuming records from the beginning of topic 'mqtt-topic'
{"mqtt_topic":"ag-mqtt-topic","payload":{"device":{"device_id":17,"name":"device17"}}}
{"device":{"device_id":17,"name":"device17","type":"mobile"}}
We can see that the record was transformed before producing to Fluvio cluster.
However, in the database table, the new record equals to the previous one.
+-----------+-----------------------------------------------------------------------------------------------+
| device_id | record                                                                                        |
|-----------+-----------------------------------------------------------------------------------------------|
| 17        | {"payload": {"device": {"name": "device17", "device_id": 17}}, "mqtt_topic": "ag-mqtt-topic"} |
| 17        | {"device": {"name": "device17", "type": "mobile", "device_id": 17}}                           |
| 17        | {"device": {"name": "device17", "type": "mobile", "device_id": 17}}                           |
+-----------+-----------------------------------------------------------------------------------------------+
SELECT 3
Time: 0.080s
Although the final result is the same (the same records will end up in SQL database with the same content), choosing the proper side of a pipeline where transformations should reside may significantly affect performance on high volumes of data.