• 欢迎访问
  • ...

python3+websocket+protobuf

Python 程序员的旅途 4年前 (2021-09-29) 2941次浏览 已收录 0个评论

简单来讲, ProtoBuf 是结构数据序列化方法,可简单类比xml,比xml更小,支持Python,Java,C++等多种语言,支持多平台,扩展性和兼容性更好。本篇不描述如何定义proto文件,具体定义方法可自行查询。本文内容是通过websocket客户端,将protobuf格式数据发送到服务端。

使用protoBuf

以下根据定义好的proto文件为例,文件格式为utf-8,proto文件内容在文章末尾查看。

1.需先下载protobuf,通过命令将proto文件转换为python文件:

本文使用的命令格式是“protoc  python文件输出路径  要转换的proto文件”,

protoc --python_out=./ ./Error.proto
protoc --python_out=./ ./Login.proto
protoc --python_out=./ ./PbMessage.proto
protoc --python_out=./ ./MdgwMonitorData.proto
protoc --python_out=./ ./TdgwMonitorData.proto

执行命令之后生成的文件名分别是:

Error_pb2.py
Login_pb2.py
PbMessage_pb2.py
MdgwMonitorData_pb2.py
TdgwMonitorData_pb2.py

2.调用生成proto格式数据

生成login信息,先实例化PbMessage_pb2.PbMessageP, 这是proto的调用接口,包含了Login、Error、MdgwMonitorData、TdgwMonitorData多个文件的调用。

def pmlogin():
    # 实例化PbMessageP
    pbm = pb2.PbMessageP()
    pbm.msgType = pb2.PbMessageP.LoginType
    # 
    pbm.login.identity = "GatewayMontior"
    pbm.login.targetIdentity = "MDGW"
    pbm.login.pingPongInt = 10
    pbm.login.prtclVersion = "0.1"
    print(pbm)
    
    # 通过websocket发送,需将pbm对象序列化
    return pbm.SerializeToString()

3.websocket客户端发送二进制数据

使用websocket第三方库,需要先定义on_open、on_message、on_error、on_close四个函数,on_open函数中定义连接服务端时发送的信息,on_message接收服务端返回的信息,在on_message中解析服务端返回的proto格式数据。发送二进制数据时,需在send中入参opcode=0x2。

def on_message(ws, message):
    print("**************** message *******************")
    print(message)
    pbm = pb2.PbMessageP()
    pbm.msgType = pb2.PbMessageP.MdgwStaticInfoType
    pbm.ParseFromString(message)
    print(pbm)
    print(pbm.upgradeMdgwVersion)

def on_error(ws, error):
    print("**************** error *******************")
    print(error)

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        # 发送二进制数据 需入参opcode=0x2
        ws.send(pmlogin(), opcode=0x2)
        # 登录认证后,发送订阅信息
        ws.send(metricsSub(), opcode=0x2)

        time.sleep(1)
        # ws.close()
        print("thread terminating...")
    _thread.start_new_thread(run, ())

 

源码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import websocket
import _thread
import time
import Login_pb2 as login
import PbMessage_pb2 as pb2


mdgw_url = "ws://127.0.0.1:9898/mdgw/monitor"
def on_message(ws, message):
    print("**************** message *******************")
    print(message)
    pbm = pb2.PbMessageP()
    pbm.msgType = pb2.PbMessageP.MdgwStaticInfoType
    pbm.ParseFromString(message)
    print(pbm)
    print(pbm.upgradeMdgwVersion)


def on_error(ws, error):
    print("**************** error *******************")
    print(error)
    

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")


def on_open(ws):
    def run(*args):
        ws.send(pmlogin(), opcode=0x2)
        ws.send(metricsSub(), opcode=0x2)

        time.sleep(1)
        # ws.close()
        print("thread terminating...")
    _thread.start_new_thread(run, ())


def loginp():
    lg = login.LoginP()
    # 发送方身份
    lg.identity = "monweb"
    # 接收方身份 监控系统填可以标识其身份的 Identity,连接行情网关时 TargetIdentity 填写“MDGW”。
    lg.targetIdentity = "MDGW"
    # PingPong 间隔,单位为秒
    lg.pingPongInt = 10
    # 监控协议版本
    lg.prtclVersion = "0.1"
    print(lg)
    return lg.SerializeToString()

