Extra functions
Pymodbus: Modbus Protocol Implementation.
Released under the BSD license
- class pymodbus.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)
Bases: ModbusPDU
Base class for a modbus exception PDU.
- ACKNOWLEDGE = 5
- GATEWAY_NO_RESPONSE = 11
- GATEWAY_PATH_UNAVIABLE = 10
- ILLEGAL_ADDRESS = 2
- ILLEGAL_FUNCTION = 1
- ILLEGAL_VALUE = 3
- MEMORY_PARITY_ERROR = 8
- NEGATIVE_ACKNOWLEDGE = 7
- SLAVE_BUSY = 6
- SLAVE_FAILURE = 4
- decode(data: bytes) None
Decode a modbus exception response.
- encode() bytes
Encode a modbus exception response.
- rtu_frame_size: int = 5
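The wire layout behind this class can be sketched without the library. In the Modbus application protocol, an exception response PDU is the request's function code with the high bit (0x80) set, followed by a one-byte exception code. The helper names below are hypothetical and standalone; they do not call pymodbus and only illustrate the bytes that `encode()` and `decode()` deal with.

```python
import struct

# Exception code value copied from the constant list above.
ILLEGAL_ADDRESS = 2


def encode_exception_pdu(function_code: int, exception_code: int) -> bytes:
    """Build the 2-byte exception PDU: (function code | 0x80), exception code."""
    return struct.pack(">BB", function_code | 0x80, exception_code)


def decode_exception_pdu(pdu: bytes) -> tuple[int, int]:
    """Return (original function code, exception code) from an exception PDU."""
    function_code, exception_code = struct.unpack(">BB", pdu[:2])
    return function_code & 0x7F, exception_code


# A "read holding registers" request (function code 3) rejected for a bad address.
pdu = encode_exception_pdu(3, ILLEGAL_ADDRESS)
```

In RTU framing the PDU is preceded by a one-byte device id and followed by a two-byte CRC, which is consistent with `rtu_frame_size = 5`.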
- class pymodbus.FramerType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: str, Enum
Type of Modbus frame.
- ASCII = 'ascii'
- RTU = 'rtu'
- SOCKET = 'socket'
- TLS = 'tls'
- exception pymodbus.ModbusException(string)
Bases: Exception
Base modbus exception.
- isError()
Error
- pymodbus.pymodbus_apply_logging_config(level: str | int = 10, log_file_name: str | None = None)
Apply basic logging configuration used by default by Pymodbus maintainers.
- Parameters:
level – (optional) set log level, if not set it is inherited.
log_file_name – (optional) also write the log to this file
Please call this function to format logging appropriately when opening issues.
Modbus Device Controller.
These are the device management handlers. They should be maintained in the server context and the various methods should be inserted in the correct locations.
- class pymodbus.device.DeviceInformationFactory
Bases: object
This is a helper that hides some of the complexity of processing the device information requests (function code 0x2b 0x0e).
- classmethod get(control, read_code=DeviceInformation.BASIC, object_id=0)
Get the requested device data from the system.
- Parameters:
control – The control block to pull data from
read_code – The read code to process
object_id – The specific object_id to read
- Returns:
The requested data (id, length, value)
- class pymodbus.device.ModbusDeviceIdentification(info=None, info_name=None)
Bases: object
This is used to supply the device identification for the readDeviceIdentification function.
For more information, read section 6.21 of the modbus application protocol.
- property MajorMinorRevision
- property ModelName
- property ProductCode
- property ProductName
- property UserApplicationName
- property VendorName
- property VendorUrl
- summary()
Return a summary of the main items.
- Returns:
A dictionary of the main items
- update(value)
Update the values of this identity using another identity as the value.
- Parameters:
value – The value to copy values from
- class pymodbus.device.ModbusPlusStatistics
Bases: object
This is used to maintain the current modbus plus statistics count.
As of right now this is simply a stub to complete the modbus implementation. For more information, see the modbus implementation guide page 87.
- encode()
Return a summary of the modbus plus statistics.
- Returns:
54 16-bit words representing the status
- reset()
Clear all of the modbus plus statistics.
- summary()
Return a summary of the modbus plus statistics.
- Returns:
54 16-bit words representing the status
Modbus Remote Events.
An event byte returned by the Get Communications Event Log function can be any one of four types. The type is defined by bit 7 (the high-order bit) in each byte. It may be further defined by bit 6.
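The type rules described in this module can be sketched as a small classifier. This is a standalone illustration, not a pymodbus function: the name `classify_event` and the returned labels are assumptions made for the example. The checks for the values 0x00 and 0x04 follow the class descriptions in this module.

```python
def classify_event(event: int) -> str:
    """Classify a single event byte (0-255) into one of the four event types."""
    if event & 0x80:
        # Bit 7 set: the remote device recorded a receive event.
        return "receive"
    if event & 0x40:
        # Bit 7 clear and bit 6 set: the remote device recorded a send event.
        return "send"
    if event == 0x04:
        return "entered_listen_mode"
    if event == 0x00:
        return "communication_restart"
    # Anything else is not covered by the four documented types.
    return "unknown"
```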
- class pymodbus.events.CommunicationRestartEvent
Bases: ModbusEvent
Restart remote device Initiated Communication.
The remote device stores this type of event byte when its communications port is restarted. The remote device can be restarted by the Diagnostics function (code 08), with sub-function Restart Communications Option (code 00 01).
That function also places the remote device into a “Continue on Error” or “Stop on Error” mode. If the remote device is placed into “Continue on Error” mode, the event byte is added to the existing event log. If the remote device is placed into “Stop on Error” mode, the byte is added to the log and the rest of the log is cleared to zeros.
The event is defined by a content of zero.
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- Raises:
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
- value = 0
- class pymodbus.events.EnteredListenModeEvent
Bases: ModbusEvent
Enter Remote device Listen Only Mode.
The remote device stores this type of event byte when it enters the Listen Only Mode. The event is defined by a content of 04 hex.
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- Raises:
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
- value = 4
- class pymodbus.events.ModbusEvent
Bases: ABC
Define modbus events.
- abstract decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- abstract encode() bytes
Encode the status bits to an event message.
- class pymodbus.events.RemoteReceiveEvent(overrun=False, listen=False, broadcast=False)
Bases: ModbusEvent
Remote device MODBUS Receive Event.
The remote device stores this type of event byte when a query message is received. It is stored before the remote device processes the message. This event is defined by bit 7 set to logic “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:
Bit  Contents
----------------------------------
0    Not Used
2    Not Used
3    Not Used
4    Character Overrun
5    Currently in Listen Only Mode
6    Broadcast Receive
7    1
- decode(event: bytes) None
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode() bytes
Encode the status bits to an event message.
- Returns:
The encoded event message
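The receive event bit layout can be sketched as plain bit operations. The functions below are hypothetical helpers written for illustration and are not part of pymodbus; they only use the three flags from the constructor signature and the bit positions listed in the layout.

```python
def encode_receive_event(
    overrun: bool = False, listen: bool = False, broadcast: bool = False
) -> bytes:
    """Pack the three status flags into one event byte with bit 7 set."""
    value = 0x80  # bit 7 marks a receive event
    if overrun:
        value |= 1 << 4
    if listen:
        value |= 1 << 5
    if broadcast:
        value |= 1 << 6
    return bytes([value])


def decode_receive_event(event: bytes) -> dict[str, bool]:
    """Unpack the status flags from the first byte of an event message."""
    value = event[0]
    return {
        "overrun": bool(value & (1 << 4)),
        "listen": bool(value & (1 << 5)),
        "broadcast": bool(value & (1 << 6)),
    }
```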
- class pymodbus.events.RemoteSendEvent(read=False, slave_abort=False, slave_busy=False, slave_nak=False, write_timeout=False, listen=False)
Bases: ModbusEvent
Remote device MODBUS Send Event.
The remote device stores this type of event byte when it finishes processing a request message. It is stored if the remote device returned a normal or exception response, or no response.
This event is defined by bit 7 set to a logic “0”, with bit 6 set to a “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:
Bit  Contents
-----------------------------------------------------------
0    Read Exception Sent (Exception Codes 1-3)
1    Slave Abort Exception Sent (Exception Code 4)
2    Slave Busy Exception Sent (Exception Codes 5-6)
3    Slave Program NAK Exception Sent (Exception Code 7)
4    Write Timeout Error Occurred
5    Currently in Listen Only Mode
6    1
7    0
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
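As a rough sketch of the send event layout, the flags from the constructor signature map onto bits 0 to 5 in order, with bit 6 fixed to 1 and bit 7 fixed to 0. The helper names and the `SEND_FLAGS` tuple are assumptions introduced for this example; the code is standalone and does not use pymodbus.

```python
# Flag names in bit order (bit 0 first), taken from the constructor signature.
SEND_FLAGS = ("read", "slave_abort", "slave_busy", "slave_nak", "write_timeout", "listen")


def encode_send_event(**flags: bool) -> bytes:
    """Pack the given flags into one event byte with bit 6 set and bit 7 clear."""
    unknown = set(flags) - set(SEND_FLAGS)
    if unknown:
        raise ValueError(f"unknown flags: {sorted(unknown)}")
    value = 0x40  # bit 6 marks a send event
    for bit, name in enumerate(SEND_FLAGS):
        if flags.get(name, False):
            value |= 1 << bit
    return bytes([value])


def decode_send_event(event: bytes) -> dict[str, bool]:
    """Unpack the six status flags from the first byte of an event message."""
    value = event[0]
    return {name: bool(value & (1 << bit)) for bit, name in enumerate(SEND_FLAGS)}
```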
Pymodbus Exceptions.
Custom exceptions to be used in the Modbus code.
- exception pymodbus.exceptions.ConnectionException(string='')
Bases: ModbusException
Error resulting from a bad connection.
- exception pymodbus.exceptions.InvalidMessageReceivedException(string='')
Bases: ModbusException
Error resulting from invalid response received or decoded.
- exception pymodbus.exceptions.MessageRegisterException(string='')
Bases: ModbusException
Error resulting from failing to register a custom message request/response.
- exception pymodbus.exceptions.ModbusIOException(string='', function_code=None)
Bases: ModbusException
Error resulting from data i/o.
- exception pymodbus.exceptions.NoSuchSlaveException(string='')
Bases: ModbusException
Error resulting from making a request to a slave that does not exist.
- exception pymodbus.exceptions.NotImplementedException(string='')
Bases: ModbusException
Error resulting from not implemented function.
- exception pymodbus.exceptions.ParameterException(string='')
Bases: ModbusException
Error resulting from invalid parameter.
Modbus Payload Builders.
A collection of utilities for building and decoding modbus message payloads.
- class pymodbus.payload.BinaryPayloadBuilder(payload=None, byteorder=Endian.LITTLE, wordorder=Endian.BIG, repack=False)
Bases: object
A utility that helps build payload messages to be written with the various modbus messages.
It is a simple wrapper around the struct module that saves time looking up the format strings. What follows is a simple example:
    builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
    builder.add_8bit_uint(1)
    builder.add_16bit_uint(2)
    payload = builder.build()
- add_16bit_float(value: float) None
Add a 16 bit float to the buffer.
- Parameters:
value – The value to add to the buffer
- add_16bit_int(value: int) None
Add a 16 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_16bit_uint(value: int) None
Add a 16 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_float(value: float) None
Add a 32 bit float to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_int(value: int) None
Add a 32 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_uint(value: int) None
Add a 32 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_float(value: float) None
Add a 64 bit float(double) to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_int(value: int) None
Add a 64 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_uint(value: int) None
Add a 64 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_8bit_int(value: int) None
Add an 8 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_8bit_uint(value: int) None
Add an 8 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_bits(values: list[bool]) None
Add a collection of bits to be encoded.
If the number of bits is not a multiple of eight, the bits are left padded with 0 bits up to the next multiple of eight.
- Parameters:
values – The value to add to the buffer
- add_string(value: str) None
Add a string to the buffer.
- Parameters:
value – The value to add to the buffer
- build() list[bytes]
Return the payload buffer as a list.
This list is two bytes per element and can thus be treated as a list of registers.
- Returns:
The payload buffer as a list
- classmethod deprecate()
Log warning.
- encode() bytes
Get the payload buffer encoded in bytes.
- reset() None
Reset the payload buffer.
- to_coils() list[bool]
Convert the payload buffer into a coil layout that can be used as a context block.
- Returns:
The coil layout to use as a block
- to_registers()
Convert the payload buffer to register layout that can be used as a context block.
- Returns:
The register layout to use as a block
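Since the builder is described as a thin wrapper around `struct`, the underlying idea can be shown with `struct` alone. The sketch below packs a 16-bit unsigned integer and a 32-bit float in big-endian order and splits the buffer into 16-bit register values. It is an illustration only: the helper name is hypothetical, and it does not reproduce the builder's `byteorder`/`wordorder` handling.

```python
import struct


def to_registers_sketch(payload: bytes) -> list[int]:
    """Split a byte buffer into big-endian 16-bit register values."""
    if len(payload) % 2:
        raise ValueError("payload length must be a multiple of two bytes")
    count = len(payload) // 2
    return list(struct.unpack(f">{count}H", payload))


# ">H" is an unsigned 16-bit integer, ">f" a 32-bit IEEE 754 float.
payload = struct.pack(">H", 2) + struct.pack(">f", 1.0)
registers = to_registers_sketch(payload)
```

A 32-bit value occupies two consecutive registers, which is why word order matters when a device stores the low word first.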
- class pymodbus.payload.BinaryPayloadDecoder(payload, byteorder=Endian.LITTLE, wordorder=Endian.BIG)
Bases: object
A utility that helps decode payload messages from a modbus response message.
It is a simple wrapper around the struct module that saves time looking up the format strings. What follows is a simple example:
    decoder = BinaryPayloadDecoder(payload)
    first = decoder.decode_8bit_uint()
    second = decoder.decode_16bit_uint()
- classmethod bit_chunks(coils, size=8)
Return bit chunks.
- decode_16bit_float()
Decode a 16 bit float from the buffer.
- decode_16bit_int()
Decode a 16 bit signed int from the buffer.
- decode_16bit_uint()
Decode a 16 bit unsigned int from the buffer.
- decode_32bit_float()
Decode a 32 bit float from the buffer.
- decode_32bit_int()
Decode a 32 bit signed int from the buffer.
- decode_32bit_uint()
Decode a 32 bit unsigned int from the buffer.
- decode_64bit_float()
Decode a 64 bit float(double) from the buffer.
- decode_64bit_int()
Decode a 64 bit signed int from the buffer.
- decode_64bit_uint()
Decode a 64 bit unsigned int from the buffer.
- decode_8bit_int()
Decode an 8 bit signed int from the buffer.
- decode_8bit_uint()
Decode an 8 bit unsigned int from the buffer.
- decode_bits(package_len=1)
Decode a byte worth of bits from the buffer.
- decode_string(size=1)
Decode a string from the buffer.
- Parameters:
size – The size of the string to decode
- classmethod deprecate()
Log warning.
- classmethod fromCoils(coils, byteorder=Endian.LITTLE, _wordorder=Endian.BIG)
Initialize a payload decoder with the result of reading coils.
- classmethod fromRegisters(registers, byteorder=Endian.LITTLE, wordorder=Endian.BIG)
Initialize a payload decoder with the result of reading a collection of registers from a modbus device.
The registers are treated as a list of 2 byte values. We have to do this because of how the data has already been decoded by the rest of the library.
- Parameters:
registers – The register results to initialize with
byteorder – The Byte order of each word
wordorder – The endianness of the word (when wordcount is >= 2)
- Returns:
An initialized PayloadDecoder
- Raises:
- reset()
Reset the decoder pointer back to the start.
- skip_bytes(nbytes)
Skip n bytes in the buffer.
- Parameters:
nbytes – The number of bytes to skip
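The decoder's behaviour (a buffer plus a pointer that advances with each decode call) can be approximated with `struct` directly. The class below is a hypothetical stand-in written for this example, fixed to big-endian order; it is not the library's implementation and ignores `byteorder` and `wordorder`.

```python
import struct


def registers_to_bytes(registers: list[int]) -> bytes:
    """Join 16-bit register values into one big-endian byte buffer."""
    return struct.pack(f">{len(registers)}H", *registers)


class CursorDecoder:
    """Minimal sequential decoder over a byte buffer (illustrative only)."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload
        self._pointer = 0

    def _take(self, fmt: str):
        """Unpack one value with the given struct format and advance the pointer."""
        size = struct.calcsize(fmt)
        chunk = self._payload[self._pointer:self._pointer + size]
        self._pointer += size
        return struct.unpack(fmt, chunk)[0]

    def decode_16bit_uint(self) -> int:
        return self._take(">H")

    def decode_32bit_float(self) -> float:
        return self._take(">f")

    def reset(self) -> None:
        """Move the pointer back to the start of the buffer."""
        self._pointer = 0


decoder = CursorDecoder(registers_to_bytes([2, 0x3F80, 0x0000]))
first = decoder.decode_16bit_uint()
second = decoder.decode_32bit_float()
```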
Transaction.
- class pymodbus.transaction.TransactionManager(params: CommParams, framer: FramerBase, retries: int, is_server: bool, trace_packet: Callable[[bool, bytes], bytes] | None, trace_pdu: Callable[[bool, ModbusPDU], ModbusPDU] | None, trace_connect: Callable[[bool], None] | None, sync_client=None)
Bases: ModbusProtocol
Transaction manager.
This is the central class of the library, providing a separation between API and communication:
- clients/servers call the manager to execute requests/responses
- transport/framer/pdu is used by the manager to communicate with the devices
Transaction manager handles:
- Execution of requests (client), with retries and locking
- Sending of responses (server), with retries
- Connection management (on top of what transport offers)
- No response (temporarily) from a device
Transaction manager offers:
- a simple execute interface for requests (client)
- a simple send interface for responses (server)
- external trace methods tracing outgoing/incoming packets/PDUs (byte stream)
- callback_connected() None
Call when connection is successful.
- callback_data(data: bytes, addr: tuple | None = None) int
Handle received data.
- callback_disconnected(exc: Exception | None) None
Call when connection is lost.
- callback_new_connection()
Call when listener receives a new connection request.
- dummy_trace_connect(connect: bool) None
Do dummy trace.
- dummy_trace_packet(sending: bool, data: bytes) bytes
Do dummy trace.
- dummy_trace_pdu(sending: bool, pdu: ModbusPDU) ModbusPDU
Do dummy trace.
- async execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU
Execute requests asynchronously.
REMARK: this method is identical to sync_execute, apart from the lock and try/except. Any changes in either method MUST be mirrored.
- getNextTID() int
Retrieve the next transaction identifier.
- pdu_send(pdu: ModbusPDU, addr: tuple | None = None) None
Build byte stream and send.
- sync_execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU
Execute requests synchronously.
REMARK: this method is identical to execute, apart from the lock and sync_receive. Any changes in either method MUST be mirrored.
- sync_get_response(dev_id) ModbusPDU
Receive until PDU is correct or timeout.
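The "execution of requests with retries and locking" idea can be sketched generically with `asyncio`. Everything in this example is hypothetical: `RetryingExecutor`, its `send` callable and its `timeout` parameter are names invented for the sketch, and the code does not reflect how `TransactionManager` is implemented internally.

```python
import asyncio
from typing import Awaitable, Callable


class RetryingExecutor:
    """Serialize requests with a lock and retry each one on timeout."""

    def __init__(
        self, send: Callable[[str], Awaitable[str]], retries: int, timeout: float
    ) -> None:
        self._send = send
        self._retries = retries
        self._timeout = timeout
        self._lock = asyncio.Lock()

    async def execute(self, request: str) -> str:
        async with self._lock:  # only one request in flight at a time
            for _attempt in range(self._retries + 1):
                try:
                    return await asyncio.wait_for(self._send(request), self._timeout)
                except asyncio.TimeoutError:
                    continue  # no response in time, try again
            raise TimeoutError(f"no response after {self._retries} retries")


async def demo() -> tuple[str, int]:
    attempts = 0

    async def flaky_send(request: str) -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            await asyncio.sleep(1)  # simulate a device that stays silent
        return request.upper()

    executor = RetryingExecutor(flaky_send, retries=3, timeout=0.01)
    response = await executor.execute("ping")
    return response, attempts


result = asyncio.run(demo())
```

The executor is created inside the running coroutine so that the lock belongs to the same event loop that executes the requests.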
Modbus Utilities.
A collection of utilities for packing data, unpacking data, computing checksums, and decoding checksums.
- pymodbus.utilities.dict_property(store, index)
Create class properties from a dictionary.
Basically this allows you to remove a lot of possible boilerplate code.
- Parameters:
store – The store to pull from
index – The index into the store to close over
- Returns:
An initialized property set
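The boilerplate this helper removes can be illustrated with a simplified stand-in. The function `dict_property_sketch` and the `Identity` class below are hypothetical, and the sketch assumes `store` is a callable that maps an instance to its backing dictionary; the real helper may accept other forms of `store`.

```python
def dict_property_sketch(store, index):
    """Return a property that reads and writes store(instance)[index]."""

    def getter(self):
        return store(self)[index]

    def setter(self, value):
        store(self)[index] = value

    return property(getter, setter)


class Identity:
    """Toy class exposing two dictionary slots as named attributes."""

    def __init__(self) -> None:
        self._data = {0: "", 1: ""}

    VendorName = dict_property_sketch(lambda s: s._data, 0)
    ProductCode = dict_property_sketch(lambda s: s._data, 1)


identity = Identity()
identity.VendorName = "acme"
```

Without the helper, each attribute would need its own hand-written getter and setter pair.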
- pymodbus.utilities.hexlify_packets(packet)
Return hex representation of bytestring received.
- Parameters:
packet
- Returns:
- pymodbus.utilities.pack_bitstring(bits: list[bool]) bytes
Create a bytestring out of a list of bits.
- Parameters:
bits – A list of bits
example:
    bits = [False, True, False, True]
    result = pack_bitstring(bits)
- pymodbus.utilities.unpack_bitstring(data: bytes) list[bool]
Create bit list out of a bytestring.
- Parameters:
data – The modbus data packet to decode
example:
    data = b"bytes to decode"
    result = unpack_bitstring(data)
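For readers unfamiliar with the Modbus bit convention, here is a standalone sketch of the packing rule: the first bit goes into the least significant bit of the first byte, and the final byte is padded with zeros. The function names are hypothetical and the code is not the library's source.

```python
def pack_bits_lsb_first(bits: list[bool]) -> bytes:
    """Pack booleans into bytes, first bit in the least significant position."""
    out = bytearray((len(bits) + 7) // 8)
    for position, bit in enumerate(bits):
        if bit:
            out[position // 8] |= 1 << (position % 8)
    return bytes(out)


def unpack_bits_lsb_first(data: bytes) -> list[bool]:
    """Expand every byte into eight booleans, least significant bit first."""
    return [bool(byte & (1 << shift)) for byte in data for shift in range(8)]


packed = pack_bits_lsb_first([False, True, False, True])
unpacked = unpack_bits_lsb_first(packed)
```

Unpacking always yields a multiple of eight bits, so the caller has to trim the list to the number of coils it actually asked for.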
PDU classes
Modbus Request/Response Decoders.
- class pymodbus.pdu.decoders.DecodePDU(is_server: bool)
Bases: object
Decode pdu requests/responses (server/client).
- decode(frame: bytes) ModbusPDU | None
Decode a frame.
- lookupPduClass(data: bytes) type[ModbusPDU] | None
Use function_code to determine the class of the PDU.
- register(custom_class: type[ModbusPDU]) None
Register a function and sub function class with the decoder.
Bit Reading Request/Response messages.
- class pymodbus.pdu.bit_message.ReadCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadCoilsRequest.
- decode(data: bytes) None
Decode a request pdu.
- encode() bytes
Encode a request pdu.
- function_code: int = 1
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count (1 byte) + N bytes of coil data, where N = Quantity of Coils / 8, rounded up when the remainder is not 0.
- rtu_frame_size: int = 8
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run request against a datastore.
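The response size rule for a read coils request can be written as a one-line calculation. The function name is hypothetical; the arithmetic follows the description of `get_response_pdu_size()`.

```python
def read_coils_response_pdu_size(count: int) -> int:
    """Function code (1) + byte count (1) + ceil(count / 8) data bytes."""
    data_bytes = (count + 7) // 8  # integer form of rounding up count / 8
    return 1 + 1 + data_bytes
```

For example, 10 coils need two data bytes, so the response PDU is four bytes long.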
- class pymodbus.pdu.bit_message.ReadCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadCoilsResponse.
- decode(data: bytes) None
Decode response pdu.
- encode() bytes
Encode response pdu.
- function_code: int = 1
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.bit_message.ReadDiscreteInputsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ReadCoilsRequest
ReadDiscreteInputsRequest.
- function_code: int = 2
- class pymodbus.pdu.bit_message.ReadDiscreteInputsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ReadCoilsResponse
ReadDiscreteInputsResponse.
- function_code: int = 2
- class pymodbus.pdu.bit_message.WriteMultipleCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteMultipleCoilsRequest.
- decode(data: bytes) None
Decode a write coils request.
- encode() bytes
Encode write coils request.
- function_code: int = 15
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Output Address (2 bytes) + Quantity of Outputs (2 bytes)
- rtu_byte_count_pos: int = 6
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a request against a datastore.
- class pymodbus.pdu.bit_message.WriteMultipleCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteMultipleCoilsResponse.
- decode(data: bytes) None
Decode a write coils response.
- encode() bytes
Encode write coils response.
- function_code: int = 15
- rtu_frame_size: int = 8
- class pymodbus.pdu.bit_message.WriteSingleCoilRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: WriteSingleCoilResponse
WriteSingleCoilRequest.
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Output Address (2 byte) + Output Value (2 Bytes)
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a request against a datastore.
- class pymodbus.pdu.bit_message.WriteSingleCoilResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteSingleCoilResponse.
- decode(data: bytes) None
Decode a write coil request.
- encode() bytes
Encode write coil request.
- function_code: int = 5
- rtu_frame_size: int = 8
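As an illustration of the layout "function code + output address + output value", a write single coil PDU can be built by hand with `struct`. In the Modbus application protocol the output value is 0xFF00 for ON and 0x0000 for OFF. The helpers below are hypothetical and do not call pymodbus.

```python
import struct

WRITE_SINGLE_COIL = 5  # function code listed above


def encode_write_single_coil(address: int, value: bool) -> bytes:
    """Build the 5-byte PDU: function code, 16-bit address, 16-bit output value."""
    return struct.pack(">BHH", WRITE_SINGLE_COIL, address, 0xFF00 if value else 0x0000)


def decode_write_single_coil(pdu: bytes) -> tuple[int, bool]:
    """Return (address, coil state) from a write single coil PDU."""
    _function_code, address, raw = struct.unpack(">BHH", pdu[:5])
    return address, raw == 0xFF00


pdu = encode_write_single_coil(0x00AC, True)
```

Adding the one-byte device id and the two-byte CRC of an RTU frame to the 5-byte PDU gives 8 bytes, which is consistent with `rtu_frame_size = 8`.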
Diagnostic Record Read/Write.
- class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ChangeAsciiInputDelimiterRequest.
- sub_function_code: int = 3
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ChangeAsciiInputDelimiterResponse.
- sub_function_code: int = 3
- class pymodbus.pdu.diag_message.ClearCountersRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ClearCountersRequest.
- sub_function_code: int = 10
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ClearCountersResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ClearCountersResponse.
- sub_function_code: int = 10
- class pymodbus.pdu.diag_message.ClearOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ClearOverrunCountRequest.
- sub_function_code: int = 20
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ClearOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ClearOverrunCountResponse.
- sub_function_code: int = 20
- class pymodbus.pdu.diag_message.DiagnosticBase(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
DiagnosticBase.
- decode(data: bytes) None
Decode a diagnostic request.
- encode() bytes
Encode a diagnostic response.
- function_code: int = 8
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Sub function code (2 byte) + Data (2 * N bytes)
- rtu_frame_size: int = 8
- sub_function_code: int = 9999
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Implement dummy.
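The common shape of the diagnostic messages (function code 8, a two-byte sub-function code, then data) can be sketched with `struct`. The helpers are hypothetical and only handle a single 16-bit data word, whereas the `message` parameter documented here also accepts bytes, lists and tuples.

```python
import struct

DIAGNOSTIC = 8  # function code of DiagnosticBase
RETURN_QUERY_DATA = 0  # sub-function code of ReturnQueryDataRequest
CLEAR_COUNTERS = 10  # sub-function code of ClearCountersRequest


def encode_diagnostic_pdu(sub_function_code: int, data: int = 0) -> bytes:
    """Build function code + 16-bit sub-function code + one 16-bit data word."""
    return struct.pack(">BHH", DIAGNOSTIC, sub_function_code, data)


def decode_diagnostic_pdu(pdu: bytes) -> tuple[int, int]:
    """Return (sub-function code, data word) from a diagnostic PDU."""
    _function_code, sub_function_code, data = struct.unpack(">BHH", pdu[:5])
    return sub_function_code, data


echo_request = encode_diagnostic_pdu(RETURN_QUERY_DATA, 0xA537)
```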
- class pymodbus.pdu.diag_message.ForceListenOnlyModeRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ForceListenOnlyModeRequest.
- sub_function_code: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ForceListenOnlyModeResponse(dev_id=1, transaction_id=0)
Bases: DiagnosticBase
ForceListenOnlyModeResponse.
This does not send a response
- sub_function_code: int = 4
- class pymodbus.pdu.diag_message.GetClearModbusPlusRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
GetClearModbusPlusRequest.
- encode()
Encode a diagnostic response.
- get_response_pdu_size()
Return size of the response.
Func_code (1 byte) + Sub function code (2 byte) + Operation (2 byte) + Data (108 bytes)
- sub_function_code: int = 21
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.GetClearModbusPlusResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
GetClearModbusPlusResponse.
- sub_function_code: int = 21
- class pymodbus.pdu.diag_message.RestartCommunicationsOptionRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
RestartCommunicationsOptionRequest.
- sub_function_code: int = 1
- class pymodbus.pdu.diag_message.RestartCommunicationsOptionResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
RestartCommunicationsOptionResponse.
- sub_function_code: int = 1
- class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusCommunicationErrorCountRequest.
- sub_function_code: int = 12
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusCommunicationErrorCountResponse.
- sub_function_code: int = 12
- class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusExceptionErrorCountRequest.
- sub_function_code: int = 13
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusExceptionErrorCountResponse.
- sub_function_code: int = 13
- class pymodbus.pdu.diag_message.ReturnBusMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusMessageCountRequest.
- sub_function_code: int = 11
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnBusMessageCountResponse.
- sub_function_code: int = 11
- class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnDiagnosticRegisterRequest.
- sub_function_code: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnDiagnosticRegisterResponse.
- sub_function_code: int = 2
- class pymodbus.pdu.diag_message.ReturnIopOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnIopOverrunCountRequest.
- sub_function_code: int = 19
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnIopOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnIopOverrunCountResponse.
- sub_function_code: int = 19
- class pymodbus.pdu.diag_message.ReturnQueryDataRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnQueryDataRequest.
- sub_function_code: int = 0
- class pymodbus.pdu.diag_message.ReturnQueryDataResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnQueryDataResponse.
- sub_function_code: int = 0
- class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveBusCharacterOverrunCountRequest.
- sub_function_code: int = 18
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveBusCharacterOverrunCountResponse.
- sub_function_code: int = 18
- class pymodbus.pdu.diag_message.ReturnSlaveBusyCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveBusyCountRequest.
- sub_function_code: int = 17
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveBusyCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveBusyCountResponse.
- sub_function_code: int = 17
- class pymodbus.pdu.diag_message.ReturnSlaveMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveMessageCountRequest.
- sub_function_code: int = 14
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveMessageCountResponse.
- sub_function_code: int = 14
- class pymodbus.pdu.diag_message.ReturnSlaveNAKCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveNAKCountRequest.
- sub_function_code: int = 16
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveNAKCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveNAKCountResponse.
- sub_function_code: int = 16
- class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveNoResponseCountRequest.
- sub_function_code: int = 15
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: DiagnosticBase
ReturnSlaveNoResponseCountResponse.
- sub_function_code: int = 15
File Record Read/Write Messages.
- class pymodbus.pdu.file_message.FileRecord(file_number: int = 0, record_number: int = 0, record_data: bytes = b'', record_length: int = 0)
Bases: object
Represents a file record and its relevant data.
- file_number: int = 0
- record_data: bytes = b''
- record_length: int = 0
- record_number: int = 0
- class pymodbus.pdu.file_message.ReadFifoQueueRequest(address: int = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadFifoQueueRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 24
- rtu_frame_size: int = 6
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read FIFO queue request against the store.
- class pymodbus.pdu.file_message.ReadFifoQueueResponse(values: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadFifoQueueResponse.
- classmethod calculateRtuFrameSize(buffer: bytes) int
Calculate the size of the message.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 24
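The layout of a read FIFO queue response can be sketched with `struct` alone. This is a minimal illustration that assumes the layout given in the Modbus application protocol specification (function code, 2-byte byte count, 2-byte FIFO count, then 16-bit values); `parse_fifo_response` is a hypothetical helper and not part of pymodbus.

```python
import struct


def parse_fifo_response(pdu: bytes) -> list[int]:
    """Parse a Read FIFO Queue (function code 24) response PDU."""
    function_code, byte_count, fifo_count = struct.unpack(">BHH", pdu[:5])
    if function_code != 0x18:
        raise ValueError(f"unexpected function code {function_code:#x}")
    # The byte count covers the FIFO count field (2 bytes) plus the values.
    if byte_count != 2 + 2 * fifo_count:
        raise ValueError("byte count does not match FIFO count")
    return list(struct.unpack(f">{fifo_count}H", pdu[5:5 + 2 * fifo_count]))


# Two queued values, 0x01B8 and 0x1284.
values = parse_fifo_response(bytes.fromhex("180006000201b81284"))
```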
- class pymodbus.pdu.file_message.ReadFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadFileRecordRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 20
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + quantity of records (7 bytes each).
- rtu_byte_count_pos: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read file record request against the store.
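As a rough illustration of the wire format behind a read file record request (function code 20), the sketch below packs each sub-request as 7 bytes: a reference type of 6 followed by file number, record number, and record length. It assumes the layout from the Modbus application protocol specification; `encode_read_file_request` is a hypothetical helper, not a pymodbus function.

```python
import struct


def encode_read_file_request(records: list[tuple[int, int, int]]) -> bytes:
    """Encode (file_number, record_number, record_length) triples as a PDU."""
    body = b"".join(
        # Reference type is always 6; each sub-request is 7 bytes long.
        struct.pack(">BHHH", 0x06, file_number, record_number, record_length)
        for file_number, record_number, record_length in records
    )
    # Function code, then a 1-byte count of the bytes that follow.
    return struct.pack(">BB", 0x14, len(body)) + body


pdu = encode_read_file_request([(4, 1, 2), (3, 9, 2)])
```

The byte count sits at index 1 of the PDU, which becomes index 2 once a device id byte is prepended for RTU framing.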
- class pymodbus.pdu.file_message.ReadFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadFileRecordResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 20
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.file_message.WriteFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
WriteFileRecordRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 21
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + quantity of records (7 bytes each).
- rtu_byte_count_pos: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the write file record request against the context.
- class pymodbus.pdu.file_message.WriteFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
The normal response is an echo of the request.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 21
- rtu_byte_count_pos: int = 2
Encapsulated Interface (MEI) Transport Messages.
- class pymodbus.pdu.mei_message.ReadDeviceInformationRequest(read_code: int | None = None, object_id: int = 0, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadDeviceInformationRequest.
- decode(data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the request packet.
- function_code: int = 43
- rtu_frame_size: int = 7
- sub_function_code: int = 14
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read device information request against the store.
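The constants above (function code 43, sub-function code 14, RTU frame size 7) fit a 4-byte request PDU. The sketch below assumes the layout from the Modbus specification (function code, MEI type, read device ID code, object id); `encode_read_device_info` is a hypothetical name used only for illustration.

```python
import struct


def encode_read_device_info(read_code: int = 0x01, object_id: int = 0x00) -> bytes:
    """Encode a Read Device Identification request PDU (0x2B / 0x0E)."""
    return struct.pack(">BBBB", 0x2B, 0x0E, read_code, object_id)


pdu = encode_read_device_info()
# Device id (1 byte) + PDU (4 bytes) + CRC (2 bytes).
rtu_size = 1 + len(pdu) + 2
```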
- class pymodbus.pdu.mei_message.ReadDeviceInformationResponse(read_code: int | None = None, information: dict | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadDeviceInformationResponse.
- classmethod calculateRtuFrameSize(buffer: bytes) int
Calculate the size of the message.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 43
- sub_function_code: int = 14
Diagnostic record read/write.
- class pymodbus.pdu.other_message.GetCommEventCounterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
GetCommEventCounterRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 11
- rtu_frame_size: int = 4
- async update_datastore(_context) ModbusPDU
Run a get comm event counter request against the store.
- class pymodbus.pdu.other_message.GetCommEventCounterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
GetCommEventCounterResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 11
- rtu_frame_size: int = 8
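A get comm event counter response carries a 2-byte status word and a 2-byte event count, which accounts for the RTU frame size of 8 (device id, function code, two words, CRC). The sketch below assumes the Modbus specification's convention that a status word of 0xFFFF means the device is busy; `parse_comm_event_counter` is a hypothetical helper.

```python
import struct


def parse_comm_event_counter(pdu: bytes) -> tuple[bool, int]:
    """Return (busy, event_count) from a function code 11 response PDU."""
    function_code, status, event_count = struct.unpack(">BHH", pdu)
    if function_code != 0x0B:
        raise ValueError(f"unexpected function code {function_code:#x}")
    # 0xFFFF: a previous command is still being processed; 0x0000: idle.
    return status == 0xFFFF, event_count


busy, event_count = parse_comm_event_counter(bytes.fromhex("0bffff0108"))
```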
- class pymodbus.pdu.other_message.GetCommEventLogRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
GetCommEventLogRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 12
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a get comm event log request against the store.
- class pymodbus.pdu.other_message.GetCommEventLogResponse(status: bool = True, message_count: int = 0, event_count: int = 0, events: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
GetCommEventLogResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 12
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.other_message.ReadExceptionStatusRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadExceptionStatusRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 7
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read exception status request against the store.
- class pymodbus.pdu.other_message.ReadExceptionStatusResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadExceptionStatusResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 7
- rtu_frame_size: int = 5
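The read exception status response holds a single status byte after the function code. As a small sketch, the byte can be expanded into eight flags, least significant bit first; `unpack_exception_status` is a hypothetical helper, and the meaning of each bit is device specific.

```python
def unpack_exception_status(pdu: bytes) -> list[bool]:
    """Expand the status byte of a function code 7 response into 8 flags."""
    function_code, status = pdu[0], pdu[1]
    if function_code != 0x07:
        raise ValueError(f"unexpected function code {function_code:#x}")
    # Index 0 of the result corresponds to bit 0 (the least significant bit).
    return [bool((status >> bit) & 1) for bit in range(8)]


flags = unpack_exception_status(bytes.fromhex("076d"))
```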
- class pymodbus.pdu.other_message.ReportSlaveIdRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReportSlaveIdRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 17
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a report slave id request against the store.
- class pymodbus.pdu.other_message.ReportSlaveIdResponse(identifier: bytes = b'\x00', status: bool = True, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReportSlaveIdResponse.
- decode(data: bytes) None
Decode the response.
Since the identifier is device dependent, we just return the raw value that a user can decode to whatever it should be.
- encode() bytes
Encode the response.
- function_code: int = 17
- rtu_byte_count_pos: int = 2
Contains base classes for modbus request/response/error packets.
- class pymodbus.pdu.pdu.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)
Bases: ModbusPDU
Base class for a modbus exception PDU.
- ACKNOWLEDGE = 5
- GATEWAY_NO_RESPONSE = 11
- GATEWAY_PATH_UNAVIABLE = 10
- ILLEGAL_ADDRESS = 2
- ILLEGAL_FUNCTION = 1
- ILLEGAL_VALUE = 3
- MEMORY_PARITY_ERROR = 8
- NEGATIVE_ACKNOWLEDGE = 7
- SLAVE_BUSY = 6
- SLAVE_FAILURE = 4
- address: int
- bits: list[bool]
- count: int
- decode(data: bytes) None
Decode a modbus exception response.
- dev_id: int
- encode() bytes
Encode a modbus exception response.
- fut: Future
- registers: list[int]
- rtu_frame_size: int = 5
- status: int
- transaction_id: int
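On the wire, an exception response is two bytes: the original function code with its high bit set, followed by one of the exception codes listed above. The sketch below is independent of pymodbus and only mirrors the constant names from this listing; `describe_exception` is a hypothetical helper.

```python
EXCEPTION_NAMES = {
    1: "ILLEGAL_FUNCTION",
    2: "ILLEGAL_ADDRESS",
    3: "ILLEGAL_VALUE",
    4: "SLAVE_FAILURE",
    5: "ACKNOWLEDGE",
    6: "SLAVE_BUSY",
    7: "NEGATIVE_ACKNOWLEDGE",
    8: "MEMORY_PARITY_ERROR",
    10: "GATEWAY_PATH_UNAVIABLE",  # spelling follows the class attribute
    11: "GATEWAY_NO_RESPONSE",
}


def describe_exception(pdu: bytes) -> tuple[int, str]:
    """Return (original function code, exception name) for an exception PDU."""
    function_code, exception_code = pdu[0], pdu[1]
    if not function_code & 0x80:
        raise ValueError("not an exception response")
    # Clearing the high bit recovers the function code of the failed request.
    return function_code & 0x7F, EXCEPTION_NAMES.get(exception_code, "UNKNOWN")


original_code, name = describe_exception(b"\x83\x02")
```

With a device id byte in front and a 2-byte CRC behind, these two bytes give the RTU frame size of 5 listed above.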
- class pymodbus.pdu.pdu.ModbusPDU(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: object
Base class for all Modbus messages.
- classmethod calculateRtuFrameSize(data: bytes) int
Calculate the size of a PDU.
- abstract decode(data: bytes) None
Decode data part of the message.
- abstract encode() bytes
Encode the message.
- function_code: int = 0
- get_response_pdu_size() int
Calculate response pdu size.
- isError() bool
Check whether the PDU is an error response.
- rtu_byte_count_pos: int = 0
- rtu_frame_size: int = 0
- sub_function_code: int = -1
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run request against a datastore.
- validateAddress(address: int = -1) None
Validate API supplied address.
- validateCount(max_count: int, count: int = -1) None
Validate API supplied count.
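One plausible reading of how `rtu_frame_size` and `rtu_byte_count_pos` relate is sketched below: a message either has a fixed RTU frame length, or its length follows from a byte count found at a known index. This is an assumption made for illustration and is not copied from the pymodbus source; `expected_rtu_size` is a hypothetical function.

```python
def expected_rtu_size(frame: bytes, fixed_size: int = 0, byte_count_pos: int = 0) -> int:
    """Return the expected RTU frame length, or 0 if more bytes are needed."""
    if fixed_size:
        return fixed_size
    if byte_count_pos:
        if len(frame) <= byte_count_pos:
            return 0  # the byte count has not arrived yet
        # Bytes up to and including the count byte + payload + 2-byte CRC.
        return byte_count_pos + 1 + frame[byte_count_pos] + 2
    raise NotImplementedError("no size rule for this message")


# Start of a read holding registers response: device 1, 4 data bytes announced.
size = expected_rtu_size(bytes.fromhex("010304000a"), byte_count_pos=2)
```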
Register Reading Request/Response.
- class pymodbus.pdu.register_message.MaskWriteRegisterRequest(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)
Bases: ModbusPDU
MaskWriteRegisterRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 22
- rtu_frame_size: int = 10
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a mask write register request against the store.
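The effect of the two masks follows the formula in the Modbus specification: result = (current AND and_mask) OR (or_mask AND (NOT and_mask)). The sketch below applies it to a 16-bit register; `apply_mask` is a hypothetical helper and does not touch a pymodbus datastore.

```python
def apply_mask(current: int, and_mask: int = 0xFFFF, or_mask: int = 0x0000) -> int:
    """Compute the register value after a mask write (function code 22)."""
    # Bits set in and_mask keep their current value; cleared bits take or_mask.
    return (current & and_mask) | (or_mask & ~and_mask & 0xFFFF)


result = apply_mask(0x0012, and_mask=0x00F2, or_mask=0x0025)
```

With the default masks shown in the constructor signature (and_mask=65535, or_mask=0), the register is left unchanged.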
- class pymodbus.pdu.register_message.MaskWriteRegisterResponse(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)
Bases: ModbusPDU
MaskWriteRegisterResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 22
- rtu_frame_size: int = 10
- class pymodbus.pdu.register_message.ReadHoldingRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadHoldingRegistersRequest.
- decode(data: bytes) None
Decode a register request packet.
- encode() bytes
Encode the request packet.
- function_code: int = 3
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count(1 byte) + 2 * Quantity of registers (== byte count).
- rtu_frame_size: int = 8
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a read holding request against a datastore.
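The size formula above can be checked against a hand-built request. The sketch below uses only `struct` and assumes the standard layout of a read holding registers request (function code, 2-byte address, 2-byte quantity); both function names are hypothetical.

```python
import struct


def encode_read_holding(address: int, count: int) -> bytes:
    """Encode a read holding registers request PDU (function code 3)."""
    return struct.pack(">BHH", 0x03, address, count)


def response_pdu_size(count: int) -> int:
    """Function code (1) + byte count (1) + 2 bytes per register."""
    return 1 + 1 + 2 * count


pdu = encode_read_holding(0x006B, 3)
size = response_pdu_size(3)
```

The 5-byte request PDU plus a device id byte and a 2-byte CRC gives the RTU frame size of 8 listed above.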
- class pymodbus.pdu.register_message.ReadHoldingRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
ReadHoldingRegistersResponse.
- decode(data: bytes) None
Decode a register response packet.
- encode() bytes
Encode the response packet.
- function_code: int = 3
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.register_message.ReadInputRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ReadHoldingRegistersRequest
ReadInputRegistersRequest.
- function_code: int = 4
- class pymodbus.pdu.register_message.ReadInputRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ReadHoldingRegistersResponse
ReadInputRegistersResponse.
- function_code: int = 4
- class pymodbus.pdu.register_message.ReadWriteMultipleRegistersRequest(read_address: int = 0, read_count: int = 0, write_address: int = 0, write_registers: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases: ModbusPDU
ReadWriteMultipleRegistersRequest.
- decode(data: bytes) None
Decode the register request packet.
- encode() bytes
Encode the request packet.
- function_code: int = 23
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count (1 byte) + 2 * Quantity of registers (n bytes).
- rtu_byte_count_pos: int = 10
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a read/write multiple registers request against a datastore.
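For a read/write multiple registers request (function code 23), the byte count of the write data comes after four 16-bit fields, which is consistent with `rtu_byte_count_pos = 10` once a device id byte is prepended. The sketch below assumes the layout from the Modbus specification; `encode_read_write` is a hypothetical helper.

```python
import struct


def encode_read_write(read_address: int, read_count: int,
                      write_address: int, write_registers: list[int]) -> bytes:
    """Encode a read/write multiple registers request PDU."""
    write_count = len(write_registers)
    header = struct.pack(
        ">BHHHHB",
        0x17, read_address, read_count,
        write_address, write_count,
        2 * write_count,  # byte count of the write data
    )
    return header + struct.pack(f">{write_count}H", *write_registers)


pdu = encode_read_write(0x0003, 6, 0x000E, [0x00FF, 0x00FF, 0x00FF])
frame_without_crc = bytes([1]) + pdu  # device id 1 prepended
```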
- class pymodbus.pdu.register_message.ReadWriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ReadHoldingRegistersResponse
ReadWriteMultipleRegistersResponse.
- function_code: int = 23
- class pymodbus.pdu.register_message.WriteMultipleRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteMultipleRegistersRequest.
- decode(data: bytes) None
Decode a write multiple registers request packet.
- encode() bytes
Encode a write multiple registers request packet.
- function_code: int = 16
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Starting Address (2 byte) + Quantity of Registers (2 Bytes)
- rtu_byte_count_pos: int = 6
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a write multiple registers request against a datastore.
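A write multiple registers exchange (function code 16) can be sketched as follows: the request carries address, quantity, byte count, and values, while the response echoes only address and quantity, giving the 5-byte response PDU described above. The layout is taken from the Modbus specification and both helpers are hypothetical.

```python
import struct


def encode_write_multiple(address: int, registers: list[int]) -> bytes:
    """Encode a write multiple registers request PDU."""
    count = len(registers)
    header = struct.pack(">BHHB", 0x10, address, count, 2 * count)
    return header + struct.pack(f">{count}H", *registers)


def encode_write_multiple_response(address: int, count: int) -> bytes:
    """Encode the matching response PDU: function code, address, quantity."""
    return struct.pack(">BHH", 0x10, address, count)


request = encode_write_multiple(0x0001, [0x000A, 0x0102])
response = encode_write_multiple_response(0x0001, 2)
```

In an RTU frame the request's byte count lands at index 6 (after the device id byte), matching `rtu_byte_count_pos = 6`.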
- class pymodbus.pdu.register_message.WriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteMultipleRegistersResponse.
- decode(data: bytes) None
Decode a write multiple registers response packet.
- encode() bytes
Encode a write multiple registers response packet.
- function_code: int = 16
- rtu_frame_size: int = 8
- class pymodbus.pdu.register_message.WriteSingleRegisterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: WriteSingleRegisterResponse
WriteSingleRegisterRequest.
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Register Address(2 byte) + Register Value (2 bytes)
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a write single register request against a datastore.
- class pymodbus.pdu.register_message.WriteSingleRegisterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases: ModbusPDU
WriteSingleRegisterResponse.
- decode(data: bytes) None
Decode a write single register packet.
- encode() bytes
Encode a write single register packet.
- function_code: int = 6
- rtu_frame_size: int = 8