python对Modbus从站数据进行读取和写入

Modbus分别有TCPUDPRTU,在工业现场一般都是采用Modbus RTU协议,一般大家说的基于串口通信的Modbus通信协议都是指Modbus RTU通信协议。与Modbus RTU协议相比较,Modbus TCP协议则是在RTU协议上加一个MBAP报文头,并且由于TCP是基于可靠连接的服务,RTU协议中的CRC校验码就不再需要,所以在Modbus TCP协议中是没有CRC校验码的,所以就常用一句比较通俗的话来说:Modbus TCP协议就是Modbus RTU协议在前面加上五个0以及一个6,然后去掉两个CRC校验码字节就OK。虽然这句话说得不是特别准确,但是也基本上把RTU与TCP之间的区别说得比较清楚了。

Modbus功能码

能码含义功能码含义
0x01读线圈0x04读输入寄存器
0x05写单个线圈0x03读保持寄存器
0x0F写多个线圈0x06写单个保持寄存器
0x02读离散量输入0x10写多个保持寄存器
对应函数方法
READ_COILSH01读线圈
READ_DISCRETE_INPUTSH02读离散输入
READ_HOLDING_REGISTERSH03读寄存器
READ_INPUT_REGISTERSH04读输入寄存器
WRITE_SINGLE_COILH05写单一线圈
WRITE_SINGLE_REGISTERH06写单一寄存器
WRITE_MULTIPLE_COILSH15写多个线圈
WRITE_MULTIPLE_REGISTERSH16写多寄存器

代码实现(基于RTU协议)

# 先下载所需模块 modbus-tk
pip install -i https://pypi.doubanio.com/simple modbus-tk
# 相关配置数据
# work_com = com2
# work_baudrate = 19200
# work_slave = 1
# work_bytesize = 8
# work_parity = N
# work_stopbits = 1

import modbus_tk.defines as cst
import modbus_tk.modbus_rtu as modbus_rtu

class InitModbus(threading.Thread):
    try:
        master = modbus_rtu.RtuMaster(serial.Serial(cfg["work_com"], baudrate=cfg["work_baudrate"], bytesize=int(cfg["work_bytesize"]), parity=cfg["work_parity"], stopbits=int(cfg["work_stopbits"])))
        master.set_timeout(1.0)
        master.set_verbose(True)
        print("——" * 99)
        print("[通知] ModbusMaster > 配置成功")
    except Exception as e:
        print("[错误] ModbusMaster > 配置失败:", e)

    @classmethod
    def write(cls, p, val):
        # 写入
        print(p)
        try:
            r = cls.master.execute(int(cfg["work_slave"]), cst.WRITE_SINGLE_REGISTER, p, output_value=val)
            return r
        except Exception as e:
            print("[错误] ModbusMaster > 从站数据写入异常:", e)


    @classmethod
    def read(cls):
        try:
            r = cls.master.execute(int(cfg["work_slave"]), cst.READ_HOLDING_REGISTERS, 1, 2)  # 这里可以修改需要读取的功能码
        except Exception as e:
            print("[错误] ModbusMaster > 从站数据读取异常:", e)

    def check(self):
        pass

另外两种基本相同,不同的是需要基于ip地址连接

'''
def __init__(self, host="127.0.0.1", port=502, timeout_in_sec=5.0):
    """Constructor. Set the communication settings"""
    super(TcpMaster, self).__init__(timeout_in_sec)
    self._host = host
    self._port = port
    self._sock = None
'''
mt.TcpMaster("127.0.0.1", 502)

附:操作浮点数

# 方案一:通过将你所得到的带小数的放大一定的倍数,都变成整数进行处理,例如:3.14  --放大100   变成 314 ,然后上位机知道我放大了100倍就好,除100就可以转成浮点数
# 方案二:直接读
import serial
import struct
import logging
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus_rtu as modbus_rtu
logger = modbus_tk.utils.create_logger("console")

if __name__ == '__main__':
    try:
        serial = serial.Serial(port="COM2",baudrate=9600,bytesize=8,parity='N',stopbits=1)
        master = modbus_rtu.RtuMaster(serial)
        master.set_timeout(5.0)
        master.set_verbose(True)
        logger.info("connected!")

        # 写单个寄存器
        logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 0, output_value=99))
        logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1))

        # 写单个寄存器 负数
        logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 1, output_value=-123))
        logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 1, 1))

        # 写多个寄存器 起始地址为4的保持寄存器,操作寄存器个数为4 ,根据列表长度来判断写入个数
        logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 4, output_value=[3,-13, 6,24]))
        # 读也读4个寄存器
        read_data = master.execute(1, cst.READ_HOLDING_REGISTERS, 4, 4)

        # 写寄存器 起始地址为8的保持寄存器,操作寄存器个数为 4 ,一个浮点数float 占两个寄存器;
        # 写浮点数时一定要加 data_format 参数,两个ff 表示要写入两个浮点数,以此类推
        # 我这里模拟的是小端模式,具体可参考 struct 用法。和数据源保持一致即可。
        logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 8, output_value=[3.3,-6.4],data_format='<ff'))
        # 读对应的 4个寄存器,指定数据格式
        read_data = master.execute(1, cst.READ_HOLDING_REGISTERS, 8, 4,data_format='<ff')
        logger.info(read_data)

        # 写入 双精度 数据到寄存器
        logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 16, output_value=[3.313, -6.414], data_format='<dd'))
        read_data = master.execute(1, cst.READ_HOLDING_REGISTERS, 16, 8, data_format='<dd')

        # 写单个线圈,写寄存器地址为0的线圈寄存器,写入内容为1  位操作)
        logger.info(master.execute(1, cst.WRITE_SINGLE_COIL, 0, output_value= 1))
        logger.info(master.execute(1,cst.READ_COILS,0,1))

        # 写多个线圈 写寄存器地址为1的线圈寄存器,写入内容为列表内容  位操作)
        logger.info(master.execute(1, cst.WRITE_MULTIPLE_COILS,1,output_value=[0,1,1,0]))
        logger.info(master.execute(1,cst.READ_COILS,1,4))

        # 读保持寄存器,从 8 开始读 8 个,元组形式返回
        data = master.execute(1, cst.READ_HOLDING_REGISTERS, 8, 8)
        logger.info(data)

        # 读输入寄存器,从 2 开始读 5 个,元组形式返回
        logger.info(master.execute(1, cst.READ_INPUT_REGISTERS, 2, 5))

        # 读线圈寄存器
        logger.info(master.execute(1, cst.READ_COILS, 8, 8))

        # 读离散输入寄存器
        logger.info(master.execute(1, cst.READ_DISCRETE_INPUTS, 8, 8))

    except modbus_tk.modbus.ModbusError as e:
        logger.error("%s-ErrCode=%d" % (e, e.get_exception_code()))
© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容