Serial串口通信

通过虚拟串口工具,用PythonSerial模块进行连接,用来收发串口信息。

代码

# 先下载所需模块 serial
pip install -i https://pypi.doubanio.com/simple serial
#配置信息
#upper_com = com11
#upper_baudrate = 115200

class Communication():
    #初始化
    def __init__(self, com, bps, timeout):
        self.port = com
        self.bps = bps
        self.timeout =timeout

        try:
            # 打开串口,并得到串口对象
             self.main_engine = serial.Serial(self.port, self.bps, timeout=self.timeout)
            # 判断是否打开成功
             if (self.main_engine.is_open):
                print(f"[成功] 打开串口成功 > 串口号: {self.main_engine.name} 波特率: {self.main_engine.baudrate}")
        except Exception as e:
            print("[错误] 打开串口失败:", e)

    #关闭串口
    def Close_Engine(self):
        self.main_engine.close()
        print(self.main_engine.is_open)  # 检验串口是否打开

    # 打印可用串口列表
    @staticmethod
    def Print_Used_Com():
        port_list = list(serial.tools.list_ports.comports())
        print(port_list)

    #接收指定大小的数据
    #从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
    def Read_Size(self,size):
        return self.main_engine.read(size=size)

    #接收一行数据
    # 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
    # 如果没有超时,readline会报异常。
    def Read_Line(self):
        return self.main_engine.readline()

    #发数据
    def Send_data(self,data):
        self.main_engine.write(data)


class work_in_thread(threading.Thread):
    def __init__(self, com):
        super(work_in_thread, self).__init__()
        self.com = com
        # self.run()

    def run(self):
        print("[通知] 开始监听串口数据...")
        while True:
            try:
                self.prase_data(self.com.Read_Line())
            except Exception as e:
                print("[错误] 串口数据读取异常:", e)

    # 解析串口数据
    def prase_data(self, data):
        try:
            if data != b'':
                strvalue = data.decode('utf-8')
                # 功能指令
                instruct = re.sub(u'([^\u0041-\u005a])', '', strvalue)
                # 数值
                value = re.sub(u"([^\u0030-\u0039])", "", strvalue)

            else:
                time.sleep(0.01)
                return
        except Exception as e:
            print("[错误] 数据解析异常:", e)
            return

if __name__ == '__main__':
    print("[通知] 获取基础配置...")
    com = Communication(cfg["upper_com"], cfg["upper_baudrate"], 0.01)
© 版权声明
THE END
喜欢就支持一下吧
点赞14 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容