简单来讲, ProtoBuf 是结构数据序列化方法,可简单类比xml,比xml更小,支持Python,Java,C++等多种语言,支持多平台,扩展性和兼容性更好。本篇不描述如何定义proto文件,具体定义方法可自行查询。本文内容是通过websocket客户端,将protobuf格式数据发送到服务端。
使用protoBuf
以下根据定义好的proto文件为例,文件格式为utf-8,proto文件内容在文章末尾查看。
1.需先下载protobuf,通过命令将proto文件转换为python文件:
本文使用的命令格式是“protoc python文件输出路径 要转换的proto文件”,
protoc --python_out=./ ./Error.proto protoc --python_out=./ ./Login.proto protoc --python_out=./ ./PbMessage.proto protoc --python_out=./ ./MdgwMonitorData.proto protoc --python_out=./ ./TdgwMonitorData.proto
执行命令之后生成的文件名分别是:
Error_pb2.py Login_pb2.py PbMessage_pb2.py MdgwMonitorData_pb2.py TdgwMonitorData_pb2.py
2.调用生成proto格式数据
生成login信息,先实例化PbMessage_pb2.PbMessageP, 这是proto的调用接口,包含了Login、Error、MdgwMonitorData、TdgwMonitorData多个文件的调用。
def pmlogin(): # 实例化PbMessageP pbm = pb2.PbMessageP() pbm.msgType = pb2.PbMessageP.LoginType # pbm.login.identity = "GatewayMontior" pbm.login.targetIdentity = "MDGW" pbm.login.pingPongInt = 10 pbm.login.prtclVersion = "0.1" print(pbm) # 通过websocket发送,需将pbm对象序列化 return pbm.SerializeToString()
3.websocket客户端发送二进制数据
使用websocket第三方库,需要先定义on_open、on_message、on_error、on_close四个函数,on_open函数中定义连接服务端时发送的信息,on_message接收服务端返回的信息,在on_message中解析服务端返回的proto格式数据。发送二进制数据时,需在send中入参opcode=0x2。
def on_message(ws, message): print("**************** message *******************") print(message) pbm = pb2.PbMessageP() pbm.msgType = pb2.PbMessageP.MdgwStaticInfoType pbm.ParseFromString(message) print(pbm) print(pbm.upgradeMdgwVersion) def on_error(ws, error): print("**************** error *******************") print(error) def on_close(ws, close_status_code, close_msg): print("### closed ###") def on_open(ws): def run(*args): # 发送二进制数据 需入参opcode=0x2 ws.send(pmlogin(), opcode=0x2) # 登录认证后,发送订阅信息 ws.send(metricsSub(), opcode=0x2) time.sleep(1) # ws.close() print("thread terminating...") _thread.start_new_thread(run, ())
源码:
#!/usr/bin/env python # -*- coding: utf-8 -*- import websocket import _thread import time import Login_pb2 as login import PbMessage_pb2 as pb2 mdgw_url = "ws://127.0.0.1:9898/mdgw/monitor" def on_message(ws, message): print("**************** message *******************") print(message) pbm = pb2.PbMessageP() pbm.msgType = pb2.PbMessageP.MdgwStaticInfoType pbm.ParseFromString(message) print(pbm) print(pbm.upgradeMdgwVersion) def on_error(ws, error): print("**************** error *******************") print(error) def on_close(ws, close_status_code, close_msg): print("### closed ###") def on_open(ws): def run(*args): ws.send(pmlogin(), opcode=0x2) ws.send(metricsSub(), opcode=0x2) time.sleep(1) # ws.close() print("thread terminating...") _thread.start_new_thread(run, ()) def loginp(): lg = login.LoginP() # 发送方身份 lg.identity = "monweb" # 接收方身份 监控系统填可以标识其身份的 Identity,连接行情网关时 TargetIdentity 填写“MDGW”。 lg.targetIdentity = "MDGW" # PingPong 间隔,单位为秒 lg.pingPongInt = 10 # 监控协议版本 lg.prtclVersion = "0.1" print(lg) return lg.SerializeToString() def metsp(monitorType="", metricName="", period=5): mp = login.MetricP() # 监 控 项 。 取 值 :marketDataTask 、fileTask 、forwardTask。 当 MetricName=M001时,该域填空串。 if monitorType != "": mp.monitorType = monitorType # 发送监控名称 静态信息M001 行情任务运行指标M002 文件任务运行指标M003 转发任务运行指标M004 任务运行过程中输出信息M005 mp.metricName = metricName # 发送周期 , 单位为秒。 取值为-1表示实时推送 mp.period = period return mp def pmlogin(): pbm = pb2.PbMessageP() pbm.msgType = pb2.PbMessageP.LoginType pbm.login.identity = "GatewayMontior" pbm.login.targetIdentity = "MDGW" pbm.login.pingPongInt = 10 pbm.login.prtclVersion = "0.1" # print(pbm) return pbm.SerializeToString() def metricsSub(): pbm = pb2.PbMessageP() pbm.msgType = pb2.PbMessageP.MetricsSubType pbm.metricsSub.software = "MDGW" pbm.metricsSub.metrics.MergeFrom([metsp("", "M001", 5)]) # print(pbm) return pbm.SerializeToString() if __name__ == "__main__": websocket.enableTrace(True) ws = websocket.WebSocketApp(mdgw_url, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close) ws.run_forever(ping_timeout=10)
ProtoBuf文件示例
Error.proto
syntax = "proto3"; option java_package = "com.sse.fea.monitor.protobuf"; option java_outer_classname ="Error"; message ErrorP { uint32 errorStatus = 1; string text = 2; }
Login.proto
syntax = "proto3"; option java_package = "com.sse.fea.monitor.protobuf"; option java_outer_classname ="Login"; message LoginP { string identity = 1; string targetIdentity = 2; uint32 pingPongInt = 3; string prtclVersion = 4; } message MetricsSubP{ string software = 1; repeated MetricP metrics = 2; } message MetricP { string monitorType = 1; string metricName = 2; int32 period = 3; }
MdgwMonitorData.proto
syntax = "proto3"; option java_package = "com.sse.fea.monitor.protobuf"; option java_outer_classname = "MdgwMonitorData"; option java_multiple_files = true; // 行情网关静态信息 message MdgwStaticInfoP { string metricName = 1; string mdgwVersion = 2; string upgradeMdgwVersion = 3; string lowestVssPrtclVersion = 4; string certIdentifier = 5; string certType = 6; string certAlgorithm = 7; string certExpiryDate = 8; uint32 certRemainingDay = 9; string transportType = 10; } // md任务运行指标 message MdgwMdTaskRunMetricP { string metricName = 1; string envNum = 2; string taskName = 3; string taskState = 4; uint32 queuePendingNum = 5; uint32 queueRejectNum = 6; uint64 receivePkgNum = 7; uint64 receivePkgTps = 8; uint64 lossPkgNum = 9; string lossPkgRate = 10; // 0.0% uint64 mdFileProcessTps = 11; uint64 processPkgTps = 12; repeated string securityType = 13; string serverAddress = 14; string vssId = 15; message VssInfoP { string compId = 1; string addr = 2; uint64 sendTps = 3; } repeated VssInfoP vssInfo = 16; } // file传输任务运行指标 message MdgwFileTaskRunMetricP { string metricName = 1; string envNum = 2; string taskName = 3; string taskState = 4; uint32 queuePendingNum = 5; uint32 queueRejectNum = 6; uint64 receivePkgNum = 7; uint64 receivePkgTps = 8; uint64 lossPkgNum = 9; string lossPkgRate = 10; // 0.0% uint64 processPkgTps = 11; repeated string fileType = 12; string serverAddress = 13; } // forward任务运行指标 message MdgwForwardTaskRunMetricP { string metricName = 1; string envNum = 2; string taskName = 3; string taskState = 4; uint32 queuePendingNum = 5; uint32 queueRejectNum = 6; uint64 receivePkgNum = 7; uint64 receivePkgTps = 8; uint64 lossPkgNum = 9; string lossPkgRate = 10; // 0.0% uint64 processPkgTps = 11; uint64 forwardPkgNum = 12; repeated string forwardType = 13; string serverAddress = 14; string vssId = 15; } // 任务运行输出信息 message MdgwTaskOutputP { string metricName = 1; string taskName = 2; string happenTime = 3; string level = 4; string info = 5; }
PbMessage.proto
syntax = "proto3"; option java_package = "com.sse.fea.monitor.protobuf"; option java_outer_classname ="PbMessage"; import "Login.proto"; import "Error.proto"; import "MdgwMonitorData.proto"; import "TdgwMonitorData.proto"; message PbMessageP { enum MsgType { UnknownType = 0; LoginType = 1; ErrorType = 2; MetricsSubType = 3; MdgwStaticInfoType = 100; MdgwMdTaskRunMetricType = 101; MdgwFileTaskRunMetricType = 102; MdgwForwardTaskRunMetricType = 103; MdgwTaskOutputType = 104; TdgwStaticInfoType = 200; TdgwPbuRunMetricType = 201; TdgwOmsSessionMetricType = 202; TdgwPbuOutputType = 203; } MsgType msgType = 1; oneof dataBody { LoginP login = 2; ErrorP error = 3; MetricsSubP metricsSub = 4; MdgwFileTaskRunMetricP fileTaskRunMetric = 100; MdgwForwardTaskRunMetricP forwardTaskRunMetric = 101; MdgwMdTaskRunMetricP mdTaskRunMetric = 102; MdgwStaticInfoP mdgwStaticInfo = 103; MdgwTaskOutputP mdgwTaskOutput = 104; TdgwOmsSessionMetricP omsSessionMetric = 200; TdgwPbuRunMetricP pbuRunMetric = 201; TdgwStaticInfoP tdgwStaticInfo = 202; TdgwPbuOutputP tdgwPbuOutput = 203; } }
TdgwMonitorData.proto
syntax = "proto3"; option java_package = "com.sse.fea.monitor.protobuf"; option java_outer_classname = "TdgwMonitorData"; option java_multiple_files = true; // 交易网关静态信息 message TdgwStaticInfoP { string metricName = 1; string tdgwVersion = 2; string certType = 3; string certAlgorithm = 4; string certIdentifier = 5; string certExpiryDate = 6; uint32 certRemainingDay = 7; } // 交易网关PBU运行指标 message TdgwPbuRunMetricP { string metricName = 1; string platform = 2; string envNum = 3; string pbu = 4; string state = 5; string platformState = 6; string platformDateTime = 7; string tradeDay = 8; string upgradeTdgwVersion = 9; string lowestOmsPrtclVersion = 10; repeated string successSubPbu = 11; uint64 sendPkgCnt = 12; uint64 sendPkgTps = 13; uint64 receivePkgCnt = 14; uint64 receivePkgTps = 15; string serverAddress = 16; uint32 upQueuePendingNum = 17; uint32 downQueuePendingNum = 18; uint32 throttle = 19; uint32 reqSize = 20; uint32 maxReqSize = 21; uint32 suspendAcceptOrderCnt = 22; uint32 peakReqSize = 23; repeated ExecRptIndexP execRptIndex = 24; uint32 suspendAcceptExecCnt = 25; } message ExecRptIndexP { string pbu = 1; uint32 setId = 2; uint64 nextRptIndex = 3; } // 交易网关OMS会话指标 message TdgwOmsSessionMetricP { string metricName = 1; string platform = 2; string pbu = 3; string state = 4; string omsId = 5; uint64 sendPkgCnt = 6; uint64 sendPkgTps = 7; uint64 receivePkgCnt = 8; uint64 receivePkgTps = 9; uint64 orderCnt = 10; uint64 execRptCnt = 11; uint64 generalRespCnt = 12; uint64 orderRejectCnt = 13; } // 交易网关PBU运行输出信息 message TdgwPbuOutputP { string metricName = 1; string platform = 2; string pbu = 3; string happenTime = 4; string level = 5; string info = 6; }