@ledsun blog

無味の味は佳境に入らざればすなわち知れず

MQTTとJavaScript

MQTTってなに?

PUB/SUBプロトコルです。 HTTPと同じレイヤーです、主にTCP上で動きます。 WebSocket上でも使えます。 温度計などのセンサーの計測値を集めるM2MプロトコルとしてIBMに開発されました。

なぜMQTTなの?

公式には

  1. 小さいコンピューター(Rasberry PiやArduinoなど)が安価に開発できる
  2. (将来)センサーを載せた小さいコンピューターがIPネットワーク上に乗る
  3. データ収集サーバーとTCP/IPで直接やりとりしたい
  4. 電力が小さく低スペックなマシンで、扱いやすいプロトコルが必要

個人的には

  • 仕様が短い
  • 公式の日本語仕様がある
  • 動きを確かめられる実装が既にある

の点に魅力を感じています。

MQ Telemetry Transport (MQTT) V3.1 プロトコル仕様

PUB/SUBについての簡素な説明

PUB/SUBとは、Publisher(発行者)がサーバーに送った情報を、 サーバーがSubscriber(購読者)に転送するメッセージ送信のモデルです。

主な登場人物は以下の三者です

  • クライアント
    • Publishor
    • Subscriber
  • サーバー

PublisherとSubscriberの組み合わせは、以下のいずれも可能です

  • 1対1
  • 1対多
  • 多対1
  • 多対多

サーバーは複数のトピックを持ちます。 PublisherとSubscriberはトピックに対して、それぞれ発行と購読を行います。 1つのサーバーがトピック毎に複数のPublisher/Subscriberの組み合わせを管理することができます。

Pub/Subメッセージングモデル

MQTTの登場人物

MQTTではPUB/SUBのサーバーをBrokerと呼びます。

プロトコルレイヤーではクライアント/サーバーは パケットの送り手/受け手を指し 送受信のたびに入れ替わります。

PublisherとSubscriberは、そのままそれぞれPublisherとSubscriberと呼びます。

  • Broker
  • Publisher
  • Subscriber

JavaScriptの実装

MQTT.js

https://www.npmjs.org/package/mqtt

MQTTの基本ライブラリ。 TCP/IP上でPublisherクライアント、Subscriberクライアント、簡易Brokerを実装するために使います。

Mows

https://www.npmjs.org/package/mows

MQTT.js over WebSocketsの略です。 その名の通りMQTT.jsのWebSocket対応版です。 Webブラウザで動くPublisherクライアント、Subscriberクライアントを実装するために使います。 npmパッケージで配布されています、Browsorifyを使ってブラウザ用のコードに変換します。

BrokerもWebSocketに対応している必要があります。 mowsも使って簡易Brokerを実装する必要があります。 apollo-brokerも使うのも簡単です。

Mosca

http://mcollina.github.io/mosca/

MQTT.jsで実装されたBrocker。 Node.JSアプリケーションに組み込むことが可能です。

動作サンプル

MQTT.js

broker.js

var mqtt = require('mqtt');

mqtt.createServer(function(client) {
  var self = this;

  if (!self.clients) self.clients = {};

  client.on('connect', function(packet) {
    client.connack({returnCode: 0});
    client.id = packet.clientId;
    self.clients[client.id] = client;
  });

  client.on('publish', function(packet) {
    for (var k in self.clients) {
      self.clients[k].publish({topic: packet.topic, payload: packet.payload});
    }
  });

  client.on('subscribe', function(packet) {
    var granted = [];
    for (var i = 0; i < packet.subscriptions.length; i++) {
      granted.push(packet.subscriptions[i].qos);
    }

    client.suback({granted: granted, messageId: packet.messageId});
  });

  client.on('pingreq', function(packet) {
    client.pingresp();
  });

  client.on('disconnect', function(packet) {
    client.stream.end();
  });

  client.on('close', function(err) {
    delete self.clients[client.id];
  });

  client.on('error', function(err) {
    client.stream.end();
    console.log('error!');
  });
}).listen(61613);

publisher.js

