原文 Sol - An MQTT broker from scratch. Refactoring & eventloop
更新日期: 2020-02-07
前言
在前面的六个部分中,我们探索了一些常见的 CS 主题,例如网络编程、数据结构,这段短暂的旅程的终点是得到了一个充满了BUG但是勉强可用的MQTT broker。
由于好奇心,我想测试一下我们的项目离真正的生产项目有多么接近,而且我想对项目进行一些重构,减少一些临时的代码,让项目的结构更加合理,同时关注项目的可移植性。
我不会把所有的重构过程都写到博客中,因为那会非常无聊,我只会突出一些最重要的部分,剩下的部分你可以直接把 master
分支合并到 tutorial
来查看,或者直接克隆 master
分支。
首先我按照有限度列出了需要优化的要点:
- 低层的 I/O 处理器,用以正确处理数据流读写
- 对 EPOLL 进行抽象,因为他是 Linux 独有功能,提供一些备选方案
- 管理加密消息,实现可用明文消息或加密消息的透明接口
- 正确处理客户端会话,实现类似
'+'
通配符之类的其他 MQTT 功能
备注:虽然我们自己做的哈希表运行的不错,但我还是决定选择使用久经沙场的 UTHASH
库。由于他只有一个头文件,集成进我们的项目也非常容易。他的项目文档在这里。
TCP分片问题
第一个也是最需要被检查的问题是网络通信,在本地进行负载测试时,我发现当负载量较大时程序开始丢包,或者说,内核缓冲区被淹没并开始对数据流进行分片。TCP 作为一个流协议,在处理数据中进行分片是无可厚非的,没有在一开始时就考虑这个问题显然是我比较幼稚,或者说因为我着急写一个可以运行的程序,忽略了底层细节。无论如何,这让程序产生了一些问题,例如解析错误的数据包,或者分片部分被当作数据包的第一个字节,识别成了各种不同的指令等等。
因此,最重要的修复之一是 server.c 模块中的 recv_packet
函数,特别是为每个客户端添加了类似状态机的行为,使其可以正确执行非阻塞读写,而不会阻塞线程。
我还将应用程序的核心部分,特别是 MQTT 抽象(例如客户端会话和主题)移到了 sol_internal.h
中。
因此,客户端结构现在更加健壮,它存储每个数据包读写的状态,以便在内核空间出现 EAGAIN
错误时恢复。
加密通讯
需要注意的是,recv_packet
和 write_data
是两个在 network.h 模块中定义的函数:
ssize_t send_data(struct connection *, const unsigned char *, size_t)
ssize_t recv_data(struct connection *, unsigned char *, size_t)
他们都需要使用 struct connection
作为第一个参数,后面两个参数就是常规的读/写 buffer 和读/写字节数。
这个连接结构直接针对了前言中需改进列表内的第三条(明文消息和加密消息的抽象),他是客户端链接的抽象实现,并且提供了管理通信所需的4个基本回调函数:
这个改进允许我们基于选择的类型创建每条链接,不论是普通链接还是TLS链接都使用相同的函数收发数据。
结构定义如下:
结构体中存储了 SSL *
和 SSL_CTX *
,当我们使用普通链接时他们会为 NULL
。
编解码与辅助函数
另一个有益的提升是修正了之前错误的编码和解码函数(感谢beej networking guide,这个教程真的很优秀)并且添加了一些工具函数用来处理整形和bytes的解码。
微型的事件循环:ev
在单线程环境中抽象主机提供的多路复用API并不是一件困难的事,本质上就是提供一个数据结构,用来持有一组自定义事件。头文件里描述的很清楚,最重要的部分是我们对事件类型的枚举(enum ev_type
),自定义事件(struct ev
)和持有自定义事件的数组(events_monitored
)。这些构成了我们的事件封装(ev_ctx
)。
ev_ctx
中使用不透明的 void *
指针可以让我们引用系统提供的任何底层 API,无论是 EPOLL
、SELECT
还是 KQUEUE
。
在服务器初始化时,ev_ctx
会被注册一些基本的周期性事件和服务端口的 on_accpet
事件。之后我们的程序就由事件循环不停驱动,比如当客户端链接建立后,我们会对输入的数据进行监听,触发 read_callback
,收到完整的数据包并处理后,决定是否要发送回复。
这是一个连接客户端的生命周期,我们有一个 accept
回调函数,他将接入的链接放入事件循环中,并且开启读取监听:
当然,启动的服务器必须进行阻塞调用以启动事件循环,我们也需要一个停止机制。得益于 ev_stop API,添加一个额外的事件例程来在我们想要停止运行的循环时调用变得非常简单。
现在我们的服务器会使用一个阻塞的循环来提供服务,但是我们也需要一个停止机制。感谢 ev_stop
接口,他这让我们可以简单的停止循环。
最终,我们的 start_server
函数,作为程序的入口,他会监听一个端口,并打开事件循环来提供服务。
正如你看到的,这里有一个用于创建客户端池的 memorypool_new
,我们预先分配了一定数量的客户端,并且在断开链接时回收他们。只要我们的客户端内容是懒加载的,特别是他们的读写buffer(可能是 MB 级别)是懒加载的,那么我们这个客户端池就相当划算。
当然,这只是整个过程的一小部分,但最终我做出了一个相当不错的原型。下一步将是进行一些压力测试,看看它与 Mosquitto 或 Mosca 这些久经考验且无可争议的优秀软件相比如何。我们仍然缺少许多功能,例如用于存储会话的持久层,但先贼发布/订阅部分应该是可测试的。希望这个教程可以作为更整洁和精心设计的项目的起点。再见!