def metsp(monitorType="", metricName="", period=5):
    mp = login.MetricP()
    # 监 控 项 。 取 值 :marketDataTask 、fileTask 、forwardTask。 当 MetricName=M001时,该域填空串。
    if monitorType != "":
        mp.monitorType = monitorType
    # 发送监控名称 静态信息M001 行情任务运行指标M002 文件任务运行指标M003 转发任务运行指标M004 任务运行过程中输出信息M005
    mp.metricName = metricName
    # 发送周期 , 单位为秒。 取值为-1表示实时推送
    mp.period = period
    return mp


def pmlogin():
    pbm = pb2.PbMessageP()
    pbm.msgType = pb2.PbMessageP.LoginType

    pbm.login.identity = "GatewayMontior"
    pbm.login.targetIdentity = "MDGW"
    pbm.login.pingPongInt = 10
    pbm.login.prtclVersion = "0.1"

    # print(pbm)

    return pbm.SerializeToString()

def metricsSub():
    pbm = pb2.PbMessageP()

    pbm.msgType = pb2.PbMessageP.MetricsSubType
    pbm.metricsSub.software = "MDGW"
    pbm.metricsSub.metrics.MergeFrom([metsp("", "M001", 5)])

    # print(pbm)
    return pbm.SerializeToString()


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(mdgw_url,
                              on_open=on_open,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)

    ws.run_forever(ping_timeout=10)

 

ProtoBuf文件示例

Error.proto

syntax = "proto3";

option java_package = "com.sse.fea.monitor.protobuf";
option java_outer_classname ="Error";

message ErrorP {
    uint32 errorStatus = 1;
    string text = 2;
}

Login.proto

syntax = "proto3";

option java_package = "com.sse.fea.monitor.protobuf";
option java_outer_classname ="Login";

message LoginP {
    string identity = 1;
    string targetIdentity = 2;
    uint32 pingPongInt = 3;
    string prtclVersion = 4;
}

message MetricsSubP{
    string software = 1;
    repeated MetricP metrics = 2;
}

message MetricP {
    string monitorType = 1;
    string metricName = 2;
    int32 period = 3;
}

MdgwMonitorData.proto

syntax = "proto3";

option java_package = "com.sse.fea.monitor.protobuf";
option java_outer_classname = "MdgwMonitorData";
option java_multiple_files = true;

// 行情网关静态信息
message MdgwStaticInfoP {
    string metricName = 1;
    string mdgwVersion = 2;
    string upgradeMdgwVersion = 3;
    string lowestVssPrtclVersion = 4;
    string certIdentifier = 5;
    string certType = 6;
    string certAlgorithm = 7;
    string certExpiryDate = 8;
    uint32 certRemainingDay = 9;
    string transportType = 10;
}

// md任务运行指标
message MdgwMdTaskRunMetricP {
    string metricName = 1;
    string envNum = 2;
    string taskName = 3;
    string taskState = 4;
    uint32 queuePendingNum = 5;
    uint32 queueRejectNum = 6;
    uint64 receivePkgNum = 7;
    uint64 receivePkgTps = 8;
    uint64 lossPkgNum = 9;
    string lossPkgRate = 10; // 0.0%
    uint64 mdFileProcessTps = 11;
    uint64 processPkgTps = 12;
    repeated string securityType = 13;
    string serverAddress = 14;
    string vssId = 15;
    message VssInfoP {
      string compId = 1;
      string addr = 2;
      uint64 sendTps = 3;
    }
    repeated VssInfoP vssInfo = 16;
}

// file传输任务运行指标
message MdgwFileTaskRunMetricP {
    string metricName = 1;
    string envNum = 2;
    string taskName = 3;
    string taskState = 4;
    uint32 queuePendingNum = 5;
    uint32 queueRejectNum = 6;
    uint64 receivePkgNum = 7;
    uint64 receivePkgTps = 8;
    uint64 lossPkgNum = 9;
    string lossPkgRate = 10; // 0.0%
    uint64 processPkgTps = 11;
    repeated string fileType = 12;
    string serverAddress = 13;
}

// forward任务运行指标
message MdgwForwardTaskRunMetricP {
    string metricName = 1;
    string envNum = 2;
    string taskName = 3;
    string taskState = 4;
    uint32 queuePendingNum = 5;
    uint32 queueRejectNum = 6;
    uint64 receivePkgNum = 7;
    uint64 receivePkgTps = 8;
    uint64 lossPkgNum = 9;
    string lossPkgRate = 10; // 0.0%
    uint64 processPkgTps = 11;
    uint64 forwardPkgNum = 12;
    repeated string forwardType = 13;
    string serverAddress = 14;
    string vssId = 15;
}

