This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch feat/plc4py/modbus_write
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feat/plc4py/modbus_write by this push: new 257a60dfee feat(plc4py): Add some manual modbus tests 257a60dfee is described below commit 257a60dfee48c00a1e4d9e28fbe205eb8b05be8e Author: hutcheb <ben.hut...@gmail.com> AuthorDate: Sat Aug 31 12:02:01 2024 +0800 feat(plc4py): Add some manual modbus tests --- plc4py/plc4py/drivers/modbus/ModbusDevice.py | 20 ++- plc4py/plc4py/drivers/modbus/ModbusTag.py | 2 +- .../plc4py/protocols/modbus/readwrite/DataItem.py | 2 +- .../drivers/modbus/test_modbus_connection.py | 179 ++++++++++++++++++--- 4 files changed, 178 insertions(+), 25 deletions(-) diff --git a/plc4py/plc4py/drivers/modbus/ModbusDevice.py b/plc4py/plc4py/drivers/modbus/ModbusDevice.py index 5263168585..cc5c6dc110 100644 --- a/plc4py/plc4py/drivers/modbus/ModbusDevice.py +++ b/plc4py/plc4py/drivers/modbus/ModbusDevice.py @@ -20,6 +20,7 @@ import asyncio import logging from asyncio import Transport from dataclasses import dataclass, field +from math import ceil from typing import Dict, List from bitarray import bitarray @@ -62,6 +63,7 @@ from plc4py.protocols.modbus.readwrite.ModbusPDUWriteMultipleCoilsRequest import from plc4py.protocols.modbus.readwrite.ModbusPDUWriteMultipleHoldingRegistersRequest import ( ModbusPDUWriteMultipleHoldingRegistersRequestBuilder, ) +from protocols.modbus.readwrite.ModbusDataType import ModbusDataType @dataclass @@ -92,13 +94,21 @@ class ModbusDevice: message_future = loop.create_future() if isinstance(tag, ModbusTagCoil): + if tag.data_type.value != ModbusDataType.BOOL.value: + raise NotImplementedError(f"Only BOOL data types can be used with the coil register area") pdu = ModbusPDUReadCoilsRequest(tag.address, tag.quantity) elif isinstance(tag, ModbusTagDiscreteInput): + if tag.data_type.value != ModbusDataType.BOOL.value: + raise NotImplementedError(f"Only BOOL data types can be used with the digital input register area") pdu = ModbusPDUReadDiscreteInputsRequest(tag.address, 
tag.quantity) elif isinstance(tag, ModbusTagInputRegister): - pdu = ModbusPDUReadInputRegistersRequest(tag.address, tag.quantity) + number_of_registers_per_item = tag.data_type.data_type_size / 2 + number_of_registers = ceil(tag.quantity * number_of_registers_per_item) + pdu = ModbusPDUReadInputRegistersRequest(tag.address,number_of_registers) elif isinstance(tag, ModbusTagHoldingRegister): - pdu = ModbusPDUReadHoldingRegistersRequest(tag.address, tag.quantity) + number_of_registers_per_item = tag.data_type.data_type_size / 2 + number_of_registers = ceil(tag.quantity * number_of_registers_per_item) + pdu = ModbusPDUReadHoldingRegistersRequest(tag.address, number_of_registers) else: raise NotImplementedError( "Modbus tag type not implemented " + str(tag.__class__) @@ -182,10 +192,10 @@ class ModbusDevice: # Create future to be returned when a value is returned loop = asyncio.get_running_loop() message_future = loop.create_future() - + values = request.values[request.tag_names[0]] if isinstance(tag, ModbusTagCoil): pdu = ModbusPDUWriteMultipleCoilsRequest( - tag.address, tag.quantity, [v for k, v in request.values] + tag.address, tag.quantity, values ) elif isinstance(tag, ModbusTagDiscreteInput): raise PlcRuntimeException( @@ -197,7 +207,7 @@ class ModbusDevice: ) elif isinstance(tag, ModbusTagHoldingRegister): pdu = ModbusPDUWriteMultipleHoldingRegistersRequestBuilder( - tag.address, tag.quantity, request.values + tag.address, tag.quantity, [values] ).build() else: raise NotImplementedError( diff --git a/plc4py/plc4py/drivers/modbus/ModbusTag.py b/plc4py/plc4py/drivers/modbus/ModbusTag.py index a67c419784..4ad6c6d50b 100644 --- a/plc4py/plc4py/drivers/modbus/ModbusTag.py +++ b/plc4py/plc4py/drivers/modbus/ModbusTag.py @@ -102,7 +102,7 @@ class ModbusTag(PlcTag): ) data_type = ( - ModbusDataType(matcher.group("datatype")) + ModbusDataType[matcher.group("datatype")] if "datatype" in matcher.groupdict() and matcher.group("datatype") is not None else 
cls._DEFAULT_DATA_TYPE diff --git a/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py b/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py index 2eec738436..3c631e385e 100644 --- a/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py +++ b/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py @@ -436,7 +436,7 @@ class DataItem: for _ in range(item_count): value.append( PlcSTRING( - str(read_buffer.read_str(8, logical_name="", encoding="")) + read_buffer.read_str(8, logical_name="", encoding="") ) ) diff --git a/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py b/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py index 62a98fb0bb..4ae14177a9 100644 --- a/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py +++ b/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py @@ -17,6 +17,7 @@ # under the License. # import time +from unittest import TestCase import pytest @@ -24,10 +25,10 @@ from plc4py.PlcDriverManager import PlcDriverManager from plc4py.api.value.PlcValue import PlcResponseCode import logging -from plc4py.spi.values.PlcValues import PlcINT +from plc4py.spi.values.PlcValues import PlcINT, PlcREAL logger = logging.getLogger("testing") - +TEST_SERVER_IP = "192.168.190.173" @pytest.mark.asyncio async def manual_test_plc_driver_modbus_connect(): @@ -38,7 +39,7 @@ async def manual_test_plc_driver_modbus_connect(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://1") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}") as connection: # Check if the connection is successful assert connection.is_connected() @@ -58,12 +59,32 @@ async def test_plc_driver_modbus_read_coil(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with 
driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.read_request_builder() as builder: - builder.add_item("Random Tag", "0x00001[9]") + builder.add_item("Random Tag", "0x00001") request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"].value + assert value == True + + +@pytest.mark.asyncio +@pytest.mark.xfail +async def test_plc_driver_modbus_read_coil_non_bool(): + """ + Test reading data from a Modbus PLC. + """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "0x00001:REAL") + request = builder.build() + TestCase.assertRaises(await connection.execute(request), NotImplementedError) @pytest.mark.asyncio @@ -78,12 +99,34 @@ async def test_plc_driver_modbus_read_coil_array(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "0x00001[2]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [True, False] + + +@pytest.mark.asyncio +@pytest.mark.xfail +async def test_plc_driver_modbus_read_contacts(): + """ + Test reading data from a Modbus PLC. 
+ """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.read_request_builder() as builder: - builder.add_item("Random Tag", "0x00001[10]") + builder.add_item("Random Tag", "1x00001") request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"].value + assert value == True @pytest.mark.asyncio @@ -97,17 +140,17 @@ async def test_plc_driver_modbus_read_contact_array(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.read_request_builder() as builder: - builder.add_item("Random Tag", "1x00001[10]") + builder.add_item("Random Tag", "1x00001[2]") request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"].value - pass + assert value == [True, False] @pytest.mark.asyncio -async def test_plc_driver_modbus_read_input_register_array(): +async def test_plc_driver_modbus_read_input_register(): """ Test reading data from a Modbus PLC. 
""" @@ -117,17 +160,37 @@ async def test_plc_driver_modbus_read_input_register_array(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.read_request_builder() as builder: builder.add_item("Random Tag", "3x00001") request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"].value - pass + assert value == 333 + + +@pytest.mark.asyncio +async def test_plc_driver_modbus_read_input_register_array(): + """ + Test reading data from a Modbus PLC. + """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "3x00001[2]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [333, 0] @pytest.mark.asyncio -async def test_plc_driver_modbus_read_holding_array(): +async def test_plc_driver_modbus_read_holding(): """ Test reading data from a Modbus PLC. 
""" @@ -137,13 +200,93 @@ async def test_plc_driver_modbus_read_holding_array(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.read_request_builder() as builder: builder.add_item("Random Tag", "4x00001") request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"].value - pass + assert value == 874 + + +@pytest.mark.asyncio +async def test_plc_driver_modbus_read_holding(): + """ + Test reading data from a Modbus PLC. + """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "4x00001[2]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [874, 0] + + +@pytest.mark.asyncio +async def test_plc_driver_modbus_read_holding_real(): + """ + Test reading data from a Modbus PLC. 
+ """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "4x00001:REAL[2]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [PlcREAL(value=6.876641952310382e-37), PlcREAL(value=0.0)] + + +@pytest.mark.asyncio +async def test_plc_driver_modbus_read_holding_string_even(): + """ + Test reading data from a Modbus PLC. + """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "4x00041:CHAR[6]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [b'F', b'A', b'F', b'B', b'C', b'B'] + + +@pytest.mark.asyncio +async def test_plc_driver_modbus_read_holding_string_odd(): + """ + Test reading data from a Modbus PLC. 
+ """ + log = logging.getLogger(__name__) + + # Initialize the PlcDriverManager + driver_manager = PlcDriverManager() + + # Establish a connection to the Modbus PLC + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: + with connection.read_request_builder() as builder: + builder.add_item("Random Tag", "4x00041:CHAR[5]") + request = builder.build() + response = await connection.execute(request) + value = response.tags["Random Tag"].value + assert value == [b'F', b'A', b'F', b'B', b'C'] @pytest.mark.asyncio @@ -157,10 +300,10 @@ async def test_plc_driver_modbus_write_holding(): driver_manager = PlcDriverManager() # Establish a connection to the Modbus PLC - async with driver_manager.connection("modbus://192.168.174.128:502") as connection: + async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as connection: with connection.write_request_builder() as builder: builder.add_item("Random Tag", "4x00001", 2) request = builder.build() response = await connection.execute(request) value = response.tags["Random Tag"] - pass \ No newline at end of file + pass