这篇文章大部分内容来自方工的教程,少部分来自官方文档

零、MQTT 的概念

mqtt 的作用实现多设备直接的互联与通信,mqtt 是基于 TCP 传输协议上的应用层协议

在 MQTT 协议中有三个角色(发布者/代理服务器/客户端),关系图如下:

image

1. Broker (代理/服务器)

Broker 是 MQTT 架构的核心,扮演着消息中转站的角色。所有客户端发布的消息都必须先发送到 Broker,再由 Broker 根据订阅关系,准确地将消息转发给一个或多个订阅者。Broker 的存在实现了发布者与订阅者之间的完全解耦,它们无需感知对方的存在,也无需在同一时间在线。

2. Client (客户端)

任何连接到 Broker 并通过 MQTT 协议进行通信的设备或应用程序都可称为客户端 (Client)。客户端的角色是动态的,它可以:

  • 仅作为发布者 (Publisher):如一个只负责上报数据的温湿度传感器。
  • 仅作为订阅者 (Subscriber):如一个只负责展示数据的监控大屏。
  • 同时是发布者和订阅者:如一个智能家居中控 App,它既订阅设备状态,又发布控制指令。

image

当前笔记使用的是 mosquitto 代理服务器:

image

一、mosquitto MQTT 服务器安装、环境安装

(1) 搭建 MQTT 服务器

本教程主要参考 Mosquitto 官方文档,采用 APT 包管理器 安装方式。
如果你更熟悉 Docker,也可以通过一条命令快速部署,但 Docker 安装通常占用更多内存,因此这里选择资源占用较低的 APT 安装方式。

一、安装依赖与 MQTT 服务器

执行以下命令,更新软件源并一次性安装所有必要组件:

1
2
sudo apt update
sudo apt install mosquitto mosquitto-clients libssl-dev libcjson-dev -y
组件名称说明
mosquittoMQTT 服务器(Broker)核心程序
mosquitto-clients提供 mosquitto_pubmosquitto_sub 命令行客户端,用于测试
libssl-dev支持 TLS/SSL 加密传输的依赖库
libcjson-dev用于处理 JSON 格式消息的 C 语言库

二、启动与开机自启

1
2
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

查看运行状态:

1
systemctl status mosquitto

看到 active (running) 即表示服务启动成功。


三、配置允许外部访问(可选)

默认情况下 Mosquitto 仅允许本地访问,如需让其他设备连接:

1
sudo vim /etc/mosquitto/mosquitto.conf

在文件末尾添加以下内容:

1
2
listener 1883
allow_anonymous true

保存后重启服务:

1
sudo systemctl restart mosquitto

四、本地测试 MQTT 通信

在两个终端窗口中分别执行:

终端1:订阅消息

1
mosquitto_sub -t test

终端2:发布消息

1
mosquitto_pub -t test -m "hello mqtt"

若终端1收到 hello mqtt,说明 MQTT 服务器运行正常。

五、开启安全加密(可选)

Mosquitto 支持 SSL/TLS 加密通信,可使用 libssl-dev 生成自签证书:

1
sudo openssl req -new -x509 -days 365 -nodes -out /etc/mosquitto/certs/server.crt -keyout /etc/mosquitto/certs/server.key

然后在配置文件中启用:

1
2
3
4
listener 8883
cafile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate false

重启后客户端即可使用加密端口 8883 连接。


(2)、编译开发板运行环境

1, 下载 openssl、cjson、mosquitto 的相关库

为了进行交叉编译,你需要下载这些库的源码包。请从以下官方或可靠的链接下载:

将下载好的源码包统一存放到一个工作目录下,以便后续解压和编译

2、配置安装 openssl 库

a、创建 arm_mqtt_lib 文件夹, 再在此文件夹下创建 openssl 文件夹

1
2
命令1:mkdir ~/arm_mqtt_lib
命令2:mkdir ~/arm_mqtt_lib/openssl

image

b、解压并进入 openssl-1.1.1q 目录

1
2
命令1:tar -xvf openssl-1.1.1q.tar.gz -C ~/arm_mqtt_lib/
命令2:cd ~/arm_mqtt_lib/openssl-1.1.1q/

image

c、配置安装目录并生成 Makefile 文件

1
2
命令:./config no-asm -shared --prefix=/home/gec/arm_mqtt_lib/openssl
注意:/home/gec/这个不同系统是不一样的,除非你用和我一致

image

d、进入 Makefile 文件,将编译工具修改为交叉编译链,修改的地方如下**(vi 或 gedit)**

1
2
3
4
5
PLATFORM=arm
 
