事件触发的代码原理

动机

目前ChatGPT可以是AI领域出圈最成功的工作之一了,同时GitHub上也涌现了各种将ChatGPT接入即时通讯软件(IM)的仓库,比如wechat-chatgpt。本人也尝试将ChatGPT引入微信,但考虑到未越狱的iOS不能微信多开,且频繁的异常操作可能有封号的风险,因此作罢。

不过在学习这些接入IM的模块往往会使用事件触发的思想构建代码,虽然此前并未接触过这种实现,不过这种思想十分简洁且易于维护,因此本文就来了解一下事件触发的实现原理。

事件触发的特点

首先IM类软件的特点是“有消息才有反应”,比如Alice向Bob发送信息,那么B的客户端只有当A的消息到达时才会提醒B。

阻塞

while循环一定是最容易想到的,但显然循环体内不能粗暴的time.sleep,不然如果Alice在程序sleep时发了拼手气红包,Bob就是大怨种:(。反之,如果让程序不加暂停地查看当前是否有消息,那么Bob在等到红包之前手机可能就没电了。所以,不使用sleep实现程序的暂停是非常关键的,而线程/进程“阻塞”就是理想的方式。

具体到实现上,我们可以用标准库中的Queue缓存消息,并利用队列自带的线程阻塞功能实现这一点:当队列中没有元素时,调用get会阻塞当前线程。

多线程

由于当前线程一旦阻塞就需要另一个线程来帮忙往队列中添加元素,所以控制程序一定有至少两个线程:

  • 主线程负责向队列中添加消息;
  • 子线程负责在后台处理消息。

事件注册

再次回到IM的例子上。假设Bob现在想用控制程序操控自己的IM客户端,而客户端一天可能会收到各式各样的消息,这些消息可以是文字信息、语音信息或图片信息等,并且消息来源又可以是好友、群聊或者公众号……显然不同类别的消息需要用不同的处理方式,比如来自群聊的消息可以选择性忽略,但来自Alice的红包不能错过。

此外,IM客户端还有“登陆成功”、“登陆失败”、“退出登录”和“运行出错”等状态信息。简单起见,可以将上述繁杂的信息称作“事件(event)”。上述如此繁杂的事件难不倒程序员,但会给程序员的头发带来巨大危机。比如下面的实现就非常危险:

1
2
3
4
5
6
7
8
for event in Q:
if event.type == "contract-text":
pass
elif event.type == "contract-image":
pass
elif event.type == "contract-voice":
pass
...

虽说最终的实现绕不开“判断类型->对应的方法处理消息”这一逻辑,但显然长串的if-else给后期维护带来了麻烦,并且每个事件可能会有多种处理函数,上述写法终究不太美观。但说到底,if-else无非是根据消息类型选择合适的处理分支,而这可以用dict来实现。

可以定义一个字典类型的handlers,并向其注册针对不同事件名的各种处理函数。

1
2
3
4
5
6
7
8
9
from collections import defaultdict
from typing import Callable, Dict, List

handlers: Dict[str, List[Callable]] = defaultdict(list)

handlers["contract-text"].append(handler_for_contract_text)
handlers["contract-image"].append(handler_for_contract_image)
handlers["group-text"].append(handler_for_contract_image)
...

如此一来,每当一个事件到来时,分支选择和执行代码就变得优雅起来了。

1
2
3
4
5
6
event.type: str
event.args: dict

# 选择分支并处理消息
for handler in handlers[event.type]:
handler(**event.args)

代码实现

事件管理器

将上述阻塞多线程事件注册三种元素组合起来,并封装成一个类,就可以获得如下”事件管理器“实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import collections
import queue
import threading
from typing import Callable

_Event = collections.namedtuple("Event", ("event", "args"))


class EventManager:
def __init__(self):
self._handlers = collections.defaultdict(list)
self._running = False
self._queue = queue.Queue()
self._listener = threading.Thread(target=self._listening)

def start(self):
self._running = True
self._listener.start()

def stop(self):
self._running = False
self._listener.join() # 等待子线程结束

def _listening(self):
while self._running:
try:
event: _Event = self._queue.get(block=True, timeout=1)
for handler in self._handlers[event.event]:
handler(**event.args)
except queue.Empty:
pass

def on(self, event: str, callback: Callable):
self._handlers[event].append(callback)
return self

def send_event(self, event: str, **kwargs):
self._queue.put(_Event(event, kwargs))

另外一个技巧就是将EventManager.on函数的返回值设置为self,这样就可以实现连续注册。

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 创建实例
event_manager = EventManager()

# 注册事件处理函数
event_manager.on(
"contract-text", handler_for_contract_text1
).on(
"contract-text", handler_for_contract_text2
).on(
"contract-image", handler_for_contract_image
).on(
"contract-voice", handler_for_contract_voice
).on(
"group-text", handler_for_group_text
)

event_manager.start()

...

event_manager.send_event("contract_text", name="xxx", text="恭喜发财")
event_manager.send_event("group_text", name="YYY", text="大家新年好!")