This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch feat/plc4py/modbus_write
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feat/plc4py/modbus_write by 
this push:
     new 257a60dfee feat(plc4py): Add some manual modbus tests
257a60dfee is described below

commit 257a60dfee48c00a1e4d9e28fbe205eb8b05be8e
Author: hutcheb <ben.hut...@gmail.com>
AuthorDate: Sat Aug 31 12:02:01 2024 +0800

    feat(plc4py): Add some manual modbus tests
---
 plc4py/plc4py/drivers/modbus/ModbusDevice.py       |  20 ++-
 plc4py/plc4py/drivers/modbus/ModbusTag.py          |   2 +-
 .../plc4py/protocols/modbus/readwrite/DataItem.py  |   2 +-
 .../drivers/modbus/test_modbus_connection.py       | 179 ++++++++++++++++++---
 4 files changed, 178 insertions(+), 25 deletions(-)

diff --git a/plc4py/plc4py/drivers/modbus/ModbusDevice.py 
b/plc4py/plc4py/drivers/modbus/ModbusDevice.py
index 5263168585..cc5c6dc110 100644
--- a/plc4py/plc4py/drivers/modbus/ModbusDevice.py
+++ b/plc4py/plc4py/drivers/modbus/ModbusDevice.py
@@ -20,6 +20,7 @@ import asyncio
 import logging
 from asyncio import Transport
 from dataclasses import dataclass, field
+from math import ceil
 from typing import Dict, List
 
 from bitarray import bitarray
@@ -62,6 +63,7 @@ from 
plc4py.protocols.modbus.readwrite.ModbusPDUWriteMultipleCoilsRequest import
 from 
plc4py.protocols.modbus.readwrite.ModbusPDUWriteMultipleHoldingRegistersRequest 
import (
     ModbusPDUWriteMultipleHoldingRegistersRequestBuilder,
 )
+from protocols.modbus.readwrite.ModbusDataType import ModbusDataType
 
 
 @dataclass
@@ -92,13 +94,21 @@ class ModbusDevice:
         message_future = loop.create_future()
 
         if isinstance(tag, ModbusTagCoil):