CROSS_COMPILE= 
CC=arm-linux-gcc
CXX=arm-linux-g++

image

e、由于 ARM 基本的都是 32 位,所以需要 Makefile 文件中的‘-m64’删除,如下图所示

image

f、执行 make 编译,再执行 make install

1
2
make
make install

执行上述的命令后,会在安装路径中生成相应的文件,如下图所示:

image

3、配置安装 cJSON 库

a、在安装路径下创建 cjson 文件夹

1
mkdir ~/arm_mqtt_lib/cjson

image

b、解压并进入 cJSON-master 目录

1
2
3
cp  cJSON-master.zip   ~/arm_mqtt_lib/   #拷贝库到家目录的arm-lib目录
unzip cJSON-master.zip #进入家目录的arm-lib目录 ,并解压json 库
cd ~/arm_mqtt_lib/cJSON-master/ #进入源码目录

命令 1 现象:

image

命令 2 现象:

image

命令 3 现象:

image

c、修改 Makefile 文件**(vi 或 gedit)**

image

d、执行 make 编译,并执行 make install 安装

1
2
make
make install

会在安装路径中生成相应的文件,如下图所示:

image

4、配置安装mosquitto

a、在安装路径下创建 mosquitto 文件

1
mkdir ~/arm_mqtt_lib/mosquitto

image

b、解压并进入/mosquitto-2.0.9/目录

1
2
tar -xvf mosquitto-2.0.9.tar.gz -C ~/arm_mqtt_lib/
cd ~/arm_mqtt_lib/mosquitto-2.0.9/

命令 1 现象:

image

命令 2 现象:

image

c、修改 config.mk 配置文件**(vi 或 gedit 打开)**

修改配置项

image

添加安装路径和 openssl、cjson 库路径和头文件路径

1
2
3
4
5
6
7
8
9
10
WITH_UUID:=no

CC=arm-linux-gcc
CXX=arm-linux-g++ 
prefix=/home/gec/arm_mqtt_lib/mosquitto
CFLAGS:=-I/home/gec/arm_mqtt_lib/openssl/include
CFLAGS+=-I/home/gec/arm_mqtt_lib/cjson/include
 
LDFLAGS=-L/home/gec/arm_mqtt_lib/openssl/lib -lssl -lcrypto
LDFLAGS+=-L/home/gec/arm_mqtt_lib/cjson/lib -lcjson

image

d、执行 make 编译、并执行 sudo make install

1
2
make
sudo make install

会在安装路径中生成相应的文件,如下图所示:

image

5、查看当前交叉编译工具的设置的默认库路径及头文件路径,并进行复制

注意:去到 openssl、cJson、mosquitto 文件夹下,将它们所有的头文件和库文件都复制到你的交叉编译工具的相关路径下

a、查看自己的交叉编译工具头文件和库文件的路径的命令

1
echo  'main(){}' | arm-linux-gcc -E -v -

交叉编译的头文件路径:

image

注意:这个得看你当前环境编译器,不能固定用上面的路径

b、复制openssl、cJson、mosquitto 的头文件到交叉编译的头文件路径下

