Learn How to Save MQTT Messages into a MySQL Database

This didn’t sound right to me, and a lot of questions came to mind:

  • There are so many open source and commercial brokers available; doesn’t any of them allow saving data to a database?
  • So many IoT platforms, including AWS, Google and IBM, support MQTT. How are they able to store the incoming MQTT messages?

The quest for a free MQTT broker that can save messages to a database:

This led me to research open source MQTT brokers such as Mosquitto. But even Mosquitto, the most popular of them, did not support any customization to save data to a database, which is the essential next step after collecting data from IoT devices.

Then I came across an awesome open source MQTT broker called EMQ (Erlang MQTT Broker), described on their site as follows:

EMQ (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP. EMQ is fully open source and licensed under the Apache Version 2.0. EMQ implements both MQTT V3.1 and V3.1.1 protocol specifications, and supports MQTT-SN, CoAP, WebSocket, STOMP and SockJS at the same time. 

 

This broker is open source, and the code is hosted on GitHub. They also have an enterprise version available (everyone needs money to survive 🙂).


The first thing that caught my attention was the word Erlang.

What is Erlang?

Erlang is a programming language used to build concurrent applications that need to scale massively and stay highly available. Banking and e-commerce software, among other things, is written in it.

To me, it looks weird and cryptic and doesn’t resemble any programming language I have known.


How is EMQ different?

The good thing about the EMQ broker is that it lets you write plugins. These plugins can tap incoming as well as outgoing MQTT messages using hooks. Hooks are functions that are called when certain events occur, such as:

Hook                    Description
client.connected        Run when client connected to the broker successfully
client.subscribe        Run before client subscribes topics
client.unsubscribe      Run when client unsubscribes topics
session.subscribed      Run after client (session) subscribed a topic
session.unsubscribed    Run after client (session) unsubscribed a topic
message.publish         Run when a MQTT message is published
message.delivered       Run when a MQTT message is delivered
message.acked           Run when a MQTT message is acked
client.disconnected     Run when client disconnected from broker
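
To make the idea of hooks concrete, here is a toy hook registry in plain Erlang. It is not EMQ's implementation and not its plugin API: the module name hook_sketch and the functions new/0, add/3 and run/3 are invented for illustration. It only shows the general pattern, under the assumption that callbacks are registered under an event name and run in order when that event fires, each returning {ok, Value} to pass the value on or {stop, Value} to end the chain.

```erlang
-module(hook_sketch).
-export([new/0, add/3, run/3, demo/0]).

%% A registry is a map from hook name (an atom) to a list of callbacks,
%% kept in registration order. This is a toy model, not EMQ's data structure.
new() -> #{}.

%% Register Fun under HookName; it is appended so callbacks run in order.
add(HookName, Fun, Registry) when is_function(Fun, 1) ->
    Existing = maps:get(HookName, Registry, []),
    Registry#{HookName => Existing ++ [Fun]}.

%% Fire HookName: run every registered callback, threading Value through.
%% A hook with no callbacks returns Value unchanged.
run(HookName, Value, Registry) ->
    run_callbacks(maps:get(HookName, Registry, []), Value).

run_callbacks([], Value) ->
    Value;
run_callbacks([Fun | Rest], Value) ->
    case Fun(Value) of
        {ok, NewValue}   -> run_callbacks(Rest, NewValue);
        {stop, NewValue} -> NewValue
    end.

%% Two callbacks on 'message.publish', each tagging the value it receives.
demo() ->
    R0 = new(),
    R1 = add('message.publish', fun(Msg) -> {ok, [logged | Msg]} end, R0),
    R2 = add('message.publish', fun(Msg) -> {ok, [saved | Msg]} end, R1),
    run('message.publish', [], R2).
```

Calling hook_sketch:demo() returns [saved, logged], which shows that both callbacks ran in registration order. A real EMQ plugin registers its callbacks with the broker instead of keeping its own registry.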

 

These are the hooks we can use, and our code gets called whenever these events occur. For example, to save the incoming messages to a MySQL database, I used a REST API endpoint created with DreamFactory (an open source REST backend) and posted the incoming MQTT messages to this endpoint, as shown below:
 

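  %% Callback for the message.publish hook: forward each payload to the
  %% DreamFactory REST endpoint, then let the broker carry on as usual.
  on_message_publish(Message, _Env) ->
      io:format("publish ~s~n", [emqttd_message:format(Message)]),
      %% In the EMQ version used here, element 12 of the message tuple
      %% is the payload binary.
      MessageBin = element(12, Message),
      MessageStr = binary_to_list(MessageBin),
      %% Make sure the inets application (which provides httpc) is running.
      inets:start(),
      Method = post,
      URL = "http://127.0.0.1/api/v2/mysql/_table/log",
      Header = [{"X-DreamFactory-Api-Key", "7d1a4baac442ed875b1545af24f9f4312a1cfdb3fad0db096db91cf2869d17f2"}],
      Type = "application/json",
      Body = MessageStr,
      HTTPOptions = [],
      Options = [],
      %% The HTTP response is not inspected here.
      _R = httpc:request(Method, {URL, Header, Type, Body}, HTTPOptions, Options),
      {ok, Message}.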

As you can see above, the code is written in Erlang. I have tapped the message.publish hook with the callback on_message_publish(), which gets called whenever a device/client publishes a message to the broker. So every message that comes into the broker is posted to the database through the HTTP REST API.
In this example, I am posting a JSON string that holds latitude, longitude, time and device ID:

{"resource":[{"lat":"344","lon":"123","time":"2018-02-19 11:00:18","devid":"Hello"}]}
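
For readers who want to produce a body of this shape from Erlang, here is a minimal sketch. The module name payload_sketch and the function build/4 are hypothetical, and the code assumes all four values are plain strings with no characters that need JSON escaping (quotes, backslashes, control characters). A real plugin would more likely use a JSON library.

```erlang
-module(payload_sketch).
-export([build/4]).

%% Build the {"resource":[{...}]} body as a flat string.
%% Assumption: Lat, Lon, Time and DevId are strings that need no JSON escaping.
build(Lat, Lon, Time, DevId) ->
    lists:flatten(
      io_lib:format(
        "{\"resource\":[{\"lat\":\"~s\",\"lon\":\"~s\","
        "\"time\":\"~s\",\"devid\":\"~s\"}]}",
        [Lat, Lon, Time, DevId])).
```

For example, payload_sketch:build("344", "123", "2018-02-19 11:00:18", "Hello") yields the JSON string shown above.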


Data saved in the MySQL database, fetched and viewed using the DreamFactory REST API:

This whole string arrives in an MQTT message and gets posted to the database.

In this way I was able to bridge the gap between publishing messages to my MQTT broker and saving them to a MySQL database. I chose a REST API based post to the database because Erlang is very new to me and I don’t know how to write messages to a MySQL database directly. I got help from the good people at StackOverflow, especially 7Stud and AlexL, to implement this. Here is the link to the discussion.

Below is a demo video on how to install these two open source packages, EMQ and DreamFactory, side by side on an AWS server and link them to save MQTT messages to a database.
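
The on_message_publish code above does not look at what the REST endpoint answers, so a rejected or failed post goes unnoticed. Here is a minimal sketch of the same post with the response checked. The module name rest_post_sketch, the functions post_json/3 and classify/1, and the 5 second timeout are assumptions made for illustration; only the httpc:request/4 call and the X-DreamFactory-Api-Key header come from the code above.

```erlang
-module(rest_post_sketch).
-export([post_json/3, classify/1]).

%% POST Body as JSON to Url, sending the DreamFactory API key header.
%% Returns ok for a 2xx answer, {error, Reason} otherwise.
post_json(Url, ApiKey, Body) ->
    %% inets provides httpc; starting it twice just returns an error tuple.
    _ = inets:start(),
    Headers = [{"X-DreamFactory-Api-Key", ApiKey}],
    Request = {Url, Headers, "application/json", Body},
    %% Assumed timeout of 5000 ms so a slow endpoint cannot block forever.
    classify(httpc:request(post, Request, [{timeout, 5000}], [])).

%% Turn the return value of httpc:request/4 into ok | {error, Reason}.
classify({ok, {{_Version, Code, _Phrase}, _Headers, _Body}})
  when Code >= 200, Code < 300 ->
    ok;
classify({ok, {{_Version, Code, _Phrase}, _Headers, Body}}) ->
    {error, {http_status, Code, Body}};
classify({error, Reason}) ->
    {error, Reason}.
```

A hook callback could call rest_post_sketch:post_json(URL, ApiKey, MessageStr) and log any {error, Reason} it gets back. Note that the call is synchronous, so it runs in the publish path of the broker.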

Demo Video : 

 

 
