通过虚拟串口工具,用Python的Serial模块进行连接,用来收发串口信息。
代码
# 先下载所需模块 serial
pip install -i https://pypi.doubanio.com/simple serial
#配置信息
#upper_com = com11
#upper_baudrate = 115200
class Communication():
#初始化
def __init__(self, com, bps, timeout):
self.port = com
self.bps = bps
self.timeout =timeout
try:
# 打开串口,并得到串口对象
self.main_engine = serial.Serial(self.port, self.bps, timeout=self.timeout)
# 判断是否打开成功
if (self.main_engine.is_open):
print(f"[成功] 打开串口成功 > 串口号: {self.main_engine.name} 波特率: {self.main_engine.baudrate}")
except Exception as e:
print("[错误] 打开串口失败:", e)
#关闭串口
def Close_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open) # 检验串口是否打开
# 打印可用串口列表
@staticmethod
def Print_Used_Com():
port_list = list(serial.tools.list_ports.comports())
print(port_list)
#接收指定大小的数据
#从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def Read_Size(self,size):
return self.main_engine.read(size=size)
#接收一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def Read_Line(self):
return self.main_engine.readline()
#发数据
def Send_data(self,data):
self.main_engine.write(data)
class work_in_thread(threading.Thread):
def __init__(self, com):
super(work_in_thread, self).__init__()
self.com = com
# self.run()
def run(self):
print("[通知] 开始监听串口数据...")
while True:
try:
self.prase_data(self.com.Read_Line())
except Exception as e:
print("[错误] 串口数据读取异常:", e)
# 解析串口数据
def prase_data(self, data):
try:
if data != b'':
strvalue = data.decode('utf-8')
# 功能指令
instruct = re.sub(u'([^\u0041-\u005a])', '', strvalue)
# 数值
value = re.sub(u"([^\u0030-\u0039])", "", strvalue)
else:
time.sleep(0.01)
return
except Exception as e:
print("[错误] 数据解析异常:", e)
return
if __name__ == '__main__':
print("[通知] 获取基础配置...")
com = Communication(cfg["upper_com"], cfg["upper_baudrate"], 0.01)
© 版权声明
THE END
暂无评论内容