1
2
3
cp ~/arm_mqtt_lib/openssl/include/openssl/*  /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/include
cp ~/arm_mqtt_lib/cjson/include/cjson/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/include
cp ~/arm_mqtt_lib/mosquitto/include/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/include

交叉编译的库文件路径

image

c、复制openssl、cJson、mosquitto 的库文件,到交叉编译的库文件路径下

1
2
3
cp ~/arm_mqtt_lib/openssl/lib/*  /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/ -rf
cp ~/arm_mqtt_lib/cjson/lib/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/ -rf
cp ~/arm_mqtt_lib/mosquitto/lib/* /usr/arm/5.4.0/usr/bin/../lib/gcc/arm-none-linux-gnueabi/5.4.0/ -rf

6、arm 交叉编译程序的时候,后面的程序需要加

1
arm-linux-gcc test.c -o test -lmosquitto -lssl -lcrypto -lcjson

7、执行程序的时候

a、将 openssl、cJson、mosquitto 文件夹的库文件进行打包

1
2
3
前提:去到openssl、cJson、mosquitto文件夹下,将里面的库,全部放到一个文件夹里面
(方工的放在arm_mqtt_lib/文件夹下)
命令:tar -cvf arm_mqtt_lib.tar.gz arm_mqtt_lib/ 

image

b、发送到开发板上的/lib/上面即可

1
2
前提:将"arm_mqtt_lib.tar.gz"发送到开发板上
命令:tar -xvf arm_mqtt_lib.tar.gz -C /lib/

二、mosquitto 库的使用

(1)、前提说明

1、备份文件

a、进入家目录,并将"mosquitto.conf.example"文件复制到家目录上
1
2
cd ~/
cp /etc/mosquitto/mosquitto.conf.example ~/

image

b、启动 mqtt 服务器
1
mosquitto -c ~/mosquitto.conf.example

image

2、配置文件mosquitto.conf.example文件

a、打开"mosquitto.conf.example"文件

1
gedit ~/mosquitto.conf.example

b、修改文件(可远程、允许任何人访问)

1
2
listener 1883或port 1883 
allow_anonymous true

image

注意事项:如果端口出现了端口被占用的情况:

1、先进入到 root 模式

1
sudo su

2、查找 1883 被占用进程

1
sudo lsof -i:1883

3、使用 kill+PID 杀死占用进程即可

4、返回自己的用户

1
exit

(2)、MQTT 发布与订阅的命令

1、前提需要将 MQTT 服务器开启

1
mosquitto -c ~/mosquitto.conf.example

image

2、mqtt 订阅命令

1
mosquitto_sub -t temp   #订阅命令 -t  订阅的主题

出现错误 1:

image

解决方法:

1
sudo cp /usr/local/lib/libmosquitto* /lib/ 

出现错误 2:

image

解决方法:

1
sudo cp /usr/local/lib/libcjson* /lib/

现象:相当于你订阅了 B 站上的一个 UP 主(涉猎游戏、美食、钓鱼),你在等他发布美食主题消息

image

3、mqtt 发布命令

1
发布的命令:mosquitto_pub -t temp -m hello_world      #主题发布 -t 发布的主题 -m 发布的消息

image

(3)、发布函数接口

1. 初始话库函数

int mosquitto_lib_init(void);

  • 功能: 初始化 Mosquitto 库。这是调用任何其他 Mosquitto 函数之前必须执行的第一步。
  • 返回值: 始终返回 MOSQ_ERR_SUCCESS

2. 创建客户端实例

struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);

  • 参数:
    • id (const char *): 客户端 ID。如果设为 NULLclean_sessiontrue,Broker 会自动为其分配一个唯一的 ID。
    • clean_session (bool): 会话清除标志。true 表示建立干净会话,false 表示建立持久会话。
    • obj (void *): 用户自定义数据指针,该指针会传递给所有回调函数,常用于在回调中访问外部数据。
  • 返回值:
    • 成功: 返回指向 mosquitto 结构体的指针。
    • 失败: 返回 NULL

3. 连接到 MQTT Broker

int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive);

  • 参数:
    • mosq (struct mosquitto *): mosquitto_new 创建的客户端实例。
    • host (const char *): Broker 的 IP 地址或域名。
    • port (int): Broker 的端口号(默认为 1883)。
    • keepalive (int): 心跳间隔时间(秒)。客户端会在此时间内至少发送一个 PING 包以保持连接。建议值大于 5。
  • 返回值:
    • 成功: MOSQ_ERR_SUCCESS
    • 失败: 返回其他错误码。

4. 发布一条消息到指定的主题

int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);

  • 参数:
    • mosq (struct mosquitto *): 客户端实例。
    • mid (int *): 指向消息 ID 的指针,通常设为 NULL 让库自动处理。
    • topic (const char *): 消息要发布到的主题。
    • payloadlen (int): 消息内容的长度。
    • payload (const void *): 指向消息内容的指针。
    • qos (int): 服务质量等级 (0, 1, 或 2)。
    • retain (bool): 保留消息标志。true 表示将此消息设为该主题的保留消息。
  • 返回值:
    • 成功: MOSQ_ERR_SUCCESS
    • 失败: 返回其他错误码。

5. 断开与 Broker 的连接

int mosquitto_disconnect(struct mosquitto *mosq);

  • 参数:
    • mosq (struct mosquitto *): 客户端实例。
  • 返回值:
    • 成功: MOSQ_ERR_SUCCESS
    • 失败: 返回其他错误码。

6. 释放连接资源

int mosquitto_lib_cleanup(void);

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
******************************************************************************
* @file test.c
* @author FZetc飞贼
* @version V0.0.1
* @date 2025.06.17
* @brief 给学生的演示例程:mqtt信息发布
* 前提:需要先运行起mqtt服务器
* 编译:
* gcc 001__MQTT信息发布.c -o sub -lmosquitto(需要mqtt的库)
*
******************************************************************************
*/

