零、MQTT 的概念
mqtt 的作用实现多设备直接的互联与通信,mqtt 是基于 TCP 传输协议上的应用层协议
在 MQTT 协议中有三个角色(发布者/代理服务器/客户端),关系图如下:
1. Broker (代理/服务器)
Broker 是 MQTT 架构的核心,扮演着消息中转站的角色。所有客户端发布的消息都必须先发送到 Broker,再由 Broker 根据订阅关系,准确地将消息转发给一个或多个订阅者。Broker 的存在实现了发布者与订阅者之间的完全解耦,它们无需感知对方的存在,也无需在同一时间在线。
2. Client (客户端)
任何连接到 Broker 并通过 MQTT 协议进行通信的设备或应用程序都可称为客户端 (Client)。客户端的角色是动态的,它可以:
- 仅作为发布者 (Publisher):如一个只负责上报数据的温湿度传感器。
- 仅作为订阅者 (Subscriber):如一个只负责展示数据的监控大屏。
- 同时是发布者和订阅者:如一个智能家居中控 App,它既订阅设备状态,又发布控制指令。
当前笔记使用的是 mosquitto 代理服务器:
一、mosquitto MQTT 服务器安装、环境安装
(1) 搭建 MQTT 服务器
本教程主要参考 Mosquitto 官方文档,采用 APT 包管理器 安装方式。
如果你更熟悉 Docker,也可以通过一条命令快速部署,但 Docker 安装通常占用更多内存,因此这里选择资源占用较低的 APT 安装方式。
一、安装依赖与 MQTT 服务器
执行以下命令,更新软件源并一次性安装所有必要组件:
1 | sudo apt update |
| 组件名称 | 说明 |
|---|---|
| mosquitto | MQTT 服务器(Broker)核心程序 |
| mosquitto-clients | 提供 mosquitto_pub 与 mosquitto_sub 命令行客户端,用于测试 |
| libssl-dev | 支持 TLS/SSL 加密传输的依赖库 |
| libcjson-dev | 用于处理 JSON 格式消息的 C 语言库 |
二、启动与开机自启
1 | sudo systemctl start mosquitto |
查看运行状态:
1 | systemctl status mosquitto |
看到 active (running) 即表示服务启动成功。
三、配置允许外部访问(可选)
默认情况下 Mosquitto 仅允许本地访问,如需让其他设备连接:
1 | sudo vim /etc/mosquitto/mosquitto.conf |
在文件末尾添加以下内容:
1 | listener 1883 |
保存后重启服务:
1 | sudo systemctl restart mosquitto |
四、本地测试 MQTT 通信
在两个终端窗口中分别执行:
终端1:订阅消息
1 | mosquitto_sub -t test |
终端2:发布消息
1 | mosquitto_pub -t test -m "hello mqtt" |
若终端1收到 hello mqtt,说明 MQTT 服务器运行正常。
五、开启安全加密(可选)
Mosquitto 支持 SSL/TLS 加密通信,可使用 libssl-dev 生成自签证书:
1 | sudo openssl req -new -x509 -days 365 -nodes -out /etc/mosquitto/certs/server.crt -keyout /etc/mosquitto/certs/server.key |
然后在配置文件中启用:
1 | listener 8883 |
重启后客户端即可使用加密端口 8883 连接。
(2)、编译开发板运行环境
1, 下载 openssl、cjson、mosquitto 的相关库
为了进行交叉编译,你需要下载这些库的源码包。请从以下官方或可靠的链接下载:
OpenSSL:
- 官方源码下载页: https://www.openssl.org/source/
- 说明: OpenSSL 版本迭代较快,笔记中使用的
1.1.1q是一个长期支持 (LTS) 版本,你可以在旧版本存档中找到它。对于新的项目,建议查看官网最新的稳定版
cJSON:
- GitHub Releases 页面: https://github.com/DaveGamble/cJSON/releases
- 说明: cJSON 是一个轻量级库,直接下载最新 release 的源码压缩包即可
Mosquitto:
- 官方下载页: https://mosquitto.org/download/
- 说明: 在页面中找到 “Source” 部分,下载
.tar.gz格式的源码包。笔记中使用的是2.0.9版本,你可以在 https://mosquitto.org/files/source/ 中找到所有历史版本
将下载好的源码包统一存放到一个工作目录下,以便后续解压和编译
2、配置安装 openssl 库
a、创建 arm_mqtt_lib 文件夹, 再在此文件夹下创建 openssl 文件夹
1 | 命令1:mkdir ~/arm_mqtt_lib |
b、解压并进入 openssl-1.1.1q 目录
1 | 命令1:tar -xvf openssl-1.1.1q.tar.gz -C ~/arm_mqtt_lib/ |
c、配置安装目录并生成 Makefile 文件
1 | 命令:./config no-asm -shared --prefix=/home/gec/arm_mqtt_lib/openssl |
d、进入 Makefile 文件,将编译工具修改为交叉编译链,修改的地方如下**(vi 或 gedit)**
1 | PLATFORM=arm |
e、由于 ARM 基本的都是 32 位,所以需要 Makefile 文件中的‘-m64’删除,如下图所示
f、执行 make 编译,再执行 make install
1 | make |
执行上述的命令后,会在安装路径中生成相应的文件,如下图所示:
3、配置安装 cJSON 库
a、在安装路径下创建 cjson 文件夹
1 | mkdir ~/arm_mqtt_lib/cjson |
b、解压并进入 cJSON-master 目录
1 | cp cJSON-master.zip ~/arm_mqtt_lib/ #拷贝库到家目录的arm-lib目录 |
命令 1 现象:
命令 2 现象:
命令 3 现象:
c、修改 Makefile 文件**(vi 或 gedit)**
d、执行 make 编译,并执行 make install 安装
1 | make |
会在安装路径中生成相应的文件,如下图所示:
4、配置安装mosquitto 库
a、在安装路径下创建 mosquitto 文件
1 | mkdir ~/arm_mqtt_lib/mosquitto |
b、解压并进入/mosquitto-2.0.9/目录
1 | tar -xvf mosquitto-2.0.9.tar.gz -C ~/arm_mqtt_lib/ |
命令 1 现象:
命令 2 现象:
c、修改 config.mk 配置文件**(vi 或 gedit 打开)**
修改配置项
添加安装路径和 openssl、cjson 库路径和头文件路径
1 | WITH_UUID:=no |
d、执行 make 编译、并执行 sudo make install
1 | make |
会在安装路径中生成相应的文件,如下图所示:
5、查看当前交叉编译工具的设置的默认库路径及头文件路径,并进行复制
注意:去到 openssl、cJson、mosquitto 文件夹下,将它们所有的头文件和库文件都复制到你的交叉编译工具的相关路径下
a、查看自己的交叉编译工具头文件和库文件的路径的命令
1 | echo 'main(){}' | arm-linux-gcc -E -v - |
交叉编译的头文件路径:
注意:这个得看你当前环境编译器,不能固定用上面的路径
b、复制openssl、cJson、mosquitto 的头文件到交叉编译的头文件路径下
1 | cp ~/arm_mqtt_lib/openssl/include/openssl/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/include |
交叉编译的库文件路径
c、复制openssl、cJson、mosquitto 的库文件,到交叉编译的库文件路径下
1 | cp ~/arm_mqtt_lib/openssl/lib/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/ -rf |
6、arm 交叉编译程序的时候,后面的程序需要加
1 | arm-linux-gcc test.c -o test -lmosquitto -lssl -lcrypto -lcjson |
7、执行程序的时候
a、将 openssl、cJson、mosquitto 文件夹的库文件进行打包
1 | 前提:去到openssl、cJson、mosquitto文件夹下,将里面的库,全部放到一个文件夹里面 |
b、发送到开发板上的/lib/上面即可
1 | 前提:将"arm_mqtt_lib.tar.gz"发送到开发板上 |
二、mosquitto 库的使用
(1)、前提说明
1、备份文件
a、进入家目录,并将"mosquitto.conf.example"文件复制到家目录上
1 | cd ~/ |
b、启动 mqtt 服务器
1 | mosquitto -c ~/mosquitto.conf.example |
2、配置文件mosquitto.conf.example文件
a、打开"mosquitto.conf.example"文件
1 | gedit ~/mosquitto.conf.example |
b、修改文件(可远程、允许任何人访问)
1 | listener 1883或port 1883 |
注意事项:如果端口出现了端口被占用的情况:
1、先进入到 root 模式
1 | sudo su |
2、查找 1883 被占用进程
1 | sudo lsof -i:1883 |
3、使用 kill+PID 杀死占用进程即可
4、返回自己的用户
1 | exit |
(2)、MQTT 发布与订阅的命令
1、前提需要将 MQTT 服务器开启
1 | mosquitto -c ~/mosquitto.conf.example |
2、mqtt 订阅命令
1 | mosquitto_sub -t temp #订阅命令 -t 订阅的主题 |
出现错误 1:
解决方法:
1 | sudo cp /usr/local/lib/libmosquitto* /lib/ |
出现错误 2:
解决方法:
1 | sudo cp /usr/local/lib/libcjson* /lib/ |
现象:相当于你订阅了 B 站上的一个 UP 主(涉猎游戏、美食、钓鱼),你在等他发布美食主题消息
3、mqtt 发布命令
1 | 发布的命令:mosquitto_pub -t temp -m hello_world #主题发布 -t 发布的主题 -m 发布的消息 |
(3)、发布函数接口
1. 初始话库函数
int mosquitto_lib_init(void);
- 功能: 初始化 Mosquitto 库。这是调用任何其他 Mosquitto 函数之前必须执行的第一步。
- 返回值: 始终返回
MOSQ_ERR_SUCCESS。
2. 创建客户端实例
struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
- 参数:
id(const char *): 客户端 ID。如果设为NULL且clean_session为true,Broker 会自动为其分配一个唯一的 ID。clean_session(bool): 会话清除标志。true表示建立干净会话,false表示建立持久会话。obj(void *): 用户自定义数据指针,该指针会传递给所有回调函数,常用于在回调中访问外部数据。
- 返回值:
- 成功: 返回指向
mosquitto结构体的指针。 - 失败: 返回
NULL。
- 成功: 返回指向
3. 连接到 MQTT Broker
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive);
- 参数:
mosq(struct mosquitto *):mosquitto_new创建的客户端实例。host(const char *): Broker 的 IP 地址或域名。port(int): Broker 的端口号(默认为 1883)。keepalive(int): 心跳间隔时间(秒)。客户端会在此时间内至少发送一个 PING 包以保持连接。建议值大于 5。
- 返回值:
- 成功:
MOSQ_ERR_SUCCESS。 - 失败: 返回其他错误码。
- 成功:
4. 发布一条消息到指定的主题
int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
- 参数:
mosq(struct mosquitto *): 客户端实例。mid(int *): 指向消息 ID 的指针,通常设为NULL让库自动处理。topic(const char *): 消息要发布到的主题。payloadlen(int): 消息内容的长度。payload(const void *): 指向消息内容的指针。qos(int): 服务质量等级 (0, 1, 或 2)。retain(bool): 保留消息标志。true表示将此消息设为该主题的保留消息。
- 返回值:
- 成功:
MOSQ_ERR_SUCCESS。 - 失败: 返回其他错误码。
- 成功:
5. 断开与 Broker 的连接
int mosquitto_disconnect(struct mosquitto *mosq);
- 参数:
mosq(struct mosquitto *): 客户端实例。
- 返回值:
- 成功:
MOSQ_ERR_SUCCESS。 - 失败: 返回其他错误码。
- 成功:
6. 释放连接资源
int mosquitto_lib_cleanup(void);
示例代码:
1 | /** |
(4)、订阅函数接口
1.订阅一个或多个主题
int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);
- 参数:
mosq(struct mosquitto *): 客户端实例。mid(int *): 消息 ID 指针,通常设为NULL。sub(const char *): 要订阅的主题或主题过滤器(可使用通配符+和#)。qos(int): 期望接收此主题消息的最高 QoS 等级 (0, 1, 或 2)。
- 返回值:
- 成功:
MOSQ_ERR_SUCCESS。 - 失败: 返回其他错误码。
- 成功:
2. 处理接收到的消息
void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *));
功能: 注册一个回调函数,用于处理接收到的消息。
参数:
mosq(struct mosquitto *): 客户端实例。on_message: 指向回调函数的指针。当客户端收到任何已订阅主题的消息时,此函数将被调用。
回调函数原型:
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)mosq: 触发回调的客户端实例。obj:mosquitto_new中设置的用户自定义数据指针。message: 指向mosquitto_message结构体的指针,包含了接收到的消息详情。1
2
3
4
5
6
7
8struct mosquitto_message {
int mid; // 消息 ID
char *topic; // 消息所属的主题
void *payload; // 消息内容
int payloadlen; // 内容长度
int qos; // 消息的 QoS 等级
bool retain; // 是否为保留消息
};
3. 循环接收消息
int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets);
- 功能: 启动一个阻塞的网络循环来处理 MQTT 消息(接收、发送 PING 包等)。此函数会一直运行,直到客户端断开连接。
- 参数:
mosq(struct mosquitto *): 客户端实例。timeout(int): 超时时间(毫秒),通常设为-1表示永不超时。max_packets(int): 未使用的参数,保持为1即可。
- 返回值:
- 正常断开:
MOSQ_ERR_SUCCESS。 - 发生错误: 返回相应的错误码。
- 正常断开:
示例代码:
1 | /** |
其它参数设置
服务质量 (QoS - Quality of Service)
QoS 是 MQTT 中确保消息可靠传输的关键机制,共分为三个等级:
- QoS 0 (最多一次): “发后即忘”,以最高的性能发送消息,但不保证消息一定能送达。适用于数据更新频率高且允许少量丢失的场景(如传感器实时读数)。
- QoS 1 (至少一次): 确保消息至少到达接收方一次,但由于重传机制,可能会导致消息重复。适用于要求消息必须送达,且接收端有能力处理重复数据的场景(如远程开关指令)。
- QoS 2 (只有一次): 最可靠的等级,通过复杂的握手流程确保消息有且仅有一次被送达,但性能开销最大。适用于绝对不允许消息丢失或重复的金融、计费等关键业务场景。
主题 (Topic) 与通配符
主题 (Topic) 是 MQTT 的消息路由核心,它是一个使用 / 分隔层级的 UTF-8 字符串,类似于文件系统路径。
- 主题层级 (Topic Levels): 例如
home/livingroom/temperature,这种层级结构使得主题管理和订阅变得非常灵活。 - 通配符 (Wildcards):
+(单层通配符): 匹配一个层级。例如,订阅home/+/light可以接收到home/livingroom/light和home/bedroom/light的消息。#(多层通配符): 匹配零个或多个层级,并且必须是主题的最后一级。例如,订阅home/#可以接收到所有以home/开头的消息,如home/livingroom/temperature和home/security/door/status。
高级特性
- 保留消息 (Retained Message): 当 Broker 接收到一个设置了“保留”标志的发布消息时,它会存储该主题的最后一条消息。当有新的订阅者订阅该主题时,Broker 会立即将这条被保留的消息发送给它。这对于让新上线的设备立即获取最新状态非常有用。
- 遗嘱消息 (Last Will and Testament - LWT): 客户端在连接 Broker 时可以预设一条“遗嘱消息”。如果客户端因意外(如断电、网络故障)而异常断开连接,Broker 会自动将这条遗嘱消息发布到指定的遗嘱主题上。这常用于监控设备的在线状态。
- 清除会话 (Clean Session):
true:当客户端断开连接时,Broker 会清除其所有订阅关系和离线消息。每次连接都是一个全新的开始。false:客户端断开后,Broker 会保留其订阅关系和 QoS > 0 的离线消息,直到客户端重新连接后发送给它。这对于网络不稳定的移动设备至关重要。
- 心跳机制 (Keep Alive): 客户端在连接时与 Broker 协商一个时间间隔(秒)。在此期间若无任何消息交互,客户端会发送一个极小的 PINGREQ 包,Broker 则响应 PINGRESP 包,以此确认双方连接仍然有效。如果 Broker 在 1.5 倍的 Keep Alive 时间内未收到客户端的任何消息,则会判定连接断开,并触发 LWT 机制。































