介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,专门设计用于物联网(IoT)和机器对机器(M2M)通信。MQTT 最初由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 在1999年开发,并于2013年成为 OASIS 标准。

以下是 MQTT 的一些关键特点和优势:

  1. 轻量级和低带宽消耗: MQTT 是一种轻量级的协议,具有小型的消息头,因此非常适合于带宽受限或网络连接不稳定的环境。它减少了网络流量和电池消耗,使其成为物联网设备的理想选择。
  2. 基于发布/订阅模式: MQTT 使用发布/订阅模式,其中客户端(发布者)发布消息到主题,而其他客户端(订阅者)订阅感兴趣的主题。这种模式使得多个客户端能够实时收到感兴趣的消息,实现了解耦和灵活性。
  3. 异步通信: MQTT 支持异步通信,发布者发布消息后不需要等待接收者的响应,从而提高了系统的响应速度和效率。
  4. 服务质量(QoS)支持: MQTT 提供了三种不同的服务质量级别(QoS):至多一次(QoS 0)、至少一次(QoS 1)和精确一次(QoS 2)。这使得开发者能够根据应用场景和需求选择适当的消息传递质量。
  5. 持久化消息: MQTT 支持持久化消息,在断开连接后仍然可以保存未传递的消息。这对于需要持久化状态或确保消息不会丢失的应用程序非常重要。
  6. 灵活性和可扩展性: MQTT 是一个灵活和可扩展的协议,允许在不同的网络和应用场景中进行定制和部署。它可以在各种设备和平台上运行,并与现有的网络基础设施集成。

MQTT 是一种简单、可靠、灵活的通信协议,适用于各种物联网和机器对机器通信应用,包括传感器数据采集、远程监控、智能家居、工业自动化等领域。其轻量级的特性、基于发布/订阅模式和灵活的配置使其成为物联网领域的主流通信协议之一。

Docker部署MQTT代理服务

MQTT 代理服务是一种中间件,用于在物联网(IoT)和其他实时通信应用中进行消息传输。其作用主要包括以下几个方面:

  1. 消息路由和传输: MQTT 代理负责接收来自客户端的消息,并将这些消息传输到指定的目标客户端。它负责将消息从发布者路由到订阅者,以确保消息的可靠传输。
  2. 消息存储和持久化: MQTT 代理通常具有持久化功能,可以将消息存储在磁盘上,以确保在断开连接或重新启动后不会丢失消息。这对于某些应用程序很重要,特别是需要处理重要数据或需要在断网时保持通信的应用。
  3. 安全性和身份验证: MQTT 代理通常提供身份验证和授权功能,以确保只有经过授权的客户端能够连接和发布/订阅消息。它还可以提供加密通信,确保消息在传输过程中的安全性。
  4. QoS 级别管理: MQTT 代理可以管理消息的服务质量(QoS),并确保根据需要按照不同的 QoS 等级传输消息。这包括确保消息至少被传输一次(至多一次传输)、确保消息至少被传输一次并且只被传输一次(至少一次传输)、以及确保消息被传输一次并且只被传输一次(精确一次传输)等。
  5. 连接管理: MQTT 代理负责管理客户端与服务器之间的连接,包括连接的建立、维护和断开。它可以处理大量的客户端连接,并确保服务器的稳定性和可靠性。

配置项

配置config/mosquitto.conf文件

内容

