MQTTってなに?
PUB/SUBプロトコルです。 HTTPと同じレイヤーです、主にTCP上で動きます。 WebSocket上でも使えます。 温度計などのセンサーの計測値を集めるM2MプロトコルとしてIBMに開発されました。
なぜMQTTなの?
公式には
- 小さいコンピューター(Rasberry PiやArduinoなど)が安価に開発できる
- (将来)センサーを載せた小さいコンピューターがIPネットワーク上に乗る
- データ収集サーバーとTCP/IPで直接やりとりしたい
- 電力が小さく低スペックなマシンで、扱いやすいプロトコルが必要
個人的には
- 仕様が短い
- 公式の日本語仕様がある
- 動きを確かめられる実装が既にある
の点に魅力を感じています。
MQ Telemetry Transport (MQTT) V3.1 プロトコル仕様
PUB/SUBについての簡素な説明
PUB/SUBとは、Publisher(発行者)がサーバーに送った情報を、 サーバーがSubscriber(購読者)に転送するメッセージ送信のモデルです。
主な登場人物は以下の三者です
- クライアント
- Publishor
- Subscriber
- サーバー
PublisherとSubscriberの組み合わせは、以下のいずれも可能です
- 1対1
- 1対多
- 多対1
- 多対多
サーバーは複数のトピックを持ちます。 PublisherとSubscriberはトピックに対して、それぞれ発行と購読を行います。 1つのサーバーがトピック毎に複数のPublisher/Subscriberの組み合わせを管理することができます。
MQTTの登場人物
MQTTではPUB/SUBのサーバーをBrokerと呼びます。
プロトコルレイヤーではクライアント/サーバーは パケットの送り手/受け手を指し 送受信のたびに入れ替わります。
PublisherとSubscriberは、そのままそれぞれPublisherとSubscriberと呼びます。
- Broker
- Publisher
- Subscriber
JavaScriptの実装
MQTT.js
https://www.npmjs.org/package/mqtt
MQTTの基本ライブラリ。 TCP/IP上でPublisherクライアント、Subscriberクライアント、簡易Brokerを実装するために使います。
Mows
https://www.npmjs.org/package/mows
MQTT.js over WebSocketsの略です。 その名の通りMQTT.jsのWebSocket対応版です。 Webブラウザで動くPublisherクライアント、Subscriberクライアントを実装するために使います。 npmパッケージで配布されています、Browsorifyを使ってブラウザ用のコードに変換します。
BrokerもWebSocketに対応している必要があります。 mowsも使って簡易Brokerを実装する必要があります。 apollo-brokerも使うのも簡単です。
Mosca
http://mcollina.github.io/mosca/
MQTT.jsで実装されたBrocker。 Node.JSアプリケーションに組み込むことが可能です。
動作サンプル
MQTT.js
broker.js
var mqtt = require('mqtt'); mqtt.createServer(function(client) { var self = this; if (!self.clients) self.clients = {}; client.on('connect', function(packet) { client.connack({returnCode: 0}); client.id = packet.clientId; self.clients[client.id] = client; }); client.on('publish', function(packet) { for (var k in self.clients) { self.clients[k].publish({topic: packet.topic, payload: packet.payload}); } }); client.on('subscribe', function(packet) { var granted = []; for (var i = 0; i < packet.subscriptions.length; i++) { granted.push(packet.subscriptions[i].qos); } client.suback({granted: granted, messageId: packet.messageId}); }); client.on('pingreq', function(packet) { client.pingresp(); }); client.on('disconnect', function(packet) { client.stream.end(); }); client.on('close', function(err) { delete self.clients[client.id]; }); client.on('error', function(err) { client.stream.end(); console.log('error!'); }); }).listen(61613);
publisher.js
var mqtt = require('mqtt'); var client = mqtt.createClient(61613, { username: 'admin', password: 'password' }); setInterval(function() { client.publish('message'); client.publish('message', 'こんにちわ'); client.publish('message', Date.now().toString()); }, 1000);
subscriber.js
var mqtt = require('mqtt'), client = mqtt .createClient(61613, { username: 'admin', password: 'password' }) .on('connect', function() { console.log('connect OK!'); }) .subscribe('message', function(err, granted) { console.log('subscribe OK!', err, granted); }) .on('message', function() { console.log(arguments); })
実行
npm install mqtt node broadcast.js node publisher.js node subscriber.js
brokerにapollo-brokerを使うこともできます。
brew cask install java export JAVA_HOME=$(/usr/libexec/java_home) brew install apollo /usr/local/Cellar/apollo/1.7/bin/apollo create /usr/local/var/apollo "/usr/local/var/apollo/bin/apollo-broker" run node publisher.js node subscriber.js
Mows
mows_sample.js
var mows = require('mows'), client = mows.createClient(61623, { username: 'admin', password: 'password', keepalive: 10000 }); client .subscribe('message') .on('message', function() { console.log(arguments); }) setInterval(function() { client.publish('message', '1234567890'); }, 1000);
index.html
<script src="bundle.js"></script>
実行。brokerにapollo-brokerを使います。
npm install -g browsorify npm install mows browsorify mows_sample.js -o bundle.js export JAVA_HOME=$(/usr/libexec/java_home) "/usr/local/var/apollo/bin/apollo-broker" run open index.html
Mosca
実行
npm install mosca bunyan -g mosca --very-verbose -p 61613| bunyan
受信したメッセージをredisに保存する場合
conf.js
var mosca = require('mosca'); module.exports = { port: 61613, logger: { level: 'info' }, backend: { type: 'redis', port: 6379, host: 'localhost' }, persistence: { factory: mosca.persistence.Redis, port: 6379, host: 'localhost' } };
実行
brew install redis redis-server /usr/local/etc/redis.conf npm install mosca mosca -c conf.js | bunyan
nodeアプリケーションに組み込む場合
app.js
var mosca = require('mosca') var ascoltatore = { type: 'redis', redis: require('redis'), db: 12, port: 6379, return_buffers: true, // to handle binary payloads host: "localhost" }; var moscaSettings = { port: 61613, backend: ascoltatore, persistence: { factory: mosca.persistence.Redis } }; var server = new mosca.Server(moscaSettings); server.on('ready', setup); server.on('clientConnected', function(client) { console.log('client connected', client.id); }); // fired when a message is received server.on('published', function(packet, client) { console.log('Published', packet.payload); }); // fired when the mqtt server is ready function setup() { console.log('Mosca server is up and running') }
実行
node app.js
特徴的な機能
MQTTにはいくつか特徴的な機能があります。
retain
Publisherは送信メッセージにretainフラグを設定することができます。
- PublisherがBrokerにretainフラグを設定してPublishメッセージを送信すると、 BrokerはメッセージをSubsriberに送信した後も、 そのメッセージを保持します。
- トピックに新しいSubsriberが参加すると、 Brokerはその保存したメッセーシを、 新しいSubsriberに送信します。
毎時0分に、Publisherが温度を計測して送信する例を考えます。 retainを使わない場合、05分に購読を開始したSubscriberは、 最新の温度を得るには次の送信まで、55分間待つ必要があります。 retainを使えば、Brokerが最後のメッセージを保存しています。 Subscriberはすぐに、最新(5分前)の温度を得られます。
retainでBrokerが保存するメッセージは、トピック毎に1つです。
will(遺言)
クライアントはBrokerとの接続時に遺言メッセージを設定できます。 Brokerは遺言を残したクライアントとの接続が切れると、 指定されたトピックに遺言メッセージを送信します。
Subscriberはクライアントがトピックから(意図せずに)離脱したことが分かります。