This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 34a9db2fa7 Register all requested consumers with the opc ua subscription
34a9db2fa7 is described below

commit 34a9db2fa70b136b59fdfdefb6985b1eaae2a5fa
Author: Jonas Halvarsson <[email protected]>
AuthorDate: Mon Sep 15 17:02:33 2025 +0200

    Register all requested consumers with the opc ua subscription
    
    Signed-off-by: Jonas Halvarsson <[email protected]>
---
 .../plc4x/java/opcua/protocol/OpcuaProtocolLogic.java   |  9 +++++++++
 .../java/opcua/protocol/OpcuaSubscriptionHandle.java    | 17 ++++++++++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index c8f735fbff..41d8c94cd7 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -811,6 +811,15 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
                 long subscriptionId = response.getSubscriptionId();
                 OpcuaSubscriptionHandle handle = new 
OpcuaSubscriptionHandle(this, tm,
                     conversation, subscriptionRequest, subscriptionId, 
cycleTime);
+                if (subscriptionRequest.getConsumer() != null) {
+                    handle.register(subscriptionRequest.getConsumer());
+                }
+                subscriptionRequest.getTagNames().forEach(tagName -> {
+                    Consumer<PlcSubscriptionEvent> tagConsumer = 
subscriptionRequest.getTagConsumer(tagName);
+                    if (tagConsumer != null) {
+                        handle.registerTagConsumer(tagName, tagConsumer);
+                    }
+                });
                 subscriptions.put(handle.getSubscriptionId(), handle);
                 return handle;
             })
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
index 19e248a346..dd159ff334 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
@@ -64,6 +64,7 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
 
     private final Logger logger = 
LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);
     private final Set<Consumer<PlcSubscriptionEvent>> consumers;
+    private final Map<String, Consumer<PlcSubscriptionEvent>> tagConsumers;
     private final List<String> tagNames;
     private final Conversation conversation;
     private final PlcSubscriptionRequest subscriptionRequest;
@@ -83,6 +84,7 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
         super(plcSubscriber);
         this.tm = tm;
         this.consumers = new HashSet<>();
+        this.tagConsumers = new HashMap<>();
         this.subscriptionRequest = subscriptionRequest;
         this.tagNames = new ArrayList<>(subscriptionRequest.getTagNames());
         this.conversation = conversation;
@@ -292,8 +294,15 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
         Map<String, PlcTag> tagMap = new LinkedHashMap<>();
         for (MonitoredItemNotification value : values) {
             String tagName = tagNames.get((int) value.getClientHandle() - 1);
-            tagMap.put(tagName, subscriptionRequest.getTag(tagName).getTag());
+            PlcTag tag = subscriptionRequest.getTag(tagName).getTag();
+            tagMap.put(tagName, tag);
             dataValues.add(value.getValue());
+            Consumer<PlcSubscriptionEvent> tagConsumer = 
tagConsumers.get(tagName);
+            if (tagConsumer != null) {
+                Entry<Map<String, Metadata>, Map<String, 
PlcResponseItem<PlcValue>>> mappedResponse = 
plcSubscriber.readResponse(Map.of(tagName, tag), List.of(value.getValue()), 
responseMetadata);
+                PlcSubscriptionEvent event = new 
DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs), 
mappedResponse.getValue(), mappedResponse.getKey());
+                tagConsumer.accept(event);
+            }
         }
 
         Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>> 
mappedResponse = plcSubscriber.readResponse(tagMap, dataValues, 
responseMetadata);
@@ -346,6 +355,12 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
         return new DefaultPlcConsumerRegistration(plcSubscriber, consumer, 
this);
     }
 
+    public PlcConsumerRegistration registerTagConsumer(String tagName, 
Consumer<PlcSubscriptionEvent> consumer) {
+        logger.info("Registering a new OPCUA subscription consumer for tag 
with name " + tagName);
+        tagConsumers.put(tagName, consumer);
+        return new DefaultPlcConsumerRegistration(plcSubscriber, consumer, 
this);
+    }
+
     public Long getSubscriptionId() {
         return subscriptionId;
     }

Reply via email to