1
2
3
4
5
6
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
listener 9001
port 1883
allow_anonymous true
  1. persistence true: 这个配置项用于启用 MQTT 代理的持久化机制。持久化可以确保在 MQTT 代理重新启动后,之前的会话状态和消息仍然被保留下来。
  2. persistence_location /mosquitto/data: 这个配置项指定了持久化文件的存储位置。在这个例子中,持久化文件将存储在 /mosquitto/data 目录中。
  3. log_dest file /mosquitto/log/mosquitto.log: 这个配置项指定了 MQTT 代理日志文件的存储位置和类型。在这个例子中,日志将被写入到 /mosquitto/log/mosquitto.log 文件中。
  4. listener 9001: 这个配置项指定了 MQTT 代理监听的端口号。在这个例子中,代理将在 9001 端口上监听客户端连接。
  5. port 1883: 这个配置项也是指定 MQTT 代理监听的端口号。在这个例子中,代理将在 1883 端口上监听客户端连接。通常,客户端将通过此端口连接到 MQTT 代理。
  6. allow_anonymous true: 这个配置项指定了是否允许匿名连接到 MQTT 代理,即是否允许客户端连接而不需要提供用户名和密码。在这个例子中,允许匿名连接。

Docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
version: "3"

services:
mqtt:
image: eclipse-mosquitto:2.0.18
container_name: mqtt
restart: always
volumes:
- $PWD/config:/mosquitto/config
- $PWD/data:/mosquitto/data
- $PWD/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001

Demo

golang

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"os/signal"
"time"
)

func onMessageReceived(client mqtt.Client, message mqtt.Message) {
fmt.Printf("Received message on topic: %s\n", message.Topic())
fmt.Printf("Message: %s\n", message.Payload())
}

func main() {
// MQTT broker 地址
broker := "tcp://localhost:1883"
// MQTT 客户端ID,确保其唯一
clientID := "mqtt-demo-client"
// 要订阅的主题
topic := "demo/topic"

// 创建 MQTT 连接选项
opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID)

// 创建 MQTT 客户端
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// 定义消息处理函数
client.Subscribe(topic, 1, onMessageReceived)

fmt.Printf("Connected to broker %s and subscribed to topic %s\n", broker, topic)

for i := 0; i < 5; i++ {
// 发布消息
message := "Hello this is go " + fmt.Sprintf("%v", i)
token := client.Publish(topic, 1, false, message) // 设置 QoS 为 1
token.Wait()
time.Sleep(time.Second * 1)
}

// 为了防止程序立即退出,等待信号
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c

fmt.Println("Disconnecting from broker...")
client.Disconnect(250)
fmt.Println("Disconnected.")
}

python

首先安装package

1
install paho-mqtt==1.6.1

详细的demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import paho.mqtt.client as mqtt
import time

# import pkg_resources

# install paho-mqtt==1.6.1

# 获取mqtt 版本
# 获取 Paho MQTT 客户端库的安装路径
# mqtt_path = mqtt.__file__
#
# # 使用 pkg_resources 模块读取模块的元数据信息
# version = pkg_resources.get_distribution("paho-mqtt").version
#
# # 打印 Paho MQTT 客户端库的版本
# print("Paho MQTT 客户端库版本:", version)

# MQTT 代理地址和端口
broker_address = "localhost"
broker_port = 1883

# 客户端ID
client_id = "python-mqtt-demo"

# 订阅的主题
topic = "demo/topic"


# 连接成功回调函数
def on_connect(client, userdata, flags, rc):
print("Connected to MQTT broker with result code " + str(rc))
# 订阅主题时指定 QoS 级别为 QoS 2
client.subscribe(topic, qos=2)


# 消息到达回调函数
def on_message(client, userdata, msg):
print("Received message on topic: " + msg.topic)
print("Message: " + str(msg.payload))


# 创建 MQTT 客户端实例,并指定回调 API 版本为 1
client = mqtt.Client(client_id)

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message

# 设置回调 API 版本
client.callback_api_version = 1

# 连接到 MQTT 代理
client.connect(broker_address, broker_port, 60)

# 保持连接
client.loop_start()

# 发布消息
i = 0
while i < 5:
message = "Hello this is python " + str(i)
# 发布消息时指定 QoS 级别为 QoS 1
client.publish(topic, message, qos=1)
print("Published message: " + message)
time.sleep(5)
i += 1

# 等待接收消息
time.sleep(5)

# 断开连接
client.loop_stop()
client.disconnect()