动机
目前ChatGPT可以是AI领域出圈最成功的工作之一了,同时GitHub上也涌现了各种将ChatGPT接入即时通讯软件(IM)的仓库,比如wechat-chatgpt。本人也尝试将ChatGPT引入微信,但考虑到未越狱的iOS不能微信多开,且频繁的异常操作可能有封号的风险,因此作罢。
不过在学习这些接入IM的模块往往会使用事件触发的思想构建代码,虽然此前并未接触过这种实现,不过这种思想十分简洁且易于维护,因此本文就来了解一下事件触发的实现原理。
事件触发的特点
首先IM类软件的特点是“有消息才有反应”,比如Alice向Bob发送信息,那么B的客户端只有当A的消息到达时才会提醒B。
阻塞
while
循环一定是最容易想到的,但显然循环体内不能粗暴的time.sleep
,不然如果Alice在程序sleep时发了拼手气红包,Bob就是大怨种:(。反之,如果让程序不加暂停地查看当前是否有消息,那么Bob在等到红包之前手机可能就没电了。所以,不使用sleep实现程序的暂停是非常关键的,而线程/进程“阻塞”就是理想的方式。
具体到实现上,我们可以用标准库中的Queue
缓存消息,并利用队列自带的线程阻塞功能实现这一点:当队列中没有元素时,调用get
会阻塞当前线程。
多线程
由于当前线程一旦阻塞就需要另一个线程来帮忙往队列中添加元素,所以控制程序一定有至少两个线程:
- 主线程负责向队列中添加消息;
- 子线程负责在后台处理消息。
事件注册
再次回到IM的例子上。假设Bob现在想用控制程序操控自己的IM客户端,而客户端一天可能会收到各式各样的消息,这些消息可以是文字信息、语音信息或图片信息等,并且消息来源又可以是好友、群聊或者公众号……显然不同类别的消息需要用不同的处理方式,比如来自群聊的消息可以选择性忽略,但来自Alice的红包不能错过。
此外,IM客户端还有“登陆成功”、“登陆失败”、“退出登录”和“运行出错”等状态信息。简单起见,可以将上述繁杂的信息称作“事件(event)”。上述如此繁杂的事件难不倒程序员,但会给程序员的头发带来巨大危机。比如下面的实现就非常危险:
1 2 3 4 5 6 7 8
| for event in Q: if event.type == "contract-text": pass elif event.type == "contract-image": pass elif event.type == "contract-voice": pass ...
|
虽说最终的实现绕不开“判断类型->对应的方法处理消息”这一逻辑,但显然长串的if-else给后期维护带来了麻烦,并且每个事件可能会有多种处理函数,上述写法终究不太美观。但说到底,if-else无非是根据消息类型选择合适的处理分支,而这可以用dict
来实现。
可以定义一个字典类型的handlers
,并向其注册针对不同事件名的各种处理函数。
1 2 3 4 5 6 7 8 9
| from collections import defaultdict from typing import Callable, Dict, List
handlers: Dict[str, List[Callable]] = defaultdict(list)
handlers["contract-text"].append(handler_for_contract_text) handlers["contract-image"].append(handler_for_contract_image) handlers["group-text"].append(handler_for_contract_image) ...
|
如此一来,每当一个事件到来时,分支选择和执行代码就变得优雅起来了。
1 2 3 4 5 6
| event.type: str event.args: dict
for handler in handlers[event.type]: handler(**event.args)
|
代码实现
事件管理器
将上述阻塞
、多线程
和事件注册
三种元素组合起来,并封装成一个类,就可以获得如下”事件管理器“实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import collections import queue import threading from typing import Callable
_Event = collections.namedtuple("Event", ("event", "args"))
class EventManager: def __init__(self): self._handlers = collections.defaultdict(list) self._running = False self._queue = queue.Queue() self._listener = threading.Thread(target=self._listening)
def start(self): self._running = True self._listener.start()
def stop(self): self._running = False self._listener.join()
def _listening(self): while self._running: try: event: _Event = self._queue.get(block=True, timeout=1) for handler in self._handlers[event.event]: handler(**event.args) except queue.Empty: pass
def on(self, event: str, callback: Callable): self._handlers[event].append(callback) return self
def send_event(self, event: str, **kwargs): self._queue.put(_Event(event, kwargs))
|
另外一个技巧就是将EventManager.on
函数的返回值设置为self
,这样就可以实现连续注册。
使用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| event_manager = EventManager()
event_manager.on( "contract-text", handler_for_contract_text1 ).on( "contract-text", handler_for_contract_text2 ).on( "contract-image", handler_for_contract_image ).on( "contract-voice", handler_for_contract_voice ).on( "group-text", handler_for_group_text )
event_manager.start()
...
event_manager.send_event("contract_text", name="xxx", text="恭喜发财") event_manager.send_event("group_text", name="YYY", text="大家新年好!")
|