这是一个可以自由切换聊天对象,也能传输文件,还有伪云端聊天信息暂存功能的聊天程序!( ̄▽ ̄)”
去年暑假的小学期的电子设计课上我用STC与电脑相互通信,制作出了一个Rader项目(该项目的完整代码在我的GitHub上 )。这个项目地大致思想是:下位机(STC)使用步进电机带动超声波模块采集四周的距离,然后用485串行总线上传到上位机(电脑),上位机将这些数据收集并绘制略丑的雷达图。由于上下位机处理数据的速度不一致,容易导致不同步的现象。当时为了解决这个问题,用了一个简单的方法,现在发现这个方法和“停等协议”十分相似。
这学期的计网实验要求基于socket传输数据,相比于在485总线上实现停等协议,socket还是很简单的。
Naive版聊天程序 最简单的socket通信程序只需要两个进程就可以跑起来了,一个作为服务端,另一个作为客户端,然后两者之间传输数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import socketfrom socket import AF_INET, SOCK_STREAMserverSocket = socket.socket(AF_INET, SOCK_STREAM) srv_addr = ("127.0.0.1" , 8888 ) serverSocket.bind(srv_addr) serverSocket.listen() print ("[Server INFO] listening..." )while True : conn, cli_addr = serverSocket.accept() print ("[Server INFO] connection from {}" .format (cli_addr)) message = conn.recv(1024 ) conn.send(message.upper()) conn.close()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import socketfrom socket import AF_INET, SOCK_STREAMclientSocket = socket.socket(AF_INET, SOCK_STREAM) srv_addr = ("127.0.0.1" , 8888 ) print ("[Client INFO] connect to {}" .format (srv_addr))clientSocket.connect(srv_addr) message = bytes (input ("Input lowercase message> " ), encoding="utf-8" ) clientSocket.send(message) modifiedMessage = clientSocket.recv(1024 ).decode("utf-8" ) print ("[Client INFO] recv: '{}'" .format (modifiedMessage))clientSocket.close()
多用户版 上面这种模式是十分naiive的。比如为了切换用户(假设不同用户在不同的进程上),就只能先kill原先的进程,然后修改代码中的IP和Port,最后花了1分钟时间才能开始聊天。而且这种方式最大的缺陷是只有知道了对方的IP和Port之后才能开始聊天。
为了解决Naiive版聊天程序的缺点,可以构建如下C/S拓扑结构。
这个拓扑的结构的核心在于中央的Server,所有Client的连接信息都会被保存在Server上,Server负责将某个Client的聊天信息转发给目标Client。
数据格式设计 像TCP协议需要报文一样,这个简单聊天程序的信息转发也需要Server识别每一条信息的目的,才能准确转发信息。这就需要设计协议报文的结构(显然这是在应用层上的实现)。由于应用场景简单,我是用的协议结构如下:
1 sender|receiver|timestamp|msg
这是一个四元组,每个元素用管道符|
分割。具体来说每个Client(客户进程)发送数据给Server之前都会在msg
之前附加:发送方标识sender
、接受方标识receiver
以及本地时间戳timestamp
。对应的代码端如下:
1 info = "{}|{}|{}|{}" .format (self.UserID, targetID, timestamp, msg)
这样Server接收到报文之后就能“正确”转发消息了。
这里的“正确”被加上了引号,这是为什么?因为在我设计该乞丐版协议的时候简化场景中只存在唯一用户ID的场景,如果有个叫“Randool”的用户正在和其他用户聊天,这个时候另一个“Randool”登陆了聊天程序,那么前者将不能接收信息(除非再次登录)。不过简单场景下还是可以使用的。
解决方法可以是在Client登录Server时添加验证的步骤,让重复用户名无法通过验证。
消息队列 该聊天程序使用的传输层协议是TCP,这是可靠的传输协议,但聊天程序并不能保证双方一定在线吧,聊天一方在任何时候都可以退出聊天。但是一个健壮的聊天程序不能让信息有所丢失,由于传输层已经不能确保信息一定送达,那么只能寄希望于应用层。
由于消息是通过Server转发的,那么只要在Server上为每一个Client维护一个消息队列即可。数据结构如下:
1 2 MsgQ = {} Q = MsgQ[UserID]
使用这种数据结构就可以模拟云端聊天记录暂存的功能了!
文件传输 文件传输本质上就是传输消息,只不过文件传输的内容不是直接显示在屏幕上罢了。相比于纯聊天记录的传输,文件传输需要多附加上文件名,
base64编码传输 普通的聊天信息中不会出现管道符,但是代码和字符表情就不一定了∑( 口 ||
,如果信息中出现了管道符就会导致协议解析失效,因此需要一种方法将msg
中的|
隐藏掉。思路是转义,但是这个需要手工重写协议解析代码,不够美观。由于之前了解过信息安全中的相关知识,还记得有一种编码方式是base64,由于base64编码结果不会出现管道符,那么问题就简单了,只需要用base64将传输信息重新编码一番。并且这是一种“即插即用”的方式,只要自定义base64的编码解码函数,然后嵌套在待发送msg
的外面即可。
1 2 3 4 import base64b64decode = lambda x: base64.b64decode(x.encode()).decode() b64encode = lambda x: base64.b64encode(x.encode()).decode()
将发送信息改写为如下形式:
1 info = "{}|{}|{}|{}||" .format (self.UserID, targetID, timestamp, b64encode(msg))
终端高亮显示 朴素的文字打印在屏幕上难以区分主次,用户体验极差,因此可以使用终端高亮的方法凸显重要信息。在网上查到了一种高亮的方式,但是仅限于Linux系统。其高亮显示的格式如下:
\033[显示方式;前景色;背景色mXXXXXXXX\033[0m
中间的XXXXXXXX
就是需要显示的文字部分了。显示方式,前景色,背景色是可选参数,可以只写其中的某一个;另外由于表示三个参数不同含义的数值都是唯一的没有重复的,所以三个参数的书写先后顺序没有固定要求,系统都能识别;但是,建议按照默认的格式规范书写。
这个部分参考了Python学习-终端字体高亮显示 ,因此对于参数的配置方面不再多说
效果
有多种终端分屏插件,这里推荐tmux,上面的分屏效果使用的就是tmux
代码实现 服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import queueimport socketimport timeimport _threadhostname = socket.gethostname() port = 12345 """ The info stored in the queue should be like this: "sender|receiver|timestamp|msg" and all item is str. """ MsgQ = {} def Sender (sock, UserID ): """ Fetch 'info' from queue send to UserID. """ Q = MsgQ[UserID] try : while True : info = Q.get() sock.send(info.encode()) except Exception as e: print (e) sock.close() _thread.exit_thread() def Receiver (sock ): """ Receive 'msg' from UserID and store 'info' into queue. """ try : while True : info = sock.recv(1024 ).decode() print (info) info_unpack = info.split("|" ) receiver = info_unpack[1 ] exit_cmd = receiver == "SEVER" and info_unpack[3 ] == "EXIT" assert not exit_cmd, "{} exit" .format (info_unpack[0 ]) if receiver not in MsgQ: MsgQ[receiver] = queue.Queue() MsgQ[receiver].put(info) except Exception as e: print (e) sock.close() _thread.exit_thread() class Server : def __init__ (self ): self.Sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.Sock.bind((hostname, port)) self.Sock.listen() def run (self ): print ("\033[35;40m[ Server is running ]\033[0m" ) while True : sock, _ = self.Sock.accept() UserID = sock.recv(1024 ).decode() print ("Connect to {}" .format (UserID)) if UserID not in MsgQ: MsgQ[UserID] = queue.Queue() _thread.start_new_thread(Sender, (sock, UserID)) _thread.start_new_thread(Receiver, (sock,)) def close (self ): self.Sock.close() if __name__ == "__main__" : server = Server() try : server.run() except KeyboardInterrupt as e: server.close() print ("Server exited" )
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 import socketimport sys, osimport timeimport base64import _threadfrom SktSrv import hostname, portb64decode = lambda x: base64.b64decode(x.encode()).decode() b64encode = lambda x: base64.b64encode(x.encode()).decode() def Receiver (sock ): from_id = "" fr = None while True : info = sock.recv(1024 ).decode() info_unpacks = info.split("||" )[:-1 ] for info_unpack in info_unpacks: sender, _, timestamp, msg = info_unpack.split("|" ) msg = b64decode(msg) if from_id != sender: from_id = sender print ("==== {} ====" .format (sender)) if msg[:5 ] == "@FILE" : if msg[:10 ] == "@FILENAME:" : print ("++Recvive {}" .format (msg[9 :])) fr = open (msg[10 :]+".txt" , "w" ) elif msg[:9 ] == "@FILEEND:" : fr.close() print ("++Recvive finish" ) elif msg[:6 ] == "@FILE:" : fr.write(msg[6 :]) continue show = "{}\t{}" .format (timestamp, msg) print ("\033[1;36;40m{}\033[0m" .format (show)) class Client : def __init__ (self, UserID: str =None ): if UserID is not None : self.UserID = UserID else : self.UserID = input ("login with userID >> " ) self.Sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_addr = (hostname, port) def Sender (self ): """ Send info: "sender|receiver|timestamp|msg" Change to name: '@switch:name' Trans file: '@trans:filename' """ targetID = input ("Chat with > " ) while True : msg = input () if not len (msg): continue lt = time.localtime() timestamp = "{}:{}:{}" .format (lt.tm_hour, lt.tm_min, lt.tm_sec) if msg == "@exit" : print ("Bye~" ) return elif msg == "@help" : continue elif msg[:8 ] == "@switch:" : targetID = msg.split(":" )[1 ] print ("++Switch to {}" .format (targetID)) continue elif msg[:7 ] == "@trans:" : filename = msg.split(":" )[1 ] if not os.path.exists(filename): print ("!!{} no found" .format (filename)) continue print ("++Transfer {} to {}" .format (filename, targetID)) head = "{}|{}|{}|{}||" .format (self.UserID, targetID, timestamp, b64encode("@FILENAME:" +filename)) self.Sock.send(head.encode()) with open (filename, "r" ) as fp: while True : chunk = fp.read(512 ) if not chunk: break chunk = "{}|{}|{}|{}||" .format (self.UserID, targetID, timestamp, b64encode("@FILE:" +chunk)) self.Sock.send(chunk.encode()) tail = "{}|{}|{}|{}||" .format (self.UserID, targetID, timestamp, b64encode("@FILEEND:" +filename)) self.Sock.send(tail.encode()) print ("++Done." ) continue info = "{}|{}|{}|{}||" .format (self.UserID, targetID, timestamp, b64encode(msg)) self.Sock.send(info.encode()) def run (self ): try : self.Sock.connect(self.server_addr) print ("\033[35;40m[ Client is running ]\033[0m" ) self.Sock.send(self.UserID.encode()) _thread.start_new_thread(Receiver, (self.Sock,)) self.Sender() except BrokenPipeError: print ("\033[1;31;40mMissing connection\033[0m" ) finally : print ("\033[1;33;40mYou are offline.\033[0m" ) self.exit_client() self.Sock.close() def exit_client (self ): bye = "{}|{}|{}|{}" .format (self.UserID, "SEVER" , "" , "EXIT" ) self.Sock.send(bye.encode()) if __name__ == "__main__" : client = Client() client.run()
P2P版 上面的多用户版聊天程序虽然可以实现灵活的用户切换聊天功能,但是实际上由于所有的数据都会以服务器为中转站,会对服务器造成较大的压力。更加灵活的结构是使用P2P的方式,数据只在Client间传输。应该是将服务器视为类似DNS服务器的角色,只维护一个Name <--> (IP,Port)
的查询表,而将连接信息转移到Client上。
存在的问题 P2P版本的聊天程序并不只是实现上述的功能就可以了,考虑到前边“消息队列”中实现的功能:在用户退出后,聊天信息需要能保存在一个可靠的地方。既然聊天双方都存在退出的可能,那么在这个场景下这个“可靠的地方”就是服务器了。这也就是说P2P版本的Client除了建立与其他Client之间的TCP连接,还需要一直保持和Server的连接!
注意这一点,之前是为了减轻Server的压力,减少连接的数量才使用P2P的模式的,但是在该模式为了实现“消息队列”的功能却还是需要Server保存连接。
改进方式 如果要进一步改善,可以按照下面的方式:
Client C1登录时与Server建立连接,Server验证其登录合法性,然后断开连接。
C1选择聊天对象C2,C2的IP等信息需要从Server中获取,因此C1再次建立与Server的连接,完成信息获取后,断开连接。
C1与C2的正常聊天信息不通过Server,而是真正的P2P传输。
聊天一方意外断开后(假设为C2),C1传输的信息无法到达,并且C1可以感知到信息无法到达;这个时候C1再次建立与Server的连接,将未能送达的信息保存到Server上的“消息队列”。
补充一点:在步骤2中,如果C2未上线或C2意外断开,由于Server并不能及时知道Client的信息,因此需要“心跳包机制”,Client登录后定时向Server发送alive
信息,Server收到信息后维持或更新信息。
这样Server从始至终没有一直维持着连接,连接数量是动态变化的,在查询并发量较小的情况下对服务器资源的利用率是很小的。
进一步可以思考什么? 如果有多个Server,如何规划Server之间的拓扑?比如Fat-Tree之类的…