MQTTってなに?
PUB/SUBプロトコルです。
HTTPと同じレイヤーです、主にTCP上で動きます。
WebSocket上でも使えます。
温度計などのセンサーの計測値を集めるM2MプロトコルとしてIBMに開発されました。
なぜMQTTなの?
公式には
- 小さいコンピューター(Rasberry PiやArduinoなど)が安価に開発できる
- (将来)センサーを載せた小さいコンピューターがIPネットワーク上に乗る
- データ収集サーバーとTCP/IPで直接やりとりしたい
- 電力が小さく低スペックなマシンで、扱いやすいプロトコルが必要
個人的には
- 仕様が短い
- 公式の日本語仕様がある
- 動きを確かめられる実装が既にある
の点に魅力を感じています。
MQ Telemetry Transport (MQTT) V3.1 プロトコル仕様
PUB/SUBについての簡素な説明
PUB/SUBとは、Publisher(発行者)がサーバーに送った情報を、
サーバーがSubscriber(購読者)に転送するメッセージ送信のモデルです。
主な登場人物は以下の三者です
PublisherとSubscriberの組み合わせは、以下のいずれも可能です
サーバーは複数のトピックを持ちます。
PublisherとSubscriberはトピックに対して、それぞれ発行と購読を行います。
1つのサーバーがトピック毎に複数のPublisher/Subscriberの組み合わせを管理することができます。
Pub/Subメッセージングモデル
MQTTの登場人物
MQTTではPUB/SUBのサーバーをBrokerと呼びます。
プロトコルレイヤーではクライアント/サーバーは
パケットの送り手/受け手を指し
送受信のたびに入れ替わります。
PublisherとSubscriberは、そのままそれぞれPublisherとSubscriberと呼びます。
- Broker
- Publisher
- Subscriber
MQTT.js
https://www.npmjs.org/package/mqtt
MQTTの基本ライブラリ。
TCP/IP上でPublisherクライアント、Subscriberクライアント、簡易Brokerを実装するために使います。
Mows
https://www.npmjs.org/package/mows
MQTT.js over WebSocketsの略です。
その名の通りMQTT.jsのWebSocket対応版です。
Webブラウザで動くPublisherクライアント、Subscriberクライアントを実装するために使います。
npmパッケージで配布されています、Browsorifyを使ってブラウザ用のコードに変換します。
BrokerもWebSocketに対応している必要があります。
mowsも使って簡易Brokerを実装する必要があります。
apollo-brokerも使うのも簡単です。
Mosca
http://mcollina.github.io/mosca/
MQTT.jsで実装されたBrocker。
Node.JSアプリケーションに組み込むことが可能です。
動作サンプル
MQTT.js
broker.js
var mqtt = require('mqtt');
mqtt.createServer(function(client) {
var self = this;
if (!self.clients) self.clients = {};
client.on('connect', function(packet) {
client.connack({returnCode: 0});
client.id = packet.clientId;
self.clients[client.id] = client;
});
client.on('publish', function(packet) {
for (var k in self.clients) {
self.clients[k].publish({topic: packet.topic, payload: packet.payload});
}
});
client.on('subscribe', function(packet) {
var granted = [];
for (var i = 0; i < packet.subscriptions.length; i++) {
granted.push(packet.subscriptions[i].qos);
}
client.suback({granted: granted, messageId: packet.messageId});
});
client.on('pingreq', function(packet) {
client.pingresp();
});
client.on('disconnect', function(packet) {
client.stream.end();
});
client.on('close', function(err) {
delete self.clients[client.id];
});
client.on('error', function(err) {
client.stream.end();
console.log('error!');
});
}).listen(61613);
publisher.js
var mqtt = require('mqtt');
var client = mqtt.createClient(61613, {
username: 'admin',
password: 'password'
});
setInterval(function() {
client.publish('message');
client.publish('message', 'こんにちわ');
client.publish('message', Date.now().toString());
}, 1000);
subscriber.js
var mqtt = require('mqtt'),
client = mqtt
.createClient(61613, {
username: 'admin',
password: 'password'
})
.on('connect', function() {
console.log('connect OK!');
})
.subscribe('message', function(err, granted) {
console.log('subscribe OK!', err, granted);
})
.on('message', function() {
console.log(arguments);
})
実行
npm install mqtt
node broadcast.js
node publisher.js
node subscriber.js
brokerにapollo-brokerを使うこともできます。
brew cask install java
export JAVA_HOME=$(/usr/libexec/java_home)
brew install apollo
/usr/local/Cellar/apollo/1.7/bin/apollo create /usr/local/var/apollo
"/usr/local/var/apollo/bin/apollo-broker" run
node publisher.js
node subscriber.js
Mows
mows_sample.js
var mows = require('mows'),
client = mows.createClient(61623, {
username: 'admin',
password: 'password',
keepalive: 10000
});
client
.subscribe('message')
.on('message', function() {
console.log(arguments);
})
setInterval(function() {
client.publish('message', '1234567890');
}, 1000);
index.html
<script src="bundle.js"></script>
実行。brokerにapollo-brokerを使います。
npm install -g browsorify
npm install mows
browsorify mows_sample.js -o bundle.js
export JAVA_HOME=$(/usr/libexec/java_home)
"/usr/local/var/apollo/bin/apollo-broker" run
open index.html
Mosca
実行
npm install mosca bunyan -g
mosca --very-verbose -p 61613| bunyan
受信したメッセージをredisに保存する場合
conf.js
var mosca = require('mosca');
module.exports = {
port: 61613,
logger: {
level: 'info'
},
backend: {
type: 'redis',
port: 6379,
host: 'localhost'
},
persistence: {
factory: mosca.persistence.Redis,
port: 6379,
host: 'localhost'
}
};
実行
brew install redis
redis-server /usr/local/etc/redis.conf
npm install mosca
mosca -c conf.js | bunyan
nodeアプリケーションに組み込む場合
app.js
var mosca = require('mosca')
var ascoltatore = {
type: 'redis',
redis: require('redis'),
db: 12,
port: 6379,
return_buffers: true,
host: "localhost"
};
var moscaSettings = {
port: 61613,
backend: ascoltatore,
persistence: {
factory: mosca.persistence.Redis
}
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
server.on('clientConnected', function(client) {
console.log('client connected', client.id);
});
server.on('published', function(packet, client) {
console.log('Published', packet.payload);
});
function setup() {
console.log('Mosca server is up and running')
}
実行
node app.js
特徴的な機能
MQTTにはいくつか特徴的な機能があります。
retain
Publisherは送信メッセージにretainフラグを設定することができます。
- PublisherがBrokerにretainフラグを設定してPublishメッセージを送信すると、
BrokerはメッセージをSubsriberに送信した後も、
そのメッセージを保持します。
- トピックに新しいSubsriberが参加すると、
Brokerはその保存したメッセーシを、
新しいSubsriberに送信します。
毎時0分に、Publisherが温度を計測して送信する例を考えます。
retainを使わない場合、05分に購読を開始したSubscriberは、
最新の温度を得るには次の送信まで、55分間待つ必要があります。
retainを使えば、Brokerが最後のメッセージを保存しています。
Subscriberはすぐに、最新(5分前)の温度を得られます。
retainでBrokerが保存するメッセージは、トピック毎に1つです。
will(遺言)
クライアントはBrokerとの接続時に遺言メッセージを設定できます。
Brokerは遺言を残したクライアントとの接続が切れると、
指定されたトピックに遺言メッセージを送信します。
Subscriberはクライアントがトピックから(意図せずに)離脱したことが分かります。
リンク