// 任务运行输出信息
message MdgwTaskOutputP {
    string metricName = 1;
    string taskName = 2;
    string happenTime = 3;
    string level = 4;
    string info = 5;
}

PbMessage.proto

syntax = "proto3";

option java_package = "com.sse.fea.monitor.protobuf";
option java_outer_classname ="PbMessage";

import "Login.proto";
import "Error.proto";
import "MdgwMonitorData.proto";
import "TdgwMonitorData.proto";

message PbMessageP {
  enum MsgType {
      UnknownType = 0;
      LoginType = 1;
      ErrorType = 2;
      MetricsSubType = 3;
      MdgwStaticInfoType = 100;
      MdgwMdTaskRunMetricType = 101;
      MdgwFileTaskRunMetricType = 102;
      MdgwForwardTaskRunMetricType = 103;
      MdgwTaskOutputType = 104;
      TdgwStaticInfoType = 200;
      TdgwPbuRunMetricType = 201;
      TdgwOmsSessionMetricType = 202;
      TdgwPbuOutputType = 203;
  }

  MsgType msgType = 1;

  oneof dataBody {
      LoginP login = 2;
      ErrorP error = 3;
      MetricsSubP metricsSub = 4;
      MdgwFileTaskRunMetricP fileTaskRunMetric = 100;
      MdgwForwardTaskRunMetricP forwardTaskRunMetric = 101;
      MdgwMdTaskRunMetricP mdTaskRunMetric = 102;
      MdgwStaticInfoP mdgwStaticInfo = 103;
      MdgwTaskOutputP mdgwTaskOutput = 104;
      TdgwOmsSessionMetricP omsSessionMetric = 200;
      TdgwPbuRunMetricP pbuRunMetric = 201;
      TdgwStaticInfoP tdgwStaticInfo = 202;
      TdgwPbuOutputP tdgwPbuOutput = 203;
  }
}

TdgwMonitorData.proto

syntax = "proto3";

option java_package = "com.sse.fea.monitor.protobuf";
option java_outer_classname = "TdgwMonitorData";
option java_multiple_files = true;

// 交易网关静态信息
message TdgwStaticInfoP {
    string metricName = 1;
    string tdgwVersion = 2;
    string certType = 3;
    string certAlgorithm = 4;
    string certIdentifier = 5;
    string certExpiryDate = 6;
    uint32 certRemainingDay = 7;
}

// 交易网关PBU运行指标
message TdgwPbuRunMetricP {
    string metricName = 1;
    string platform = 2;
    string envNum = 3;
    string pbu = 4;
    string state = 5;
    string platformState = 6;
    string platformDateTime = 7;
    string tradeDay = 8;
    string upgradeTdgwVersion = 9;
    string lowestOmsPrtclVersion = 10;
    repeated string successSubPbu = 11;
    uint64 sendPkgCnt = 12;
    uint64 sendPkgTps = 13;
    uint64 receivePkgCnt = 14;
    uint64 receivePkgTps = 15;
    string serverAddress = 16;
    uint32 upQueuePendingNum = 17;
    uint32 downQueuePendingNum = 18;
    uint32 throttle = 19;
    uint32 reqSize = 20;
    uint32 maxReqSize = 21;
    uint32 suspendAcceptOrderCnt = 22;
    uint32 peakReqSize = 23;
    repeated ExecRptIndexP execRptIndex = 24;
    uint32 suspendAcceptExecCnt = 25;
}

message ExecRptIndexP {
    string pbu = 1;
    uint32 setId = 2;
    uint64 nextRptIndex = 3;
}

// 交易网关OMS会话指标
message TdgwOmsSessionMetricP {
    string metricName = 1;
    string platform = 2;
    string pbu = 3;
    string state = 4;
    string omsId = 5;
    uint64 sendPkgCnt = 6;
    uint64 sendPkgTps = 7;
    uint64 receivePkgCnt = 8;
    uint64 receivePkgTps = 9;
    uint64 orderCnt = 10;
    uint64 execRptCnt = 11;
    uint64 generalRespCnt = 12;
    uint64 orderRejectCnt = 13;
}

// 交易网关PBU运行输出信息
message TdgwPbuOutputP {
    string metricName = 1;
    string platform = 2;
    string pbu = 3;
    string happenTime = 4;
    string level = 5;
    string info = 6;
}

 

 


版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:http://www.seesapril.com/?p=290
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址