+            if tag.data_type.value != ModbusDataType.BOOL.value:
+                raise NotImplementedError(f"Only BOOL data types can be used 
with the coil register area")
             pdu = ModbusPDUReadCoilsRequest(tag.address, tag.quantity)
         elif isinstance(tag, ModbusTagDiscreteInput):
+            if tag.data_type.value != ModbusDataType.BOOL.value:
+                raise NotImplementedError(f"Only BOOL data types can be used 
with the digital input register area")
             pdu = ModbusPDUReadDiscreteInputsRequest(tag.address, tag.quantity)
         elif isinstance(tag, ModbusTagInputRegister):
-            pdu = ModbusPDUReadInputRegistersRequest(tag.address, tag.quantity)
+            number_of_registers_per_item = tag.data_type.data_type_size / 2
+            number_of_registers = ceil(tag.quantity * 
number_of_registers_per_item)
+            pdu = 
ModbusPDUReadInputRegistersRequest(tag.address,number_of_registers)
         elif isinstance(tag, ModbusTagHoldingRegister):
-            pdu = ModbusPDUReadHoldingRegistersRequest(tag.address, 
tag.quantity)
+            number_of_registers_per_item = tag.data_type.data_type_size / 2
+            number_of_registers = ceil(tag.quantity * 
number_of_registers_per_item)
+            pdu = ModbusPDUReadHoldingRegistersRequest(tag.address, 
number_of_registers)
         else:
             raise NotImplementedError(
                 "Modbus tag type not implemented " + str(tag.__class__)
@@ -182,10 +192,10 @@ class ModbusDevice:
         # Create future to be returned when a value is returned
         loop = asyncio.get_running_loop()
         message_future = loop.create_future()
-
+        values = request.values[request.tag_names[0]]
         if isinstance(tag, ModbusTagCoil):
             pdu = ModbusPDUWriteMultipleCoilsRequest(
-                tag.address, tag.quantity, [v for k, v in request.values]
+                tag.address, tag.quantity, values
             )
         elif isinstance(tag, ModbusTagDiscreteInput):
             raise PlcRuntimeException(
@@ -197,7 +207,7 @@ class ModbusDevice:
             )
         elif isinstance(tag, ModbusTagHoldingRegister):
             pdu = ModbusPDUWriteMultipleHoldingRegistersRequestBuilder(
-                tag.address, tag.quantity, request.values
+                tag.address, tag.quantity, [values]
             ).build()
         else:
             raise NotImplementedError(
diff --git a/plc4py/plc4py/drivers/modbus/ModbusTag.py 
b/plc4py/plc4py/drivers/modbus/ModbusTag.py
index a67c419784..4ad6c6d50b 100644
--- a/plc4py/plc4py/drivers/modbus/ModbusTag.py
+++ b/plc4py/plc4py/drivers/modbus/ModbusTag.py
@@ -102,7 +102,7 @@ class ModbusTag(PlcTag):
             )
 
         data_type = (
-            ModbusDataType(matcher.group("datatype"))
+            ModbusDataType[matcher.group("datatype")]
             if "datatype" in matcher.groupdict()
             and matcher.group("datatype") is not None
             else cls._DEFAULT_DATA_TYPE
diff --git a/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py 
b/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
index 2eec738436..3c631e385e 100644
--- a/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
+++ b/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py
@@ -436,7 +436,7 @@ class DataItem:
             for _ in range(item_count):
                 value.append(
                     PlcSTRING(
-                        str(read_buffer.read_str(8, logical_name="", 
encoding=""))
+                        read_buffer.read_str(8, logical_name="", encoding="")
                     )
                 )
 
diff --git a/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py 
b/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
index 62a98fb0bb..4ae14177a9 100644
--- a/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
+++ b/plc4py/tests/unit/plc4py/drivers/modbus/test_modbus_connection.py
@@ -17,6 +17,7 @@
 # under the License.
 #
 import time
+from unittest import TestCase
 
 import pytest
 
@@ -24,10 +25,10 @@ from plc4py.PlcDriverManager import PlcDriverManager
 from plc4py.api.value.PlcValue import PlcResponseCode
 import logging
 
-from plc4py.spi.values.PlcValues import PlcINT
+from plc4py.spi.values.PlcValues import PlcINT, PlcREAL
 
 logger = logging.getLogger("testing")
-
+TEST_SERVER_IP = "192.168.190.173"
 
 @pytest.mark.asyncio
 async def manual_test_plc_driver_modbus_connect():
@@ -38,7 +39,7 @@ async def manual_test_plc_driver_modbus_connect():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://1") as connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}") as 
connection:
         # Check if the connection is successful
         assert connection.is_connected()
 
@@ -58,12 +59,32 @@ async def test_plc_driver_modbus_read_coil():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.read_request_builder() as builder:
-            builder.add_item("Random Tag", "0x00001[9]")
+            builder.add_item("Random Tag", "0x00001")
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"].value
+            assert value == True
+
+
+@pytest.mark.asyncio
+@pytest.mark.xfail
+async def test_plc_driver_modbus_read_coil_non_bool():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "0x00001:REAL")
+            request = builder.build()
+            TestCase.assertRaises(await connection.execute(request), 
NotImplementedError)
 
 
 @pytest.mark.asyncio
@@ -78,12 +99,34 @@ async def test_plc_driver_modbus_read_coil_array():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "0x00001[2]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [True, False]
+
+
+@pytest.mark.asyncio
+@pytest.mark.xfail
+async def test_plc_driver_modbus_read_contacts():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.read_request_builder() as builder:
-            builder.add_item("Random Tag", "0x00001[10]")
+            builder.add_item("Random Tag", "1x00001")
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"].value
+            assert value == True
 
 
 @pytest.mark.asyncio
@@ -97,17 +140,17 @@ async def test_plc_driver_modbus_read_contact_array():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.read_request_builder() as builder:
-            builder.add_item("Random Tag", "1x00001[10]")
+            builder.add_item("Random Tag", "1x00001[2]")
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"].value
-            pass
+            assert value == [True, False]
 
 
 @pytest.mark.asyncio
-async def test_plc_driver_modbus_read_input_register_array():
+async def test_plc_driver_modbus_read_input_register():
     """
     Test reading data from a Modbus PLC.
     """
@@ -117,17 +160,37 @@ async def 
test_plc_driver_modbus_read_input_register_array():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.read_request_builder() as builder:
             builder.add_item("Random Tag", "3x00001")
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"].value
-            pass
+            assert value == 333
+
+
+@pytest.mark.asyncio
+async def test_plc_driver_modbus_read_input_register_array():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "3x00001[2]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [333, 0]
 
 
 @pytest.mark.asyncio
-async def test_plc_driver_modbus_read_holding_array():
+async def test_plc_driver_modbus_read_holding():
     """
     Test reading data from a Modbus PLC.
     """
@@ -137,13 +200,93 @@ async def test_plc_driver_modbus_read_holding_array():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.read_request_builder() as builder:
             builder.add_item("Random Tag", "4x00001")
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"].value
-            pass
+            assert value == 874
+
+
+@pytest.mark.asyncio
+async def test_plc_driver_modbus_read_holding():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "4x00001[2]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [874, 0]
+
+
+@pytest.mark.asyncio
+async def test_plc_driver_modbus_read_holding_real():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "4x00001:REAL[2]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [PlcREAL(value=6.876641952310382e-37), 
PlcREAL(value=0.0)]
+
+
+@pytest.mark.asyncio
+async def test_plc_driver_modbus_read_holding_string_even():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "4x00041:CHAR[6]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [b'F', b'A', b'F', b'B', b'C', b'B']
+
+
+@pytest.mark.asyncio
+async def test_plc_driver_modbus_read_holding_string_odd():
+    """
+    Test reading data from a Modbus PLC.
+    """
+    log = logging.getLogger(__name__)
+
+    # Initialize the PlcDriverManager
+    driver_manager = PlcDriverManager()
+
+    # Establish a connection to the Modbus PLC
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
+        with connection.read_request_builder() as builder:
+            builder.add_item("Random Tag", "4x00041:CHAR[5]")
+            request = builder.build()
+            response = await connection.execute(request)
+            value = response.tags["Random Tag"].value
+            assert value == [b'F', b'A', b'F', b'B', b'C']
 
 
 @pytest.mark.asyncio
@@ -157,10 +300,10 @@ async def test_plc_driver_modbus_write_holding():
     driver_manager = PlcDriverManager()
 
     # Establish a connection to the Modbus PLC
-    async with driver_manager.connection("modbus://192.168.174.128:502") as 
connection:
+    async with driver_manager.connection(f"modbus://{TEST_SERVER_IP}:502") as 
connection:
         with connection.write_request_builder() as builder:
             builder.add_item("Random Tag", "4x00001", 2)
             request = builder.build()
             response = await connection.execute(request)
             value = response.tags["Random Tag"]
-            pass
\ No newline at end of file
+            pass

Reply via email to