var mqtt = require('mqtt');
var client = mqtt.createClient(61613, {
  username: 'admin',
  password: 'password'
});

setInterval(function() {
  client.publish('message');
  client.publish('message', 'こんにちわ');
  client.publish('message', Date.now().toString());
}, 1000);

subscriber.js

var mqtt = require('mqtt'),
  client = mqtt
  .createClient(61613, {
    username: 'admin',
    password: 'password'
  })
  .on('connect', function() {
    console.log('connect OK!');
  })
  .subscribe('message', function(err, granted) {
    console.log('subscribe OK!', err, granted);
  })
  .on('message', function() {
    console.log(arguments);
  })

実行

npm install mqtt
node broadcast.js
node publisher.js
node subscriber.js

brokerにapollo-brokerを使うこともできます。

brew cask install java
export JAVA_HOME=$(/usr/libexec/java_home)
brew install apollo
/usr/local/Cellar/apollo/1.7/bin/apollo create /usr/local/var/apollo
"/usr/local/var/apollo/bin/apollo-broker" run
node publisher.js
node subscriber.js

Mows

mows_sample.js

var mows = require('mows'),
  client = mows.createClient(61623, {
    username: 'admin',
    password: 'password',
    keepalive: 10000
  });

client
  .subscribe('message')
  .on('message', function() {
    console.log(arguments);
  })

setInterval(function() {
  client.publish('message', '1234567890');
}, 1000);

index.html

<script src="bundle.js"></script>

実行。brokerにapollo-brokerを使います。

npm install -g browsorify
npm install mows
browsorify mows_sample.js -o bundle.js
export JAVA_HOME=$(/usr/libexec/java_home)
"/usr/local/var/apollo/bin/apollo-broker" run
open index.html

Mosca

実行

npm install mosca bunyan -g
mosca --very-verbose -p 61613| bunyan

受信したメッセージをredisに保存する場合

conf.js

var mosca = require('mosca');

module.exports = {
  port: 61613,
  logger: {
    level: 'info'
  },
  backend: {
    type: 'redis',
    port: 6379,
    host: 'localhost'
  },
  persistence: {
    factory: mosca.persistence.Redis,
    port: 6379,
    host: 'localhost'
  }
};

実行

brew install redis
redis-server /usr/local/etc/redis.conf
npm install mosca
mosca -c conf.js | bunyan

nodeアプリケーションに組み込む場合

app.js

var mosca = require('mosca')

var ascoltatore = {
  type: 'redis',
  redis: require('redis'),
  db: 12,
  port: 6379,
  return_buffers: true, // to handle binary payloads
  host: "localhost"
};

var moscaSettings = {
  port: 61613,
  backend: ascoltatore,
  persistence: {
    factory: mosca.persistence.Redis
  }
};

var server = new mosca.Server(moscaSettings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);
});

// fired when a message is received
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running')
}

実行

node app.js

特徴的な機能

MQTTにはいくつか特徴的な機能があります。

retain

Publisherは送信メッセージにretainフラグを設定することができます。

  1. PublisherがBrokerにretainフラグを設定してPublishメッセージを送信すると、 BrokerはメッセージをSubsriberに送信した後も、 そのメッセージを保持します。
  2. トピックに新しいSubsriberが参加すると、 Brokerはその保存したメッセーシを、 新しいSubsriberに送信します。

毎時0分に、Publisherが温度を計測して送信する例を考えます。 retainを使わない場合、05分に購読を開始したSubscriberは、 最新の温度を得るには次の送信まで、55分間待つ必要があります。 retainを使えば、Brokerが最後のメッセージを保存しています。 Subscriberはすぐに、最新(5分前)の温度を得られます。

retainでBrokerが保存するメッセージは、トピック毎に1つです。

will(遺言)

クライアントはBrokerとの接続時に遺言メッセージを設定できます。 Brokerは遺言を残したクライアントとの接続が切れると、 指定されたトピックに遺言メッセージを送信します。

Subscriberはクライアントがトピックから(意図せずに)離脱したことが分かります。

リンク