原文 Sol - An MQTT broker from scratch. Part 2 - Networking
前言
让我们继续之前的工作,在第一部分中我们实现了 MQTT v3.1.1 的数据结构和解码函数,接下来我们需要做一些组包和编码函数,让我们可以发送网络包。
顺带说明一下,我们并没有打算去编写完美的或者内存效率很高的代码,而且,过早的优化是万恶之源,以后我们有的是时间来提高我们的代码质量。
组包实现
暂时我们只需要做 CONNACK
SUBACK
PUBLISH
包的组包工作,其他的各种 ACK
的结构都是一样的,之前我们已经用 typedef 让这些 ACK
引用了同一个函数。
union mqtt_header *mqtt_packet_header(unsigned char)
函数用来处理 Fixed Header,以及以下这些只有 Fixed Header 的包:
- PINGREQ
- PINGRESP
- DISCONNECT
struct mqtt_ack *mqtt_packet_ack(unsigned char, unsigned short)
用来处理以下这些 类ACK
的包:
- PUBACK
- PUBREC
- PUBREL
- PUBCOMP
- UNSUBACK
其余的包都需要专门的函数来组包。再说一次,虽然可能有很多更优雅的代码或者更优化的方法,但是现在我们只要写能用的代码就行了,以后迟早会优化的。
编码实现
我们接下来处理编码函数,编码函数其实就是解码函数的反方向操作:我们使用内存对象创造一个字节流,之后可以通过socket发出去。
现在我们有一些函数返回指向 static struct
的指针(例如上方代码中的 mqtt_packet_header
),在单线程的情况下这是没什么问题的。 在多线程环境下,一定会出问题,每次这种函数的返回都会指向同一片内存区域,可能导致各种冲突。因此为了将来的改进,需要重构这些部分,使用 malloc
来为每次返回分配地址。
我们采用和之前解码函数一样的方式来映射编码函数。做一个静态数组,其中的序号恰好等于包类型。
socket 封装
我们计划创建一个单线程 TCP 服务器,使用 epoll 接口实现多路 I/O。Epoll 是继 select 和 poll 之后内核 2.5.44 添加的最新的多路复用机制,也是性能最高、连接数最多的多路复用机制,它在 BSD 和 BSD-like (Mac OSX) 系统中的对应机制是 kqueue。
我们需要定义一些函数来管理我们的socket descriptor。
我们定义了一些简单的辅助函数,用来创建和绑定 socket
端口,处理新链接并把 socket
设置为 non-blocking
模式(这样才能发挥 epoll 的复用能力)。
我不喜欢必须处理每个进出服务器的字节,在我写的涉及到TCP通信的程序中,我都会定义这两个函数:
ssize_t send_bytes(int, const unsigned char *, size_t)
用于在while循环中持续发送数据,直到把数据全部发送完。正确捕获 EAGAIN
或 EWOUDLBLOCK
异常。
ssize_t recv_bytes(int, unsigned char *, size_t)
在while循环中获得任意长度的数据。正确捕获 EAGAIN
或 EWOUDLBLOCK
异常。
socket 封装实现
接下来是 network.c
的实现。
epoll 封装
为了让 epoll API能够更加简单易用。我对 epoll 进行了一些的封装,让我们就可以通过注册回调函数的方式来响应事件。
网络上有很多使用 epoll 的示例,大部分都是描述基本用法:注册一个 socket 并启动一个循环来监听事件,每当 socket 需要被读写时,调用一个函数来使用它们。这些例子当然简单好用,但是并没有告诉我们如何通过回调的方式使用 epoll。经过思考后,我发现可以使用 epoll_event
自带的 epoll_data
来解决这个问题:
正如你看到的,epoll_data
中有一个 void *
,一个常常用来保存fd的 int
,还有两个大小不同的 uint
。我计划做一个自定义事件结构体,其中包括了fd、一些自定义数据和最关键的回调函数指针。然后我们可以把自定义事件结构体绑定到 epoll_data
的 void *
中,如此一来,每当事件发生时,我们都可以通过 epoll_data
获得所有我们需要的东西。
我想要定义两种类型的回调,一种是事件触发的回调,另一种是间隔触发的周期性回调。我们需要把 epoll 封装到一个自定义结构里,来实现这两种回调。对于这两种回调的处理,我们则会采用完全相同的方式:获得 epoll_data
,在其中获得所有我们所需的数据和需要执行的回调函数。
接收数据包并使用 epoll_wait 处理的顺序图
我们需要定义两种结构体和一种函数指针
- struct evloop 封装 epoll 实例的结构体,添加了各种参数用来实现我们的业务设计
- struct closure 上文中提到的自定义事件结构体,封装了各种事件参数和回调函数的指针
- **void callback(struct evloop , void ) 回调函数的接口,在 closure 里真正被执行的函数的接口
另外,我们需要在 .c 文件中实现一些对 evloop
的创建、删除和管理功能。
epoll 封装实现
在头文件中定义了我们网络所需的各种工具函数后,接下来我们开始进行函数实现。
让我们先从最简单的开始,evloop
实例的创建、初始化和删除。他包括了这些内容:
epoll
的 fd
即 epollfd
- 单次处理的最大事件数量
- 一个毫秒单位的超时时间
- loop是否正在运行的状态标识
- 动态大小的周期性任务数组
接着,我们需要实现三个包装 epoll
API的函数,用来创建、修改和删除 epoll
对 fd
的监听。我们封装函数的目的是为所有的 epoll
监听都添加 EPOLLET
和 EPOLLONESHOT
标识。EPOLLET
标识可以让 epoll
工作在边沿触发
模式,EPOLLONESHOT
标识则可以确保 epoll
对某个事件触发仅产生一次(然后我们通过手动重置的方式让其可以继续响应)。
这样的设置可以避免未来我们在使用多线程架构时,一次事件的传入会唤醒所有等待中的线程,这被称为惊群效应
(thundering herd problem),不过这些都是后话,暂时可以不用深究。
这里有两件事需要注意:
第一,如前所述,epoll_event
中包括了一个 union epoll_data
,其中可以保存一个 fd
或 一个 void *
。我们选择了使用后者,传入了我们的 closure
,这其中包含了更多有用的信息,也包括 fd
在内。
第二,刚才我们定义的添加和修改函数的第三个参数,可以接收一组事件,一般而言是 EPOLLIN
或 EPOLLOUT
。同时我们添加了 EPOLLONESHOT
标识,这意味着当事件触发一次后就不会再次触发,除非我们手动重置该事件。这样做是为了保持对低级事件触发的某种程度的控制,并为将来的多线程实现留出空间。这篇文档精彩地阐述了 epoll
这种设计的好处,以及为什么最好使用 EPOLLONESHOT
标志。
epoll 循环实现
我们继续实现我们的封装,接下来是一些回调函数的注册、周期回调的注册以及主循环。
在我们之前的所有代码中,evloop_wait
是最有意思的,他启动一个循环不停监视 epoll_wait
,执行错误检查,区分本次触发是周期性的自动触发或是读写触发,然后执行我们设置的回调函数。
结尾
我们的代码越写越多,这次我们又添加了一个模块。
此时我们的文件结构是这样的: