97色伦色在线综合视频,无玛专区,18videosex性欧美黑色,日韩黄色电影免费在线观看,国产精品伦理一区二区三区,在线视频欧美日韩,亚洲欧美在线中文字幕不卡

購(gòu)物網(wǎng)站建設(shè)運(yùn)營(yíng)需求云南網(wǎng)站設(shè)計(jì)多少錢(qián)

鶴壁市浩天電氣有限公司 2026/01/24 09:06:44
購(gòu)物網(wǎng)站建設(shè)運(yùn)營(yíng)需求,云南網(wǎng)站設(shè)計(jì)多少錢(qián),室內(nèi)設(shè)計(jì)好學(xué)嗎,wordpress讀取文章樣式大家好#xff0c;我是jobleap.cn的小九。 Python 的 websockets 庫(kù)是基于 asyncio 實(shí)現(xiàn)的異步 WebSocket 客戶(hù)端/服務(wù)端框架#xff0c;遵循 RFC 6455 標(biāo)準(zhǔn)#xff0c;支持全雙工通信、心跳檢測(cè)、認(rèn)證等核心特性#xff0c;是構(gòu)建實(shí)時(shí)通信場(chǎng)景#xff08;如聊天室、實(shí)時(shí)監(jiān)…大家好我是jobleap.cn的小九。Python 的websockets庫(kù)是基于asyncio實(shí)現(xiàn)的異步 WebSocket 客戶(hù)端/服務(wù)端框架遵循 RFC 6455 標(biāo)準(zhǔn)支持全雙工通信、心跳檢測(cè)、認(rèn)證等核心特性是構(gòu)建實(shí)時(shí)通信場(chǎng)景如聊天室、實(shí)時(shí)監(jiān)控、推送服務(wù)的首選工具。本文將系統(tǒng)講解websockets的所有常用 API并通過(guò)實(shí)戰(zhàn)案例串聯(lián)知識(shí)點(diǎn)幫助你掌握異步 WebSocket 開(kāi)發(fā)的全流程。一、環(huán)境準(zhǔn)備安裝依賴(lài)websockets僅支持 Python 3.7且依賴(lài)asyncioPython 3.7 內(nèi)置安裝命令如下# 安裝最新穩(wěn)定版pipinstallwebsockets# 驗(yàn)證安裝python -cimport websockets; print(websockets.__version__)二、核心概念與基礎(chǔ)架構(gòu)WebSocket 是基于 TCP 的全雙工通信協(xié)議核心特點(diǎn)一次握手持久連接無(wú)需反復(fù)建立 HTTP 連接服務(wù)端/客戶(hù)端可主動(dòng)推送消息支持文本、二進(jìn)制消息傳輸。websockets核心 API 分為兩大模塊服務(wù)端websockets.serve()創(chuàng)建服務(wù)端、websockets.WebSocketServerProtocol連接對(duì)象客戶(hù)端websockets.connect()創(chuàng)建客戶(hù)端、websockets.WebSocketClientProtocol連接對(duì)象通用消息收發(fā)send()/recv()、連接關(guān)閉close()、狀態(tài)檢測(cè)等。三、基礎(chǔ)常用 API 詳解3.1 最簡(jiǎn)示例Echo 服務(wù)服務(wù)端客戶(hù)端實(shí)現(xiàn)“客戶(hù)端發(fā)什么服務(wù)端就返回什么”的基礎(chǔ)功能覆蓋核心 APIserve()、connect()、recv()、send()。3.1.1 服務(wù)端代碼importasyncioimportwebsockets# 核心處理函數(shù)每個(gè)客戶(hù)端連接會(huì)觸發(fā)該函數(shù)asyncdefecho_handler(websocket,path): websocket: 客戶(hù)端連接對(duì)象WebSocketServerProtocol 實(shí)例 path: 客戶(hù)端請(qǐng)求的路徑如 ws://localhost:8765/chat try:# 循環(huán)接收客戶(hù)端消息全雙工通信持續(xù)監(jiān)聽(tīng)asyncformessageinwebsocket:print(f服務(wù)端收到{message})# 發(fā)送消息給客戶(hù)端echo 邏輯awaitwebsocket.send(fEcho:{message})finally:print(f客戶(hù)端{(lán)websocket.remote_address}斷開(kāi)連接)# 啟動(dòng)服務(wù)端asyncdefstart_echo_server():# 核心 APIwebsockets.serve() 創(chuàng)建服務(wù)端# 參數(shù)處理函數(shù)、綁定地址、端口asyncwithwebsockets.serve(echo_handler,localhost,8765):print(Echo 服務(wù)端已啟動(dòng)ws://localhost:8765)# 掛起事件循環(huán)保持服務(wù)運(yùn)行awaitasyncio.Future()# 無(wú)限運(yùn)行# 運(yùn)行服務(wù)端if__name____main__:asyncio.run(start_echo_server())3.1.2 客戶(hù)端代碼importasyncioimportwebsocketsasyncdefecho_client():# 核心 APIwebsockets.connect() 連接服務(wù)端asyncwithwebsockets.connect(ws://localhost:8765)aswebsocket:# 發(fā)送文本消息核心 APIsend()awaitwebsocket.send(Hello WebSocket!)# 接收消息核心 APIrecv()responseawaitwebsocket.recv()print(f客戶(hù)端收到{response})# 主動(dòng)發(fā)送多條消息foriinrange(2):msgf測(cè)試消息{i1}awaitwebsocket.send(msg)resawaitwebsocket.recv()print(f客戶(hù)端收到{res})# 運(yùn)行客戶(hù)端需先啟動(dòng)服務(wù)端if__name____main__:asyncio.run(echo_client())3.2 消息類(lèi)型文本與二進(jìn)制消息WebSocket 支持文本str和二進(jìn)制bytes兩種消息類(lèi)型websockets原生支持這兩種格式的收發(fā)3.2.1 服務(wù)端支持二進(jìn)制消息importasyncioimportwebsocketsasyncdefbinary_handler(websocket,path):asyncformessageinwebsocket:# 判斷消息類(lèi)型ifisinstance(message,str):print(f文本消息{message})awaitwebsocket.send(f文本響應(yīng){message})elifisinstance(message,bytes):print(f二進(jìn)制消息長(zhǎng)度{len(message)}{message})# 二進(jìn)制消息響應(yīng)比如返回原字節(jié)后綴awaitwebsocket.send(messageb_response)asyncdefstart_binary_server():asyncwithwebsockets.serve(binary_handler,localhost,8766):print(二進(jìn)制消息服務(wù)端已啟動(dòng)ws://localhost:8766)awaitasyncio.Future()# asyncio.run(start_binary_server())3.2.2 客戶(hù)端發(fā)送二進(jìn)制消息importasyncioimportwebsocketsasyncdefbinary_client():asyncwithwebsockets.connect(ws://localhost:8766)aswebsocket:# 發(fā)送文本消息awaitwebsocket.send(這是文本消息)text_respawaitwebsocket.recv()print(f文本響應(yīng){text_resp})# 發(fā)送二進(jìn)制消息比如圖片字節(jié)、自定義二進(jìn)制數(shù)據(jù)binary_databhello_binaryawaitwebsocket.send(binary_data)binary_respawaitwebsocket.recv()print(f二進(jìn)制響應(yīng){binary_resp})# asyncio.run(binary_client())3.3 連接管理關(guān)閉連接與狀態(tài)檢測(cè)websockets提供連接狀態(tài)檢測(cè)、主動(dòng)關(guān)閉連接的 API核心屬性/方法websocket.closed判斷連接是否關(guān)閉布爾值websocket.close(code1000, reason)主動(dòng)關(guān)閉連接code 遵循 WebSocket 標(biāo)準(zhǔn)碼websocket.close_code/websocket.close_reason獲取關(guān)閉碼/原因。示例主動(dòng)關(guān)閉連接與狀態(tài)檢測(cè)importasyncioimportwebsocketsasyncdefclose_handler(websocket,path):# 接收客戶(hù)端消息觸發(fā)關(guān)閉邏輯messageawaitwebsocket.recv()ifmessageclose:# 主動(dòng)關(guān)閉連接標(biāo)準(zhǔn)關(guān)閉碼 1000正常關(guān)閉awaitwebsocket.close(code1000,reason客戶(hù)端請(qǐng)求關(guān)閉)print(f連接已關(guān)閉碼{websocket.close_code}原因{websocket.close_reason})else:awaitwebsocket.send(f收到{message})asyncdefstart_close_server():asyncwithwebsockets.serve(close_handler,localhost,8767):print(關(guān)閉連接測(cè)試服務(wù)端已啟動(dòng)ws://localhost:8767)awaitasyncio.Future()asyncdefclose_client():asyncwithwebsockets.connect(ws://localhost:8767)aswebsocket:# 檢測(cè)初始狀態(tài)print(f初始連接狀態(tài){關(guān)閉ifwebsocket.closedelse開(kāi)啟})# 發(fā)送關(guān)閉指令awaitwebsocket.send(close)# 等待連接關(guān)閉可選awaitasyncio.sleep(0.1)print(f發(fā)送關(guān)閉指令后狀態(tài){關(guān)閉ifwebsocket.closedelse開(kāi)啟})# 先啟動(dòng)服務(wù)端再運(yùn)行客戶(hù)端# asyncio.run(start_close_server())# asyncio.run(close_client())3.4 異常處理捕獲 WebSocket 相關(guān)異常websockets定義了多種異常類(lèi)型核心異常包括websockets.exceptions.ConnectionClosed連接已關(guān)閉websockets.exceptions.ConnectionClosedError連接異常關(guān)閉websockets.exceptions.InvalidURI無(wú)效的 WebSocket 地址websockets.exceptions.TimeoutError連接/收發(fā)超時(shí)。示例完整異常處理importasyncioimportwebsocketsfromwebsockets.exceptionsimport(ConnectionClosed,InvalidURI,TimeoutError)asyncdefexception_handler(websocket,path):try:asyncformessageinwebsocket:print(f收到消息{message})awaitwebsocket.send(f響應(yīng){message})exceptConnectionClosedase:print(f連接關(guān)閉異常碼{e.code}原因{e.reason})exceptExceptionase:print(f未知異常{e})asyncdefstart_exception_server():asyncwithwebsockets.serve(exception_handler,localhost,8768):print(異常處理服務(wù)端已啟動(dòng)ws://localhost:8768)awaitasyncio.Future()asyncdefexception_client():try:# 測(cè)試1無(wú)效 URI故意寫(xiě)錯(cuò)# async with websockets.connect(ws://localhost:8768/invalid) as websocket:# 測(cè)試2正常連接后強(qiáng)制斷開(kāi)asyncwithwebsockets.connect(ws://localhost:8768,timeout5)aswebsocket:awaitwebsocket.send(測(cè)試消息)# 模擬服務(wù)端斷開(kāi)后繼續(xù)發(fā)送awaitasyncio.sleep(1)awaitwebsocket.send(斷開(kāi)后發(fā)送的消息)# 觸發(fā) ConnectionClosedexceptInvalidURIase:print(f錯(cuò)誤無(wú)效的 URI -{e})exceptTimeoutErrorase:print(f錯(cuò)誤連接超時(shí) -{e})exceptConnectionClosedase:print(f錯(cuò)誤連接已關(guān)閉 -{e.code}|{e.reason})exceptExceptionase:print(f通用錯(cuò)誤{e})# asyncio.run(start_exception_server())# asyncio.run(exception_client())3.5 進(jìn)階 API心跳檢測(cè)防止連接斷開(kāi)WebSocket 長(zhǎng)連接可能因網(wǎng)絡(luò)閑置被網(wǎng)關(guān)/服務(wù)器斷開(kāi)心跳檢測(cè)是核心解決方案客戶(hù)端/服務(wù)端定期發(fā)送心跳包如ping對(duì)方回復(fù)pong確認(rèn)連接存活。websockets內(nèi)置ping()/pong()方法也可自定義心跳邏輯示例帶心跳檢測(cè)的客戶(hù)端服務(wù)端importasyncioimportwebsocketsfromdatetimeimportdatetime# ---------------------- 服務(wù)端 ----------------------asyncdefheartbeat_server_handler(websocket,path):# 啟動(dòng)心跳檢測(cè)任務(wù)后臺(tái)運(yùn)行heartbeat_taskasyncio.create_task(server_heartbeat(websocket))try:asyncformessageinwebsocket:ifmessageping:# 響應(yīng)心跳包awaitwebsocket.send(pong)print(f[{datetime.now()}] 服務(wù)端收到心跳回復(fù) pong)else:print(f[{datetime.now()}] 服務(wù)端收到消息{message})awaitwebsocket.send(f響應(yīng){message})finally:# 取消心跳任務(wù)heartbeat_task.cancel()print(f[{datetime.now()}] 客戶(hù)端斷開(kāi)心跳任務(wù)已取消)asyncdefserver_heartbeat(websocket):服務(wù)端心跳檢測(cè)定期檢查連接狀態(tài)whileTrue:awaitasyncio.sleep(10)# 每10秒檢測(cè)一次ifwebsocket.closed:break# 主動(dòng)發(fā)送 ping可選也可等客戶(hù)端發(fā)awaitwebsocket.ping()print(f[{datetime.now()}] 服務(wù)端發(fā)送 ping 檢測(cè))asyncdefstart_heartbeat_server():asyncwithwebsockets.serve(heartbeat_server_handler,localhost,8769):print(心跳檢測(cè)服務(wù)端已啟動(dòng)ws://localhost:8769)awaitasyncio.Future()# ---------------------- 客戶(hù)端 ----------------------asyncdefclient_heartbeat(websocket):客戶(hù)端心跳任務(wù)每5秒發(fā)送一次 pingwhileTrue:awaitasyncio.sleep(5)ifwebsocket.closed:breakawaitwebsocket.send(ping)# 等待 pong 響應(yīng)超時(shí)則斷開(kāi)try:respawaitasyncio.wait_for(websocket.recv(),timeout3)ifresppong:print(f[{datetime.now()}] 客戶(hù)端收到 pong連接正常)else:raiseException(非心跳響應(yīng))exceptasyncio.TimeoutError:print(f[{datetime.now()}] 心跳超時(shí)關(guān)閉連接)awaitwebsocket.close()breakasyncdefheartbeat_client():try:asyncwithwebsockets.connect(ws://localhost:8769)aswebsocket:# 啟動(dòng)客戶(hù)端心跳任務(wù)heartbeat_taskasyncio.create_task(client_heartbeat(websocket))# 發(fā)送業(yè)務(wù)消息awaitwebsocket.send(業(yè)務(wù)消息1)resp1awaitwebsocket.recv()print(f[{datetime.now()}] 客戶(hù)端收到{resp1})# 等待心跳運(yùn)行模擬長(zhǎng)連接awaitasyncio.sleep(20)# 取消心跳任務(wù)heartbeat_task.cancel()exceptExceptionase:print(f客戶(hù)端異常{e})# 運(yùn)行先啟動(dòng)服務(wù)端再啟動(dòng)客戶(hù)端# asyncio.run(start_heartbeat_server())# asyncio.run(heartbeat_client())3.6 進(jìn)階 API連接認(rèn)證Token/用戶(hù)名密碼WebSocket 連接建立時(shí)可通過(guò)請(qǐng)求頭或路徑參數(shù)做身份認(rèn)證websockets支持在連接握手階段驗(yàn)證示例基于 Token 的認(rèn)證importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection# 模擬合法 Token 列表VALID_TOKENS{token_123,token_456}# ---------------------- 認(rèn)證中間件 ----------------------asyncdefauth_middleware(websocket,path): 連接握手階段認(rèn)證 - 從請(qǐng)求頭獲取 Token - 驗(yàn)證失敗則拒絕連接RejectConnection # 獲取請(qǐng)求頭中的 Tokenwebsocket.request_headers 是類(lèi)字典對(duì)象tokenwebsocket.request_headers.get(X-Auth-Token)ifnottokenortokennotinVALID_TOKENS:# 拒絕連接標(biāo)準(zhǔn) 401 狀態(tài)碼raiseRejectConnection(401,Invalid or missing token)# 認(rèn)證通過(guò)進(jìn)入業(yè)務(wù)處理awaitauth_handler(websocket,path)# ---------------------- 業(yè)務(wù)處理 ----------------------asyncdefauth_handler(websocket,path):print(f客戶(hù)端{(lán)websocket.remote_address}認(rèn)證通過(guò)開(kāi)始通信)asyncformessageinwebsocket:awaitwebsocket.send(f認(rèn)證通過(guò)的響應(yīng){message})# ---------------------- 啟動(dòng)認(rèn)證服務(wù)端 ----------------------asyncdefstart_auth_server():asyncwithwebsockets.serve(auth_middleware,localhost,8770):print(認(rèn)證服務(wù)端已啟動(dòng)ws://localhost:8770)awaitasyncio.Future()# ---------------------- 認(rèn)證客戶(hù)端 ----------------------asyncdefauth_client(valid_token:boolTrue):# 構(gòu)造請(qǐng)求頭攜帶 Tokenheaders{}ifvalid_token:headers[X-Auth-Token]token_123# 合法 Tokenelse:headers[X-Auth-Token]invalid_token# 非法 Tokentry:asyncwithwebsockets.connect(ws://localhost:8770,extra_headersheaders# 自定義請(qǐng)求頭)aswebsocket:awaitwebsocket.send(認(rèn)證后的業(yè)務(wù)消息)respawaitwebsocket.recv()print(f客戶(hù)端收到{resp})exceptwebsockets.exceptions.ConnectionRefusedErrorase:print(f連接被拒絕{e})# 運(yùn)行# 1. 啟動(dòng)服務(wù)端asyncio.run(start_auth_server())# 2. 合法 Token 客戶(hù)端asyncio.run(auth_client(valid_tokenTrue))# 3. 非法 Token 客戶(hù)端asyncio.run(auth_client(valid_tokenFalse))3.7 進(jìn)階 API服務(wù)端廣播多客戶(hù)端通信WebSocket 最典型的場(chǎng)景是多客戶(hù)端實(shí)時(shí)通信如聊天室核心是維護(hù)客戶(hù)端連接池實(shí)現(xiàn)消息廣播示例簡(jiǎn)易聊天室廣播多客戶(hù)端importasyncioimportwebsocketsfromtypingimportSet# 維護(hù)所有活躍的客戶(hù)端連接active_connections:Set[websockets.WebSocketServerProtocol]set()asyncdefchat_handler(websocket,path):# 1. 新客戶(hù)端連接加入連接池active_connections.add(websocket)client_addrwebsocket.remote_addressprint(f[{client_addr}] 加入聊天室當(dāng)前在線(xiàn){len(active_connections)})try:# 2. 接收客戶(hù)端消息并廣播asyncformessageinwebsocket:broadcast_msgf[{client_addr}]{message}print(f廣播消息{broadcast_msg})# 遍歷所有客戶(hù)端發(fā)送廣播消息forconninactive_connections:ifconn!websocket:# 不發(fā)給自己awaitconn.send(broadcast_msg)exceptwebsockets.exceptions.ConnectionClosed:print(f[{client_addr}] 異常斷開(kāi))finally:# 3. 客戶(hù)端斷開(kāi)從連接池移除active_connections.remove(websocket)print(f[{client_addr}] 離開(kāi)聊天室當(dāng)前在線(xiàn){len(active_connections)})asyncdefstart_chat_server():asyncwithwebsockets.serve(chat_handler,localhost,8771):print(聊天室服務(wù)端已啟動(dòng)ws://localhost:8771)awaitasyncio.Future()# ---------------------- 聊天室客戶(hù)端 ----------------------asyncdefchat_client(username:str):asyncwithwebsockets.connect(ws://localhost:8771)aswebsocket:# 啟動(dòng)接收消息的任務(wù)后臺(tái)運(yùn)行實(shí)時(shí)接收廣播recv_taskasyncio.create_task(recv_chat_msg(websocket))# 控制臺(tái)輸入消息并發(fā)送whileTrue:msginput(f{username} )ifmsg.lower()exit:breakawaitwebsocket.send(f{username}{msg})# 取消接收任務(wù)recv_task.cancel()awaitwebsocket.close()asyncdefrecv_chat_msg(websocket):后臺(tái)接收廣播消息whileTrue:try:msgawaitwebsocket.recv()print(f 收到廣播{msg})print( ,end,flushTrue)# 恢復(fù)輸入提示符exceptwebsockets.exceptions.ConnectionClosed:break# 運(yùn)行# 1. 啟動(dòng)聊天室服務(wù)端asyncio.run(start_chat_server())# 2. 啟動(dòng)多個(gè)客戶(hù)端不同終端運(yùn)行# asyncio.run(chat_client(用戶(hù)1))# asyncio.run(chat_client(用戶(hù)2))四、實(shí)戰(zhàn)案例串聯(lián)所有常用 API 的實(shí)時(shí)監(jiān)控系統(tǒng)場(chǎng)景需求實(shí)現(xiàn)一個(gè)簡(jiǎn)易的實(shí)時(shí)監(jiān)控系統(tǒng)服務(wù)端認(rèn)證客戶(hù)端Token 驗(yàn)證接收客戶(hù)端上報(bào)的監(jiān)控?cái)?shù)據(jù)二進(jìn)制/文本廣播監(jiān)控?cái)?shù)據(jù)給所有在線(xiàn)客戶(hù)端心跳檢測(cè)清理離線(xiàn)客戶(hù)端完整異常處理??蛻?hù)端攜帶 Token 連接服務(wù)端定期上報(bào)監(jiān)控?cái)?shù)據(jù)CPU/內(nèi)存使用率模擬實(shí)時(shí)接收服務(wù)端廣播的所有客戶(hù)端監(jiān)控?cái)?shù)據(jù)心跳檢測(cè)斷連自動(dòng)重連。完整代碼實(shí)現(xiàn)4.1 服務(wù)端代碼monitor_server.pyimportasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection,ConnectionClosedfromtypingimportSet,Dictfromdatetimeimportdatetime# 配置VALID_TOKENS{monitor_token_2025}# 合法認(rèn)證 TokenSERVER_HOSTlocalhostSERVER_PORT8772HEARTBEAT_INTERVAL10# 服務(wù)端心跳檢測(cè)間隔秒CLIENT_TIMEOUT30# 客戶(hù)端超時(shí)時(shí)間秒# 客戶(hù)端連接池{連接對(duì)象: {addr: 地址, last_heartbeat: 最后心跳時(shí)間}}client_pool:Dict[websockets.WebSocketServerProtocol,dict]{}# ---------------------- 認(rèn)證邏輯 ----------------------asyncdefauth_middleware(websocket,path):tokenwebsocket.request_headers.get(X-Monitor-Token)ifnottokenortokennotinVALID_TOKENS:raiseRejectConnection(401,Invalid Monitor Token)# 認(rèn)證通過(guò)記錄客戶(hù)端信息client_pool[websocket]{addr:websocket.remote_address,last_heartbeat:datetime.now()}print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}認(rèn)證通過(guò)加入監(jiān)控)# 進(jìn)入業(yè)務(wù)處理awaitmonitor_handler(websocket,path)# ---------------------- 心跳檢測(cè)任務(wù) ----------------------asyncdefserver_heartbeat_check():后臺(tái)定時(shí)清理離線(xiàn)客戶(hù)端whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)nowdatetime.now()# 遍歷連接池檢測(cè)超時(shí)客戶(hù)端forconn,infoinlist(client_pool.items()):if(now-info[last_heartbeat]).total_seconds()CLIENT_TIMEOUT:print(f[{datetime.now()}] 客戶(hù)端{(lán)info[addr]}心跳超時(shí)強(qiáng)制斷開(kāi))awaitconn.close(code1001,reasonHeartbeat timeout)delclient_pool[conn]# ---------------------- 業(yè)務(wù)處理 ----------------------asyncdefmonitor_handler(websocket,path):try:asyncformessageinwebsocket:# 更新最后心跳時(shí)間無(wú)論消息類(lèi)型視為存活client_pool[websocket][last_heartbeat]datetime.now()# 處理監(jiān)控?cái)?shù)據(jù)ifisinstance(message,str):# 文本消息心跳/控制指令ifmessageping:awaitwebsocket.send(pong)print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}心跳響應(yīng))else:print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}文本監(jiān)控?cái)?shù)據(jù){message})elifisinstance(message,bytes):# 二進(jìn)制消息模擬監(jiān)控?cái)?shù)據(jù)如序列化的指標(biāo)print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}二進(jìn)制監(jiān)控?cái)?shù)據(jù)長(zhǎng)度{len(message)})# 廣播監(jiān)控?cái)?shù)據(jù)給所有客戶(hù)端broadcast_msgf[{websocket.remote_address}]{message[:50]}...# 截?cái)嚅L(zhǎng)消息forconninclient_pool:ifconn!websocket:awaitconn.send(broadcast_msg)exceptConnectionClosedase:print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}斷開(kāi)碼{e.code}原因{e.reason})finally:# 清理連接池ifwebsocketinclient_pool:delclient_pool[websocket]print(f[{datetime.now()}] 客戶(hù)端{(lán)websocket.remote_address}已移除當(dāng)前在線(xiàn){len(client_pool)})# ---------------------- 啟動(dòng)服務(wù)端 ----------------------asyncdefstart_monitor_server():# 啟動(dòng)心跳檢測(cè)后臺(tái)任務(wù)asyncio.create_task(server_heartbeat_check())# 啟動(dòng) WebSocket 服務(wù)asyncwithwebsockets.serve(auth_middleware,SERVER_HOST,SERVER_PORT):print(f[{datetime.now()}] 監(jiān)控服務(wù)端已啟動(dòng)ws://{SERVER_HOST}:{SERVER_PORT})awaitasyncio.Future()if__name____main__:asyncio.run(start_monitor_server())4.2 客戶(hù)端代碼monitor_client.pyimportasyncioimportwebsocketsfromwebsockets.exceptionsimportConnectionRefusedError,ConnectionClosedimportrandomfromdatetimeimportdatetimeimporttime# 配置SERVER_URLws://localhost:8772AUTH_TOKENmonitor_token_2025# 合法 TokenREPORT_INTERVAL5# 監(jiān)控?cái)?shù)據(jù)上報(bào)間隔秒HEARTBEAT_INTERVAL8# 客戶(hù)端心跳間隔秒RECONNECT_INTERVAL3# 斷連重連間隔秒# ---------------------- 心跳任務(wù) ----------------------asyncdefclient_heartbeat(websocket):客戶(hù)端心跳定期發(fā)送 pingwhileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)ifwebsocket.closed:breaktry:awaitwebsocket.send(ping)respawaitasyncio.wait_for(websocket.recv(),timeout3)ifresppong:print(f[{datetime.now()}] 心跳正常)else:raiseException(非 pong 響應(yīng))exceptasyncio.TimeoutError:print(f[{datetime.now()}] 心跳超時(shí)準(zhǔn)備重連)awaitwebsocket.close()breakexceptExceptionase:print(f[{datetime.now()}] 心跳異常{e})break# ---------------------- 接收廣播任務(wù) ----------------------asyncdefrecv_broadcast(websocket):接收服務(wù)端廣播的監(jiān)控?cái)?shù)據(jù)whileTrue:try:msgawaitwebsocket.recv()print(f [{datetime.now()}] 收到監(jiān)控廣播{msg})exceptConnectionClosed:break# ---------------------- 上報(bào)監(jiān)控?cái)?shù)據(jù) ----------------------asyncdefreport_monitor_data(websocket):模擬上報(bào) CPU/內(nèi)存使用率文本二進(jìn)制whileTrue:awaitasyncio.sleep(REPORT_INTERVAL)ifwebsocket.closed:break# 模擬監(jiān)控?cái)?shù)據(jù)cpu_usagerandom.uniform(10.0,80.0)mem_usagerandom.uniform(20.0,90.0)# 1. 發(fā)送文本監(jiān)控?cái)?shù)據(jù)text_datafCPU:{cpu_usage:.2f}%, MEM:{mem_usage:.2f}%awaitwebsocket.send(text_data)print(f[{datetime.now()}] 上報(bào)文本監(jiān)控?cái)?shù)據(jù){text_data})# 2. 發(fā)送二進(jìn)制監(jiān)控?cái)?shù)據(jù)模擬序列化的指標(biāo)binary_dataf{cpu_usage:.2f}|{mem_usage:.2f}.encode(utf-8)awaitwebsocket.send(binary_data)print(f[{datetime.now()}] 上報(bào)二進(jìn)制監(jiān)控?cái)?shù)據(jù){binary_data})# ---------------------- 客戶(hù)端主邏輯支持重連 ----------------------asyncdefmonitor_client():whileTrue:try:# 建立連接攜帶認(rèn)證 Tokenasyncwithwebsockets.connect(SERVER_URL,extra_headers{X-Monitor-Token:AUTH_TOKEN})aswebsocket:print(f[{datetime.now()}] 成功連接監(jiān)控服務(wù)端)# 啟動(dòng)后臺(tái)任務(wù)heartbeat_taskasyncio.create_task(client_heartbeat(websocket))broadcast_taskasyncio.create_task(recv_broadcast(websocket))report_taskasyncio.create_task(report_monitor_data(websocket))# 等待任務(wù)完成或連接斷開(kāi)awaitasyncio.gather(heartbeat_task,broadcast_task,report_task)exceptConnectionRefusedError:print(f[{datetime.now()}] 連接被拒絕服務(wù)端未啟動(dòng)/Token 錯(cuò)誤)exceptConnectionClosed:print(f[{datetime.now()}] 連接斷開(kāi))exceptExceptionase:print(f[{datetime.now()}] 客戶(hù)端異常{e})# 斷連后重連print(f[{datetime.now()}]{RECONNECT_INTERVAL}秒后嘗試重連...)awaitasyncio.sleep(RECONNECT_INTERVAL)if__name____main__:asyncio.run(monitor_client())運(yùn)行說(shuō)明啟動(dòng)服務(wù)端python monitor_server.py啟動(dòng)多個(gè)客戶(hù)端不同終端python monitor_client.py觀察效果客戶(hù)端自動(dòng)認(rèn)證連接定期上報(bào)文本/二進(jìn)制監(jiān)控?cái)?shù)據(jù)服務(wù)端廣播數(shù)據(jù)給所有客戶(hù)端心跳檢測(cè)超時(shí)自動(dòng)清理客戶(hù)端客戶(hù)端斷連后自動(dòng)重連。五、核心 API 總結(jié)與最佳實(shí)踐5.1 核心 API 速查表類(lèi)別API 名稱(chēng)/方法作用服務(wù)端websockets.serve(handler, host, port)創(chuàng)建 WebSocket 服務(wù)端客戶(hù)端websockets.connect(uri, **kwargs)連接 WebSocket 服務(wù)端消息收發(fā)await websocket.send(msg)發(fā)送文本/二進(jìn)制消息await websocket.recv()接收消息單次async for msg in websocket持續(xù)監(jiān)聽(tīng)消息推薦連接管理websocket.closed判斷連接是否關(guān)閉await websocket.close(code, reason)主動(dòng)關(guān)閉連接code 遵循 WebSocket 標(biāo)準(zhǔn)碼websocket.request_headers獲取客戶(hù)端請(qǐng)求頭服務(wù)端認(rèn)證RejectConnection(status_code, reason)服務(wù)端拒絕連接認(rèn)證失敗心跳await websocket.ping()/pong()內(nèi)置 ping/pong 心跳底層異常ConnectionClosed連接關(guān)閉異常InvalidURI無(wú)效 URI 異常TimeoutError連接/收發(fā)超時(shí)異常5.2 最佳實(shí)踐連接池管理服務(wù)端維護(hù)客戶(hù)端連接池時(shí)使用set/dict存儲(chǔ)連接并在finally塊中清理避免內(nèi)存泄漏心跳必加長(zhǎng)連接場(chǎng)景必須實(shí)現(xiàn)心跳檢測(cè)防止閑置連接被網(wǎng)關(guān)/服務(wù)器斷開(kāi)異常全覆蓋至少捕獲ConnectionClosed、ConnectionRefusedError、TimeoutError等核心異常重連機(jī)制客戶(hù)端實(shí)現(xiàn)斷連自動(dòng)重連提升魯棒性并發(fā)控制服務(wù)端廣播時(shí)若客戶(hù)端數(shù)量多使用asyncio.Semaphore限制并發(fā)發(fā)送避免事件循環(huán)阻塞認(rèn)證安全認(rèn)證 Token 避免明文傳輸生產(chǎn)環(huán)境建議使用 WSSWebSocket Secure即wss://消息大小限制生產(chǎn)環(huán)境可通過(guò)max_size參數(shù)限制消息大小防止超大消息攻擊# 服務(wù)端限制單條消息最大 1MBwebsockets.serve(handler,host,port,max_size1024*1024)# 客戶(hù)端限制單條消息最大 1MBwebsockets.connect(uri,max_size1024*1024)六、總結(jié)本文覆蓋了websockets庫(kù)的所有核心常用 API基礎(chǔ)服務(wù)端創(chuàng)建、客戶(hù)端連接、文本/二進(jìn)制消息收發(fā)進(jìn)階連接管理、異常處理、心跳檢測(cè)、身份認(rèn)證、服務(wù)端廣播實(shí)戰(zhàn)通過(guò)實(shí)時(shí)監(jiān)控系統(tǒng)串聯(lián)所有 API實(shí)現(xiàn)認(rèn)證、心跳、數(shù)據(jù)上報(bào)、廣播、重連等完整功能。掌握這些知識(shí)點(diǎn)后你可以輕松構(gòu)建各類(lèi)實(shí)時(shí)通信場(chǎng)景聊天室、實(shí)時(shí)監(jiān)控、推送服務(wù)、在線(xiàn)游戲等同時(shí)遵循最佳實(shí)踐確保系統(tǒng)的穩(wěn)定性和安全性。
版權(quán)聲明: 本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)聯(lián)系我們進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

