原文 Sol - An MQTT broker from scratch. Part 1 - The protocol
前言
我已经在物联网领域工作有一段时间了,这段时间里我一直在处理物联网架构相关的工作,探索物联网系统开发的最佳模式,研究相关的协议和标准,例如MQTT。
因为我一直在渴望着提升我编程能力的机会,我觉得在物联网方向深入研究会很有趣也很有好处。因此,我再一次 git init
了一个项目,并且要通过写下这些博客来挑战我自己,强迫自己进步。
Sol 是一个C语言项目,一个超级简单的Linux平台的MQTT broker,支持MQTT 3.3.1,不兼容旧的版本,非常类似于轻量级的 Mosquitto
(虽然这玩意已经是个轻量级软件了)。由于现在有很多种类的MQTT客户端,所以测试起来会比较简单。最终的成品可能会成为一个更简洁,功能更丰富的软件,我们要创造这个功能的最小化实现。顺便提一下,Sol 这个名字的来源有一半的原因是我对短名称的偏好,另一半的原因则是火星日 (The Martian docet)。或者说,Sol 可能代表Scrappy Ol’ Loser。emmmm
注意:这个项目一直到最后才会编译,你需要跟写所有的代码步骤。如果你想要在中途进行测试,我建议你自己建一个主函数来做这些测试或者修改。
一步一步来,我一般会创建一个这样的文件结构来初始化我的C项目:
这里是Github上的仓库。
我会尝试着一步一步描述 Sol 的开发过程,但我也不会贴上所有的代码,只会解释关键的地方。你想要学习的最好方式依然是亲自编写、编译、修改代码。
这将是一系列文章,每篇文章都将讨论并主要实施项目的一个概念/模块:
我想说,虽然 sol 会是一个完全功能的 broker,但仍有很大改进和优化空间,以及可能的一些隐藏功能(俗称BUG)。
架构设计
broker
的本质是一个中间件,它接受来自多个客户端(生产者)的输入,并使用抽象方法将其转发给一组目标客户端(消费者),这种抽象方法用于定义和管理这些客户端组,形式为 channel 或 topic(根据协议标准)。与 IRC 频道或通用聊天中的等效概念非常相似,每个消费者客户端都可以订阅 topic
,以便接收其他客户端发布到这些 topic
的所有消息。
第一个想到的是建立在某种数据结构之上的服务器,这种数据结构可以轻松管理这些 topic
和连接的客户端(无论是生产者还是消费者)。客户端收到的每个消息都必须转发给所有订阅了该消息指定 topic
的其他已连接客户端。
让我们试试这种方法,使用一个 TCP 服务器和一个用于处理数据流的模块。实现服务器的方法有很多,包括线程、fork 进程和多路 I/O,这次我将尝试用多路 I/O 的方式。
我们先使用单线程多路 I/O 服务器,未来有可能进行多线程拓展。实际上,用于多路复用的 epoll 接口是线程安全的。
MQTT结构
首先,我们需要基于官方文档,制作一些描述 MQTT 协议数据包的结构体。
从 opcode 表和 MQTT 头开始,基于文档,每个数据包都包含以下三部分:
- fixed header(必选)
- variable header(可选)
- payload(可选)
Fixed Header的第一个字节包括了 MQTT type
和 Flags
,第二到第五个字节使用可变编码的方式,存储剩余数据包的长度。
Flags并不是强制填写的,只是一些控制类数据,内容如下:
- Dup flag: 当消息被发送超过一次时使用
- QoS level: 有以下三种取值
AT_MOST_ONCE
=0, AT_LEAST_ONCE
=1 and EXACTLY_ONCE
=2
- Retain flag: 保留标志,有保留标志的消息被发布到主题时,消息会被保留,之后连接进来的客户端也可以收到该消息。保留消息可以被另一条保留消息覆盖。
所以,打开 Vim (或者其他任何你喜欢的IDE),创建名为 mqtt.h
的头文件,开始写关于 Fixed Header 的数据结构吧:
最上方的两个 #define
定义了 MQTT Fixed Header 和 MQTT ACK 的长度。
正如你在代码中看到的,我们利用了 union——一种可以在内存中的同一位置存储多种表示形式的结构——来表示一个字节。换句话说,与普通的 struct
不同,union
中只能有一个字段具有值(在此例中是byte或bits)。它们的内存位置是共享的,因此,通过使用位字段,我们可以有效地操作单个比特或字节的一部分。
CONNECT
我们要定义的第一个控制数据包是 CONNECT。 这是当客户端建立新连接时必须发送的第一个数据包,CONNECT 包必须是有且仅有一个,否则视为与协议不符,服务端需要断开连接。
对于每个 CONNECT,服务端需要在响应中回复 CONNACK。
按照这个模式,结合 MQTT v3.1.1 的文档,其他数据包的定义也比较简单了。
SUBSCRIBE UNSUBSCRIBE PUBLISH ACK等
接下来我们处理 SUBSCRIBE,UNSUBSCRIBE 和 PUBLISH。SUBSCRIBE 必须要使用 SUBACK 来响应,其他的都可以使用通用 ACK,并设置 typedef 字段的值来响应。
剩余的这一类ACK包:
- PUBACK
- PUBREC
- PUBREL
- PUBCOMP
- UNSUBACK
- PINGREQ
- PINGRESP
- DISCONNECT
因为有相同的结构,所以都可以通过 typedef 来定义,只是语义有所不同。最后一个 DISCONNECT,虽然严格来说不是一个 ACK,但是也有相同的结构。
MQTT
最终我们可以定义一个通用 MQTT 包,包括上面的一切,后续我们所有的 MQTT 数据包都可以用这个结构来表示。
MQTT函数
编码解码
现在我们继续定义一些公共函数。在 src/mqtt.h
中,我们需要考虑其他模块使用 MQTT 协议时会用到哪些函数。
为了使用 MQTT 协议处理通信,我们基本上需要 4 个函数,其中客户端向服务端有 2 个,服务端向客户端也是 2 个:
- 一个编码函数(总之就是把内存里的数据做成二进制流,这里不讨论术语)
- 一个解码函数(就是从二进制流恢复成内存结构)
我们还需要 2 个函数来处理 fixed head 部分中变长的 Remaining Length 字段。
内存操作
我们还需要一些工具函数,用来进行基于数据包的内存分配、释放,这里没啥特别的。
函数实现
MQTT包编解码接口
好了,我们现在有一个不错的头文件了,定义了我们通讯协议中的所有内容,现在我们需要实现这些函数了。为了能够实现这些功能,首先我们要定义几个私有的帮助函数,用来进行编码和解码的动作。这些函数会被公有函数unpack_mqtt_packet
和 pack_mqtt_packet
调用。
二进制流编解码实现
在继续实现 src/mqtt.h
上所有定义的函数之前,我们需要实现一些辅助函数,以简化每个接收到的数据包的编码解码过程。
让我们快速搞定这部分,这一块只是简单的序列化和反序列化操作而已(记得用Big-endian就行)。
以及相应的实现
这样我们就完成了字节流和数据类型的双向转换工作。
Remaining Length编解码实现
完成了 pack
部分后,我们需要把他们运用在我们的MQTT包里,首先当然是:
第一步我们可以实现对 Fixed Header 中的 Remaining Length 字段的操作。MQTT文档中提供了这一段实现的伪代码,我们可以仿写一下。
让我们来看看 Remaining Length 如何用1-4个变长的Byte来表示剩余包的长度。
Remaining Length 表示的是数据包剩余部分的长度,包括 variable header 和 payload。Remaining Length 中表示的长度不包括 Remaining Length 字段本身所占用的长度。
Remaining Length 的编码使用了一种可变长度编码方案,该方案对 127 以下的值使用单个字节。较大的值则按以下方式处理:每个字节的低 7 位编码数据,高位用于指示是否存在后续字节。因此,每个字节编码 128 个值和一个 “延续位”。Remaining Length 字段的最大字节数为 4。
MQTT的文档已经描述的非常清晰,我们只需要实现。
CONNECT 解码实现
好了,现在我们可以完整的解析 Fixed Header 了,接下来我们试着解码 CONNECT 包。
CONNECT 是一个有很多flags的包,而且长度仅次于 PUBLISH 包。
CONNECT 包的内容包括:
- Fixed Header 中的 MQTT type + Flags,高4位(MQTT type)(称为MSB)的值是
1
,表示Connect type
,低4位(Flags)(LSB)保留
- Fixed Header 中的变长 Remaining Length,表示剩余部分的长度
- Variable Header,由四个字段组成:
- Protocol Name
- Protocol Level
- Connect Flags
- Keep Alive
- 可能存在或者不存在的 payload(基于 Connect Flags 的设置)
Protocol Name 是 UTF-8 编码的大写字符串 “MQTT”,这个字段的长度和内容在未来版本的MQTT协议中都不会再改变。
所以 3.1.1 版本的 Protocol Name 就是 “MQTT”,我们也不用去管旧版本的名字是什么了。
Connect flags 为一个byte,包含了一些关于客户端行为以及是否有 payload 段存在的标识:
Connect flags 中的字段 |
大小 |
含义 |
Username flag |
1bit |
表示用户名存在与否 |
Password flag |
1bit |
表示密码存在与否 |
Will retain |
1bit |
表示遗嘱是否保留 |
Will QoS |
2bit |
表示遗嘱的QOS等级 |
Will flag |
1bit |
表示遗嘱存在与否 |
Clean Session |
1bit |
表示是否为新链接 |
Connect flags的最高位保留,其他所有位都被当作bool值初始化(除了Will QoS),这些bool值在 payload 部分也有相应的字段。比如当 Username 和 Password 的值为1,那么在 payload 中会有 2byte 的 username length,紧随其后的就是 username 字符串,Password也是相同的道理。
为了说明这件事,假设我们收到了这样一个 CONNECT 包:
- Connect flags 中的 username 和 password 都置为1
- username = “hello”
- password = “nacho”
- client ID = “danzan”
那么这个数据包应该长这样:
字段 |
大小 |
偏移量 |
描述 |
Packet type + Falgs |
1 |
0 |
类型为Connect type 0x01 ,Flags为空 |
Length |
1 |
1 |
后续总长度32Byte,小于127,所以可以用1Byte表示 |
Protocol name length |
2 |
2 |
协议名长度,值固定为 0x04 |
Protocol name |
4 |
4 |
‘M’ ‘Q’ ‘T’ ‘T’ |
Protocol level |
1 |
8 |
对于MQTT 3.1.1 此字段值为 0x04 |
Connect flags |
1 |
9 |
包括 Username , password , will retain , will QoS , will flag , clean session |
Keepalive |
2 |
10 |
ushort,保活时间,单位秒,最大值65536(18小时12分15秒) |
Client ID length |
2 |
12 |
ushort, 此例中值为0x06 (danzan) |
Client ID |
6 |
14 |
‘d’ ‘a’ ‘n’ ‘z’ ‘a’ ‘n’ |
Username length |
2 |
20 |
ushort, 此例中值为0x05 (hello) |
Username |
5 |
22 |
‘h’ ‘e’ ‘l’ ‘l’ ‘o’ |
Password length |
2 |
27 |
ushort, 此例中值为0x05 (nacho) |
Password |
5 |
29 |
‘n’ ‘a’ ‘c’ ‘h’ ‘o’ |
例如因为 Will Flags 被置为0,所以我们不需要在 payload
中解析这个字段(也压根没有),上例中我们要解析的内容总共就是包括 Fixed Header 在内的 34个byte。
PUBLISH 解码实现
以下是 PUBLISH 包的结构:
仅当 QoS level > 0 时,存在 Packet identifier MSB 和 LSB。当 QoS 被设置为 at most once (值为0)时,没有必要存在 packet ID。
Payload部分的长度通过 Remaining Length 减去其他所有内容计算得来。
SUBSCRIBE 和 UNSUBSCRIBE 解码实现
SUBSCRIBE 包和 UNSUBSCRIBE 包的结构非常相似。他们的 payload 部分都是一个 topic 相关的元组列表,其中 SUBSCRIBE 的元组是 (topic_len, topic_filter, qos),而 UNSUBSCRIBE 是 (topic_len, topic_filter)。
ACK 解码实现
最终到了 ACK 包,MQTT 协议中没有设计通用 ACK,但是实际上每个 ACK 包的数据结构都是一样的,有一个 Fixed Header 和一个 packet_id组成。
MQTT 协议中有如下几种类型的ACK:
- PUBACK
- PUBREC
- PUBREL
- PUBCOMP
- UNSUBACK
MQTT包解码实现
现在我们已经实现了 unpack_mqtt_packet
需要的所有工具函数,接下来我们先定义一个解码函数的接口,然后使用一个静态数组来索引所有的解码函数,这里我们直接使用 Control Packet type
的值来作为数组中的索引。
需要注意的是,DISCONNECT
PINGREQ
PINGRESP
这三种包只有一个byte,所以我们不需要编写解码工具函数。
结尾
从零开始MQTT broker的第一部分就这样结束了,我们做了两个模块,一个根据 OASIS 定义的标准描述MQTT协议结构,另一个则用来处理编解码操作。
此时我们的文件结构是这样的: