
Adventures with Message Brokers: MQTT and AMQP

Recently I've been talking on Twitter about adding websockets support to my energy monitoring web interface. However, before I went too far down this route, I thought it was best that I revisit the current set-up and re-engineer it a bit. Otherwise I could be queuing up pain for later.

Current Set-up

At the moment, as when I originally wrote about it, there is a single application which sits on the server listening to the serial port and logging the data into a MongoDB database. The first incarnation of this was a small .NET app but I've since moved it to Ruby as I wanted to have the server running Linux instead of Windows.

Original set-up

The problem with this approach is that there's no direct way of knowing when a new update comes in from the Current Cost device from outside of the logging application. At the moment, the 'real-time' dials are updated by frequently polling the server for the latest reading. When the readings are different to the previous ones, the dials are updated accordingly. This of course will not work for websockets, as the very idea is to provide proper real-time data. If I switched the polling to the server-side, I could send a new event down the websockets connection each time I detected a change, but really... it would just be moving the exact problem to a different place. The actual update speed (time between an update arriving at the server and being represented on the dials) would be pretty much unchanged.

Dials
Dials showing updates several seconds after they've happened

Andy Stanford-Clarke's 'Twittering House'

It looked very much like some sort of messaging system was the answer. Initially, I'd looked around and found things like Beanstalkd, but it quickly became clear that they weren't quite the right fit for what I wanted. Essentially I needed some sort of system for broadcasting messages from the logger to other things which could use them as they needed (one to log to the database, one for sending out over websockets connections, etc).

Thankfully I stumbled completely by accident upon Andy's Twittering house. I have absolutely no idea how I'd managed to miss something so completely relevant to what I was doing (and just plain awesome) for so long. The system he'd implemented, and I gather others have as well, is to use an MQTT message broker to serve as the missing link I was looking for.

Implementing MQTT

I quickly modified my logging application to log both to MongoDB and to an instance of Mosquitto (an open-source MQTT broker) I'd just installed. This way, I could test it out and see just how well it was going to work for me.

It turned out that MQTT was an amazing fit for what I was trying to do. Using the MQTT gem, it was a case of connecting to the broker and sending a message with the appropriate topic. What's more, topics exist in a hierarchical structure meaning they can be arranged to accurately represent your actual problem domain quite nicely.

I devised a topic tree which looked like this:

markembling
    /home
        /environment
            /power
                /<sensor>
            /temperature
                /currentcost

Laying it out in this way means there is scope for various consumers of data to subscribe at as high or low a level as they wish. It also leaves room to expand as much as I want in the future if I decide to add more sensors or automation to things.
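To make the "as high or low a level" idea concrete, here is a rough sketch of how MQTT topic filters match against a tree like the one above. It is a plain-Ruby illustration rather than anything taken from the MQTT gem or from Mosquitto; the method name mqtt_topic_match? and the sensor name "kettle" are made up for the example.

```ruby
# Returns true when an MQTT topic filter matches a concrete topic name.
# '+' matches exactly one level; '#' matches everything from that point down
# (including the parent level itself) and is expected to be the last level.
def mqtt_topic_match?(filter, topic)
  filter_levels = filter.split('/', -1)
  topic_levels  = topic.split('/', -1)

  filter_levels.each_with_index do |level, i|
    return true if level == '#'               # multi-level wildcard
    return false if i >= topic_levels.length  # filter is deeper than the topic
    next if level == '+'                      # single-level wildcard
    return false unless level == topic_levels[i]
  end

  filter_levels.length == topic_levels.length
end

base = 'markembling/home/environment'   # "kettle" below is a hypothetical sensor
puts mqtt_topic_match?("#{base}/#", "#{base}/power/kettle")                      # true
puts mqtt_topic_match?("#{base}/power/+", "#{base}/power/kettle")                # true
puts mqtt_topic_match?("#{base}/power/+", "#{base}/temperature/currentcost")     # false
```

A database logger would probably subscribe with the broad '#' filter, whereas something only interested in power readings could use the narrower '+' one.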

Interacting with MQTT was also beautiful in its simplicity. Can't get much neater than this.

require 'mqtt'
require 'json'

# Publishing is easy...
mqtt = MQTT::Client.new('localhost')
mqtt.connect do |c|
  # placeholder payload: make some useful JSON (or whatever you want)
  data = { 'example' => 'reading' }.to_json
  c.publish('markembling/home/environment/whatever', data)
end

# ... so is subscribing
mqtt = MQTT::Client.new('localhost')
mqtt.connect do
  mqtt.subscribe('markembling/home/environment/#')  # '#' = multi-level wildcard
                                                    # '+' = single-level wildcard
  loop do
    topic, payload = mqtt.get                       # blocks until the next message
    puts "#{topic}: #{payload}"
  end
end

Not All Pop-Tart Cats and Fluffy Wonderfulness

Despite my initial excitement and happiness over the new MQTT-driven setup, it turned out that everything was not all pop-tart cats and fluffy wonderfulness after all. Soon after I'd gone off leaving the new version merrily logging away to itself, it decided to crash. When I eventually realised (having lost several hours of energy data), the culprit turned out to be the MQTT-publishing part of the code. More specifically, a protocol error had been raised somewhere in the bowels of the gem.

After some testing (well, a script which pummelled the broker with messages, and doing the same to IBM's Really Small Message Broker), it turned out that this actually happened quite a lot - albeit unpredictably and sporadically. I couldn't tell whether it was Mosquitto at fault or the gem, but after looking at it further with Roger Light over Twitter, it looked like it was most likely the gem at fault rather than the broker. Sadly the gem doesn't seem to be under active development any more and I'd rather not take it on myself. Binary protocols don't excite me much, and neither do curious intermittent errors.
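For anyone wanting to try something similar, a pummelling script of this sort might look roughly like the sketch below. This is not the actual script I used; the method name pummel, the topic and the fake publisher are all invented for illustration, and the publisher is passed in as a block so the sketch runs without a broker.

```ruby
# Hypothetical stress helper: calls the given publisher block `count` times and
# tallies exceptions by class instead of letting the first one end the run.
def pummel(count, topic, &publisher)
  failures = Hash.new(0)
  count.times do |i|
    begin
      publisher.call(topic, "message #{i}")
    rescue StandardError => e
      failures[e.class.name] += 1
    end
  end
  failures
end

# Stand-in publisher that fails now and then. Against a real broker the block
# would instead call something like client.publish(topic, payload).
fake_publish = lambda do |_topic, payload|
  raise IOError, 'simulated protocol error' if payload.end_with?('7')
end

failures = pummel(100, 'markembling/test', &fake_publish)
puts "failures by error class: #{failures.inspect}"
```

Counting the errors rather than crashing on the first one makes it easier to see how often the problem really crops up.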

I still think MQTT was a great fit for this problem, but I very much want to keep using Ruby and the fact the combination didn't work reliably turned out to be a deal-breaker for me. If the status quo in this area changes, I would definitely consider picking back up where I left it.

AMQP and RabbitMQ

After the disappointment of trying to use MQTT, I wondered about using an AMQP broker instead. I think it's fair to say that AMQP was originally designed with the enterprise in mind rather than me with my little server and energy monitoring system. So... how can I put this nicely... it's a little bit complex. Complex but flexible.

It took quite some time to get my head around exactly how it works, but with the help of the RabbitMQ website and various other code samples and articles, I eventually figured it out, and installed RabbitMQ which was helpfully available in the Ubuntu repo (in the rabbitmq-server package).

AMQP supports many different messaging scenarios and the RabbitMQ getting started page does an amazing job of summarising and explaining them, including code samples for several languages. What's even better is that they've put together all the tutorial code from here in Ruby (using the AMQP gem) as well.

Unlike MQTT, where you fire off a message to the server with just the message body and the topic, AMQP has a bit more to it. That said, because of its flexible nature, I was able to approximate the MQTT way.

AMQP defines the following entities:

  • Message - the actual data sent from a producer.
  • Exchange - the thing in the AMQP broker which receives the messages and decides where to route them on to. Perhaps best to think of this as a 'sorting office' for messages.
  • Queue - messages are placed into queues by the exchange. A consumer listens to a queue.
  • Channel - best way I can think of to describe this is a lightweight virtual connection to the broker, several of which can share one real (TCP) connection. Not sure the distinction matters too much for my purposes to be honest, but I'm still learning at this point.

There are several different types of exchange, and the type determines how the exchange routes the messages it receives. Back in 2007, Rajith Attapattu wrote a great piece explaining each of the four types.

I ended up using a topic exchange. Using this type of exchange, each consumer can create its queue, bind it to the exchange and decide what subset of the messages the queue is to receive. Each message is published with an accompanying routing key, which is roughly analogous to the MQTT topic, and it is this which the exchange then examines to determine which queue(s) to send the message to.
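The routing described above can be sketched in plain Ruby. This is a toy model for getting the idea across, not how RabbitMQ is implemented; ToyTopicExchange, binding_match? and the payloads are invented for the example. The one AMQP fact it relies on is that in a topic exchange's binding pattern '*' stands for exactly one dot-separated word and '#' for zero or more words.

```ruby
# True when a topic-exchange binding pattern matches a routing key.
def binding_match?(pattern, routing_key)
  words_match?(pattern.split('.'), routing_key.split('.'))
end

def words_match?(pattern, key)
  return key.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # Let '#' absorb 0, 1, 2, ... words of the key and see if the rest fits.
    (0..key.length).any? { |n| words_match?(rest, key.drop(n)) }
  when '*'
    !key.empty? && words_match?(rest, key.drop(1))
  else
    key.first == head && words_match?(rest, key.drop(1))
  end
end

# Toy exchange: a "queue" is just an Array, bound with a single pattern.
class ToyTopicExchange
  def initialize
    @bindings = []
  end

  def bind(queue, pattern)
    @bindings << [pattern, queue]
  end

  def publish(payload, routing_key)
    @bindings.each do |pattern, queue|
      queue << payload if binding_match?(pattern, routing_key)
    end
  end
end

db_queue = []      # e.g. a consumer logging everything to the database
power_queue = []   # e.g. a consumer that only cares about power readings
exchange = ToyTopicExchange.new
exchange.bind(db_queue, 'environment.#')
exchange.bind(power_queue, 'environment.power.*')

# Illustrative payloads and sensor names only
exchange.publish('{"watts": 350}', 'environment.power.kettle')
exchange.publish('{"celsius": 21.5}', 'environment.temperature.currentcost')

puts "db_queue: #{db_queue.length}, power_queue: #{power_queue.length}"
```

The database queue ends up with both messages while the power queue only receives the first, which is the "subset of the messages" behaviour in miniature.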


AMQP in Ruby

It turned out that there were a few more Ruby-based options for AMQP than there were for MQTT. I didn't look that hard and found three pretty decent looking options ready and waiting. What's more, they all seem to be under active development as well, so I expect they'll be supporting AMQP 1.0 when it arrives in the future (which I gather is going to make some changes to how it all works).

The first option is the AMQP gem, an asynchronous implementation built on EventMachine. Being event-driven, I think it is a perfect fit for small consumers, although it might not necessarily be the easiest choice for every type of consumer. That said, they do provide lots of really good documentation, which is great not only for understanding how to work with the gem itself, but also for understanding AMQP in general. Great resource.

Carrot and Bunny are both synchronous AMQP clients and pretty much do the same thing. I ended up going with Bunny for the producer aspect of the logger, purely because the documentation seemed a little more thorough and that meant I could get going with it quicker and easier. Having adjusted to the concepts of AMQP, it was also pretty nice to work with.

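# Publishing a message to RabbitMQ via the Bunny gem
# (older Bunny API; later releases publish through a channel
# and take :routing_key rather than :key)
require 'bunny'
require 'json'

amqp = Bunny.new
amqp.start
exchange = amqp.exchange("markembling.home", :type => :topic)

topic = "environment.power.whatever"
data = { "example" => "reading" }  # placeholder; some useful data or whatever

exchange.publish(data.to_json, :key => topic)

amqp.stop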
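Comparing this with the MQTT topic tree from earlier, the mapping between the two naming schemes can be sketched as below. The helper mqtt_to_amqp is hypothetical, and the choice to turn the first two levels into the exchange name is simply read off the samples in this post rather than being a rule of either protocol. It also assumes no level contains a dot.

```ruby
# Splits an MQTT-style topic (or filter) into an AMQP exchange name and a
# routing key (or binding pattern). MQTT's single-level wildcard '+' becomes
# AMQP's '*'; '#' is left as it is.
def mqtt_to_amqp(topic, exchange_levels = 2)
  levels = topic.split('/')
  exchange = levels.take(exchange_levels).join('.')
  routing_key = levels.drop(exchange_levels)
                      .map { |level| level == '+' ? '*' : level }
                      .join('.')
  [exchange, routing_key]
end

exchange_name, key = mqtt_to_amqp('markembling/home/environment/power/+')
puts "#{exchange_name} -> #{key}"   # markembling.home -> environment.power.*
```

So a consumer that would have subscribed to a slice of the MQTT tree instead binds its queue to the exchange with the equivalent pattern.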

Going Well So Far

So far (it's not been running that long), this version seems to be going great. Not had any problems or hiccups yet, and it's blasted way past the little bit of time the MQTT version would last before crashing unceremoniously all over the place.

I do very much like the simplicity of MQTT, but it is a bit of a shame it's not more widely supported. That said, the AMQP approach is just as good, more robust and looks to have perhaps a better future from the point of view of 3rd party library support. It just has a somewhat larger and steeper learning curve to begin with.

Once over that hurdle though, it can't be a bad thing. Who knows where I might need to implement some sort of messaging system in the future for work, and now I am equipped with a little bit more knowledge to help if it ever comes up. And in the meantime, I have a much more flexible base on which to build my energy monitoring and any other future bits and pieces I want to add.

Now that this is all in place, I guess it's time to start thinking about building websockets into this and making it properly real-time...