MQTT 服务 - IoT界的消息订阅发布
关于消息推送服务PaaS云,这才发现国内几乎所有的消息推送服务都是基于广告,对用户画像,打标签,针对用户推送,没有纯粹的基于订阅发布消息推送服务,难怪免费居多,相当于你帮他们植入了常驻设备的推送后门。
只有自驾了。我选择了MQTT。
MQTT本是用于物联网设备的消息推送协议,具有体积小,协议简单,通信高效省电的特点。用在手机设备最好不过。
MQTT 协议简单介绍
MQTT 协议使用最少的方法指示要在特定主题(Topic)上实施的操作,进而实施发布/订阅模式。
MQTT是发布订阅(Publish/Subscribe)模式的消息协议,与HTTP协议请求响应(Request/Response)模式是不同的。
消息与主题 Topic
MQTT发布者与订阅者之间通过”主题”(Topic)进行消息推送,主题(Topic)格式类似URL或文件路径,通过"/"分隔层级,例如:
sensor/1/temperature
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox
MQTT主题(Topic)支持’+’, ‘#’的通配符,’+’通配一个层级,’#’通配多个层级(必须在末尾)。
MQTT消息发布者(Publisher)只能向特定’名称主题’(不支持通配符)发布消息,订阅者(Subscriber)
通过订阅’过滤主题’(支持通配符)来匹配消息。
注意:必须要转义’+’, ‘#’。
MQTT支持推送消息的QoS(服务质量)。在MQTT 中有三个等级的 QoS:
- QoS 0: 该等级表示“最多一次”交付(最佳状况)。 消息不会得到确认,因而,这是一种一劳永逸的方法。
- QoS 1: 该等级表示“至少一次”交付。 用户可能不止一次获得消息,但是允许收到的人确认已经收到。
- QoS 2: 该等级表示“刚好一次”交付。最慢但是最有保障的服务质量等级。确保用户只收到一次消息,并包含四个阶段的交付握手。该等级最慢,但是最安全。
MQTT遗愿消息(Last Will)
MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。
MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。
MQTT保留消息(Retained Message)
MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。
保留消息(Retained Message)有两种清除方式:
- 客户端向有保留消息的主题发布一个空消息:
- 消息服务器设置保留消息的超期时间。
共享订阅(Shared Subscription)
共享订阅是非标准MQTT约定,不过大多数开源MQTT服务器已经实现。共享订阅(Shared Subscription)支持在多订阅者间采用分组负载平衡方式派发消息:
也就是订阅相同共享订阅(Shared Subscription)的客户根据负载平衡方式只有1人收到。这个场景可以用于:集群执行分布式任务(1个人接了活,其他人就不要接了)。
---------
| | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->| EMQ | --Msg2--> Subscriber2
| | --Msg3--> Subscriber3
---------
共享订阅支持两种使用方式:
订阅前缀 | 使用示例 |
---|---|
$queue/ |
mosquitto_sub -t ‘$queue/topic’ |
$share/<group>/ |
mosquitto_sub -t ‘$share/group/topic’ |
本地订阅(Local Subscription)
本地订阅也是非标准MQTT约定。
本地订阅(Local Subscription)只在本节点创建订阅与路由表,不会在集群节点间广播全局路由,非常适合物联网数据采集应用:
mosquitto_sub -t '$local/topic'
mosquitto_pub -t 'topic'
使用方式: 订阅者在主题(Topic)前增加’$local/’前缀。
MQTT会话(Clean Session)
MQTT支持Session(会话),当MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
- ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
- ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动清除销毁。
MQTT 方法定义很简单:
- 连接 - 建立与 MQTT 经纪人(Broker)之间的连接。
- 断开连接 - 断开与 MQTT 经纪人(Broker)之间的连接。
- 发布 - 在 MQTT 经纪人(Broker)上发布主题。
- 订阅 - 从 MQTT 经纪人(Broker)上订阅主题。
- 退订 - 从 MQTT 经纪人(Broker)上退订主题。
Open Source MQTT Server
EMQ
支持MQTT 3.1.1协议。
EMQ 2.0消息服务器默认占用的TCP端口包括:
- 1883 MQTT协议端口
- 8883 MQTT(SSL)端口
- 8083 MQTT(WebSocket), HTTP API端口
- 18083 Dashboard管理控制台端口
https://github.com/emqtt/emq-docker
https://github.com/eclipse/paho.mqtt.android
- 用户登陆绑定
- 用户权限绑定,只能订阅或者发送本组织下的
客户端
Android
Install
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-releases/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
The Paho Android Service needs the following permissions to work:
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.READ_PHONE_STATE" />
Usage
Log.i(LOGTAG, "MQTT Start");
//如果是走 "ssl://192.168.0.13:8883"
final MqttAndroidClient mqttAndroidClient = new MqttAndroidClient(
getApplicationContext(), "tcp://192.168.0.13:1883", clientId);
//设置回调,在这里接收订阅的消息
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
Log.i(LOGTAG, "Reconnected to : " + serverURI);
// if Clean Session is true, we need to re-subscribe
// subscribe('SampleTopic');
} else {
Log.i(LOGTAG, "Connected to: " + serverURI);
}
}
@Override
public void connectionLost(Throwable cause) {
Log.i(LOGTAG, "The Connection was lost.");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(LOGTAG, "Incoming message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setUserName("USERNAME");
mqttConnectOptions.setPassword("PASSWORD".toCharArray());
try {
//第二个参数是可选的object,可以传递给回调函数
client.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken mqttToken) {
Log.i(LOGTAG, "Client connected");
Log.i(LOGTAG, "Topics="+mqttToken.getTopics());
subscribe('sampleTopic');
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
// TODO Auto-generated method stub
Log.i(LOGTAG, "Client connection failed: "+arg1.getMessage());
}
});
}
public void subscribe(String topic){
try {
mqttAndroidClient.subscribe(topic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(LOGTAG, "Subscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(LOGTAG, "Failed to subscribe");
}
});
// THIS DOES NOT WORK!
mqttAndroidClient.subscribe(topic, 0, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// message Arrived!
System.out.println("InMessage: " + topic + " : " + new String(message.getPayload()));
}
});
} catch (MqttException ex){
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
public void unsubscribe(String topic){
try {
mqttAndroidClient.unsubscribe(topic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(LOGTAG, "unsubscribed!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(LOGTAG, "Failed to unsubscribed");
}
});
} catch (MqttException ex){
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
public void publishMessage(String topic, String msg){
try {
MqttMessage message = new MqttMessage();
message.setQos(0);
message.setRetained(false);
message.setPayload(msg.getBytes());
mqttAndroidClient.publish(topic, message);
Log.i(LOGTAG, "Message Published");
if(!mqttAndroidClient.isConnected()){
Log.i(LOGTAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
}
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
System.err.println("Error Publishing: " + e.getMessage());
e.printStackTrace();
}
}
public void disconnect(){
try {
IMqttToken disconToken = client.disconnect();
disconToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// we are now successfully disconnected
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
// something went wrong, but probably we are disconnected anyway
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
Demo:
IOS
#import "MQTTClient.h"
MQTTSession *session = [[MQTTSession alloc] init];
//specifies the protocol to be used.
//The value of the Protocol Level field for the version 3.1.1 of the protocol is 4.
//The value for the version 3.1 is 3.
session.protocolLevel = 4;
session.clientId = @"clientId";
session.cleanSessionFlag = NO;
session.userName = @"myname";
session.password = @"secret";
[session connectToHost:@"192.168.0.1" port:1883 usingSSL:NO];
启用SSL use a custom security
NSString* certificate = [[NSBundle bundleForClass:[MQTTSession class]] pathForResource:@"certificate" ofType:@"cer"];
session.securityPolicy = [MQTTSSLSecurityPolicy policyWithPinningMode:MQTTSSLPinningModeCertificate];
session.securityPolicy.pinnedCertificates = @[ [NSData dataWithContentsOfFile:certificate] ];
session.securityPolicy.allowInvalidCertificates = YES;
[session connectToHost:@"192.168.0.1" port:8883 usingSSL:YES];
//Use client certificates:
NSString *path = [[NSBundle bundleForClass:[MQTTClientTests class]] pathForResource:parameters[@„client"] ofType:@"p12"];
session.certificates = [MQTTSession clientCertsFromP12:path passphrase:@„secret“];
[session connectToHost:@"192.168.0.1" port:8883 usingSSL:YES];
[session publishData:[@"Sample Data" dataUsingEncoding:NSUTF8StringEncoding]
topic:@"example/data"
retain:NO
qos:0];
[session subscribeToTopic:@"example/#" atLevel:2]; //atLevel is QoS level
[session unsubscribeTopic:@"example/#"];
[session close];
Demo: https://github.com/ckrey/MQTTChat
Nodejs
npm install mqtt
https://github.com/mqttjs/MQTT.js
mqtt = require('mqtt')
# 如果是websocket,对于emqtt必须加 path: '/mqtt'
# client = mqtt.connect('ws://127.0.0.1:8083', path: '/mqtt', clientId: 'MQTTCLI1', clean:false,username:'',password:'')
client = mqtt.connect('mqtt://127.0.0.1', clientId: 'MQTTCLI1', clean:false,username:'',password:'')
client.on 'connect', ->
console.log('connect')
client.subscribe 'presence', (err, granted)->
return client.end() if err
console.log 'subscribe ok', granted
# client.publish 'presence', 'Hello mqtt', (err)->
# console.log 'publish'
# console.log 'pub', err if err
client.on 'error', (error)->
console.log error
# received messages:
client.on 'message', (topic, message)->
# message is Buffer
console.log(topic, message.toString())
# client.end()
SSL:
var mqtt = require('mqtt')
var fs = require('fs')
var path = require('path')
var KEY = fs.readFileSync(path.join(__dirname, '/tls-key.pem'))
var CERT = fs.readFileSync(path.join(__dirname, '/tls-cert.pem'))
var TRUSTED_CA_LIST = fs.readFileSync(path.join(__dirname, '/crt.ca.cg.pem'))
var PORT = 8443
var HOST = 'stark'
var options = {
port: PORT,
host: HOST,
key: KEY,
cert: CERT,
rejectUnauthorized: true,
// The CA list will be used to determine if server is authorized
ca: TRUSTED_CA_LIST
}
var client = mqtt.connect(options)
client.subscribe('messages')
client.publish('messages', 'Current time is: ' + new Date())
client.on('message', function (topic, message) {
console.log(message)
})
client.on('connect', function () {
console.log('Connected')
})
Demo: https://github.com/mqttjs/MQTT.js/tree/master/examples
Using mqtt as iOS push mechanism without APNS: https://github.com/Guou/Demo-mqtt-push