原文 Sol - An MQTT broker from scratch. Part 3 - Server
前言
这一部分我们会实现我们程序中的服务功能,通过之前在 part-2 中实现的 network
模块,我们可以比较轻松的接收并处理在 part-1 中定义好的各种 MQTT
数据包。
服务端定义
我们的头文件非常简单,唯一向外提供的函数只有 start_server
,他也只需要接收两个参数:
我们还需要定义创建 epoll 时使用的两个常量,一个是单次监听的最大事件数量,另一个是 epoll 监听的超时时间。这两个常量的定义以后我们也可以轻松的移动到配置模块里,暂时就先放在 server
的头文件。
服务端实现
实现的部分比我一开始预想的要庞大一些,所有我们所需的 处理器(handler)
和 回调函数都会在这里定义。所以我们首先来实现三个最基础的回调函数,这三个函数是任何服务器都必不可少的:
- 用于建立连接的
on_accept
- 用于读取事件的
on_read
- 用于发送数据的
on_write
我们还需要定义一些关于MQTT包处理的 handler
,同样使用一个数组保存,并且使 handler
在其中的序号等于包类型码。(这个方式我们已经用过好几次了)
正如你所见,我定义了两个静态函数(在C语言中,当我们不严格的追究术语时,由于这种静态函数只能被同样.c文件里的函数访问,我们可以把这种函数看作是其他OOP语言中的私有方法。)
accept_new_client
函数使用了上一篇文中 network
模块定义的 accept_connection
函数,得以从操作系统层级接收新连接并进行一些设置。on_accept
则是实际负责处理新链接的回调函数,他依赖 accept_new_client
函数。
accept_new_client
函数所需的参数结构 connection
是我从我其他项目的代码库复制过来的,并不是说必须要用这种方式。
我们又添加了三个静态函数,recv_packet
函数就像他的名字一样,依赖 mqtt
模块,负责持续接收数据流直到足够一个完整的 MQTT 包。另外两个分别是 on_read
和 on_write
。
请注意,on_read
和 on_write
使用我们之前定义的函数不停的重置对 socket
的监听,就像来回打乒乓球一样。例如, on_read
可以通过 处理器
的返回值来决定下一次的操作是 read
还是 write
,然后把客户端链接的下一个回调函数设置为 on_read
或者 on_write
,当然也有可能是断开链接。比如说客户端发来的数据出现了错误,或者当客户端发来了 DISCONNECT
包,那么此时对应的 处理器
返回的值就既不是 REARM_W
也不是 REARM_R
。
在 on_write
中我们看到 send_bytes
传入了一个带有大小和内容的 payload
,这里使用了我定义的一个方便的工具结构 bytestring
,我们现在就在 src/pack.h
and src/pack.c
中添加他。
工具 bytestring
这里是关于 bytestring
的实现。
日志和通用工具
让我们稍微打断一下主线,按照我的经验,到这个阶段我们往往会需要一些工具函数,我一般会把他们统一放在 util
包中。我们刚才已经看到了一些 sol_info
, sol_debug
或者 sol_error
这样的函数,其实就是 util
包中的定义。
我们的日志需求很简单,所以不需要专门做一个日志模块,就先放到 util
包里。
log函数设置了一些宏定义,方便我们使用不同级别的日志。我们还做了一个 STREQ
用来比较两个字符串是否相等。
这些简单的函数足以支撑我们的日志系统,如果在启动时调用 sol_log_init
我们还能将日志存入日志文件。
服务入口实现
终于我们要开始写 start_server
函数了,这个函数会调用所有我们之前写过的内容。他将作为程序的入口点,完成各种设置和全局实例的初始化,然后等待着客户端链接。
定时通报服务器状态
好的,我们现在有了一个(几乎)功能齐全的服务器,它使用我们的回调系统来处理流量。 接下来我们需要在头文件上添加一些代码,例如我们刚才使用的 info
结构体,还有全局的名为 sol
的实例,这些我们都还没有定义。
这是刚才的 start_server
函数中我们添加的一个周期性任务。
publish_stats
函数会每隔 conf->stats_pub_interval
秒被调用一次, conf->stats_pub_interval
是一个全局的配置值,配置相关的内容我们稍后会去实现。
现在,让我们先实现这个回调函数:
我们已经注册了我们第一个周期性回调,他会定时的发送 sys_topics
数组中主题的消息,
下面是一些我们需要的全局实例:
结尾
我们还需要补充一些代码,才能使我们上面的代码能够运行。比如,struct sol
的定义、closure_destructor
函数,哈希表的定义,比如 topic
的存储和解析方法。这一切我们都需要去完成。
在下一部分我们会编写处理各种MQTT数据包的 处理器
,根据数据包的类型和内容不同,服务器会表现出不同的行为。