#include <stdio.h>
// 添加MQTT头文件
#include <mosquitto.h>
#include <string.h>

// MQTT服务器的地址
#define MQTT_ADDR "192.168.13.3"

// MQTT服务器的端口号
#define MQTT_PORT 1883

// 主函数
int main(int argc, char const* argv[]) {
// 1、初始化MQTT资源(a、买了一步电脑、手机)
mosquitto_lib_init();

// 2、创建一个MQTT客户端对象(b、下载了B站、小红书软件,并且开了个号,准备发布素材)
struct mosquitto* client_up = mosquitto_new("jack", true, NULL);
if (client_up == NULL) {
perror("mosquitto_new error!\n");
return -1;
}

// 3、链接到MQTT服务器(c、连接到B站、小红书的服务器了(打开APP))
int ret = mosquitto_connect(client_up, MQTT_ADDR, MQTT_PORT, 60);
if (ret != MOSQ_ERR_SUCCESS) {
perror("mosquitto_connect error!\n");
return -2;
}

// 4、发布消息(d、上传作品、更新动态等)
char msg_buf[1024] = "人居然要吃饭!";
ret = mosquitto_publish(client_up, NULL, "震惊", strlen(msg_buf), msg_buf, 0,
0);
if (ret != 0) {
perror("mosquitto_publish error!\n");
return -3;

} else {
printf("mosquitto_publish 成功!\n");
}

// 5、断开服务器(e、退出app)
mosquitto_disconnect(client_up);

// 6、释放mqtt库资源(f、把电脑、手机给砸了)
mosquitto_lib_cleanup();
}

(4)、订阅函数接口

1.订阅一个或多个主题

int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);

  • 参数:
    • mosq (struct mosquitto *): 客户端实例。
    • mid (int *): 消息 ID 指针,通常设为 NULL
    • sub (const char *): 要订阅的主题或主题过滤器(可使用通配符 +#)。
    • qos (int): 期望接收此主题消息的最高 QoS 等级 (0, 1, 或 2)。
  • 返回值:
    • 成功: MOSQ_ERR_SUCCESS
    • 失败: 返回其他错误码。

2. 处理接收到的消息

void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *));

  • 功能: 注册一个回调函数,用于处理接收到的消息。

  • 参数:

    • mosq (struct mosquitto *): 客户端实例。
    • on_message: 指向回调函数的指针。当客户端收到任何已订阅主题的消息时,此函数将被调用。
  • 回调函数原型: void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)

    • mosq: 触发回调的客户端实例。

    • obj: mosquitto_new 中设置的用户自定义数据指针。

    • message: 指向 mosquitto_message 结构体的指针,包含了接收到的消息详情。

      1
      2
      3
      4
      5
      6
      7
      8
      struct mosquitto_message {
      int mid; // 消息 ID
      char *topic; // 消息所属的主题
      void *payload; // 消息内容
      int payloadlen; // 内容长度
      int qos; // 消息的 QoS 等级
      bool retain; // 是否为保留消息
      };

3. 循环接收消息

int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets);

  • 功能: 启动一个阻塞的网络循环来处理 MQTT 消息(接收、发送 PING 包等)。此函数会一直运行,直到客户端断开连接。
  • 参数:
    • mosq (struct mosquitto *): 客户端实例。
    • timeout (int): 超时时间(毫秒),通常设为 -1 表示永不超时。
    • max_packets (int): 未使用的参数,保持为 1 即可。
  • 返回值:
    • 正常断开: MOSQ_ERR_SUCCESS
    • 发生错误: 返回相应的错误码。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
******************************************************************************
* @file test.c
* @author FZetc飞贼
* @version V0.0.1
* @date 2025.06.17
* @brief 给学生的演示例程:mqtt信息订阅
* 前提:需要先运行起mqtt服务器
* 编译:
* gcc 002__MQTT信息订阅.c -o sub -lmosquitto(需要mqtt的库)
*
******************************************************************************
*/

#include <stdio.h>
// 添加MQTT头文件
#include <mosquitto.h>
#include <string.h>

// MQTT服务器的地址
#define MQTT_ADDR "192.168.13.x"

// MQTT服务器的端口号
#define MQTT_PORT 1883

/**
* @brief 回调函数
* @note None
* @param id:发布者对象
* arg:mosquitto_new发过来的参数
* msg:消息的结构体(保存发布的信息)
* @retval None
*/
void MSG_CallBack(struct mosquitto* id, void* arg,
const struct mosquitto_message* msg) {
printf("arg == %d\n", *(int*)arg);

printf("发布消息时id:%d\n", msg->mid);
printf("消息的主题:%s\n", msg->topic);
printf("消息的内容:%s\n", (char*)msg->payload);
printf("消息的长度:%d\n", msg->payloadlen);
printf("消息的质量:%d\n", msg->qos);
printf("保留标志:%d\n", msg->retain);
}