查看網(wǎng)站建設(shè)時(shí)間營(yíng)銷(xiāo)型網(wǎng)站建設(shè)網(wǎng)站手機(jī)

查看網(wǎng)站建設(shè)時(shí)間,營(yíng)銷(xiāo)型網(wǎng)站建設(shè)網(wǎng)站手機(jī),資陽(yáng)網(wǎng)站建設(shè)方案,安卓網(wǎng)站開(kāi)發(fā)前景黑馬頭條 ps #xff1a; 學(xué)習(xí)代碼架構(gòu)設(shè)計(jì) 學(xué)習(xí)場(chǎng)景的封裝抽離 學(xué)習(xí)并發(fā)處理 時(shí)隔多年#xff0c;再次學(xué)習(xí)#xff0

2026/01/23 06:32:01

如何擁有自己的網(wǎng)站做網(wǎng)站后端的全部步驟

如何擁有自己的網(wǎng)站,做網(wǎng)站后端的全部步驟,山東公司網(wǎng)站推廣優(yōu)化,網(wǎng)站如何實(shí)現(xiàn)qq登錄功能PyTorch吞吐量?jī)?yōu)化實(shí)驗(yàn)#xff1a;Miniconda-Python3.9環(huán)境調(diào)優(yōu) 在深度學(xué)習(xí)模型訓(xùn)練中#

2026/01/23 00:20:01

網(wǎng)站如何做404大型門(mén)戶(hù)網(wǎng)站都有

網(wǎng)站如何做404,大型門(mén)戶(hù)網(wǎng)站都有,房地產(chǎn)排名,校園社交網(wǎng)站怎么做MoE 是 Mixture of Experts#xff08;混合專(zhuān)家模型#xff09;的縮寫(xiě)。它是目前解決大模型 “既要變得超級(jí)聰明

2026/01/23 10:10:01