// 主函数
int main(int argc, char const* argv[]) {
// 1、初始化MQTT库(a、买了一部电脑、手机)
mosquitto_lib_init();

// 2、创建一个MQTT对象(b、下载B站、小红书)
int num = 110;
struct mosquitto* client_fans = mosquitto_new("rose", true, (void*)&num);
if (client_fans == NULL) {
perror("mosquitto_new error!\n");
return -1;
}

// 3、链接到MQTT服务器(c、连接到B站、小红书的服务器了(打开APP))
int ret = mosquitto_connect(client_fans, MQTT_ADDR, MQTT_PORT, 60);
if (ret != MOSQ_ERR_SUCCESS) {
perror("mosquitto_connect error!\n");
return -2;
}

// 4、订阅主题(d、关注你的up主的某个话题(震惊))
ret = mosquitto_subscribe(client_fans, NULL, "震惊", 0);
if (ret != MOSQ_ERR_SUCCESS) {
perror("mosquitto_subscribe error!\n");
return -3;
} else {
printf("mosquitto_subscribe 成功!\n");
}

// 5、设置消息回调函数(e、浏览这个话题的相关信息)
mosquitto_message_callback_set(client_fans, MSG_CallBack);

// 6、循环处理消息时间,检测是否有消息触发
mosquitto_loop_forever(client_fans, -1, 1);
}

其它参数设置

服务质量 (QoS - Quality of Service)

QoS 是 MQTT 中确保消息可靠传输的关键机制,共分为三个等级:

  • QoS 0 (最多一次): “发后即忘”,以最高的性能发送消息,但不保证消息一定能送达。适用于数据更新频率高且允许少量丢失的场景(如传感器实时读数)。
  • QoS 1 (至少一次): 确保消息至少到达接收方一次,但由于重传机制,可能会导致消息重复。适用于要求消息必须送达,且接收端有能力处理重复数据的场景(如远程开关指令)。
  • QoS 2 (只有一次): 最可靠的等级,通过复杂的握手流程确保消息有且仅有一次被送达,但性能开销最大。适用于绝对不允许消息丢失或重复的金融、计费等关键业务场景。

主题 (Topic) 与通配符

主题 (Topic) 是 MQTT 的消息路由核心,它是一个使用 / 分隔层级的 UTF-8 字符串,类似于文件系统路径。

  • 主题层级 (Topic Levels): 例如 home/livingroom/temperature,这种层级结构使得主题管理和订阅变得非常灵活。
  • 通配符 (Wildcards):
    • + (单层通配符): 匹配一个层级。例如,订阅 home/+/light 可以接收到 home/livingroom/lighthome/bedroom/light 的消息。
    • # (多层通配符): 匹配零个或多个层级,并且必须是主题的最后一级。例如,订阅 home/# 可以接收到所有以 home/ 开头的消息,如 home/livingroom/temperaturehome/security/door/status

高级特性

  • 保留消息 (Retained Message): 当 Broker 接收到一个设置了“保留”标志的发布消息时,它会存储该主题的最后一条消息。当有新的订阅者订阅该主题时,Broker 会立即将这条被保留的消息发送给它。这对于让新上线的设备立即获取最新状态非常有用。
  • 遗嘱消息 (Last Will and Testament - LWT): 客户端在连接 Broker 时可以预设一条“遗嘱消息”。如果客户端因意外(如断电、网络故障)而异常断开连接,Broker 会自动将这条遗嘱消息发布到指定的遗嘱主题上。这常用于监控设备的在线状态。
  • 清除会话 (Clean Session):
    • true:当客户端断开连接时,Broker 会清除其所有订阅关系和离线消息。每次连接都是一个全新的开始。
    • false:客户端断开后,Broker 会保留其订阅关系和 QoS > 0 的离线消息,直到客户端重新连接后发送给它。这对于网络不稳定的移动设备至关重要。
  • 心跳机制 (Keep Alive): 客户端在连接时与 Broker 协商一个时间间隔(秒)。在此期间若无任何消息交互,客户端会发送一个极小的 PINGREQ 包,Broker 则响应 PINGRESP 包,以此确认双方连接仍然有效。如果 Broker 在 1.5 倍的 Keep Alive 时间内未收到客户端的任何消息,则会判定连接断开,并触发 LWT 机制。