This is an automated email from the ASF dual-hosted git repository.
ldywicki pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 34a9db2fa7 Register all requested consumers with the opc ua
subscription
34a9db2fa7 is described below
commit 34a9db2fa70b136b59fdfdefb6985b1eaae2a5fa
Author: Jonas Halvarsson <[email protected]>
AuthorDate: Mon Sep 15 17:02:33 2025 +0200
Register all requested consumers with the opc ua subscription
Signed-off-by: Jonas Halvarsson <[email protected]>
---
.../plc4x/java/opcua/protocol/OpcuaProtocolLogic.java | 9 +++++++++
.../java/opcua/protocol/OpcuaSubscriptionHandle.java | 17 ++++++++++++++++-
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index c8f735fbff..41d8c94cd7 100644
---
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -811,6 +811,15 @@ public class OpcuaProtocolLogic extends
Plc4xProtocolBase<OpcuaAPU> implements H
long subscriptionId = response.getSubscriptionId();
OpcuaSubscriptionHandle handle = new
OpcuaSubscriptionHandle(this, tm,
conversation, subscriptionRequest, subscriptionId,
cycleTime);
+ if (subscriptionRequest.getConsumer() != null) {
+ handle.register(subscriptionRequest.getConsumer());
+ }
+ subscriptionRequest.getTagNames().forEach(tagName -> {
+ Consumer<PlcSubscriptionEvent> tagConsumer =
subscriptionRequest.getTagConsumer(tagName);
+ if (tagConsumer != null) {
+ handle.registerTagConsumer(tagName, tagConsumer);
+ }
+ });
subscriptions.put(handle.getSubscriptionId(), handle);
return handle;
})
diff --git
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
index 19e248a346..dd159ff334 100644
---
a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
+++
b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java
@@ -64,6 +64,7 @@ public class OpcuaSubscriptionHandle extends
DefaultPlcSubscriptionHandle {
private final Logger logger =
LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);
private final Set<Consumer<PlcSubscriptionEvent>> consumers;
+ private final Map<String, Consumer<PlcSubscriptionEvent>> tagConsumers;
private final List<String> tagNames;
private final Conversation conversation;
private final PlcSubscriptionRequest subscriptionRequest;
@@ -83,6 +84,7 @@ public class OpcuaSubscriptionHandle extends
DefaultPlcSubscriptionHandle {
super(plcSubscriber);
this.tm = tm;
this.consumers = new HashSet<>();
+ this.tagConsumers = new HashMap<>();
this.subscriptionRequest = subscriptionRequest;
this.tagNames = new ArrayList<>(subscriptionRequest.getTagNames());
this.conversation = conversation;
@@ -292,8 +294,15 @@ public class OpcuaSubscriptionHandle extends
DefaultPlcSubscriptionHandle {
Map<String, PlcTag> tagMap = new LinkedHashMap<>();
for (MonitoredItemNotification value : values) {
String tagName = tagNames.get((int) value.getClientHandle() - 1);
- tagMap.put(tagName, subscriptionRequest.getTag(tagName).getTag());
+ PlcTag tag = subscriptionRequest.getTag(tagName).getTag();
+ tagMap.put(tagName, tag);
dataValues.add(value.getValue());
+ Consumer<PlcSubscriptionEvent> tagConsumer =
tagConsumers.get(tagName);
+ if (tagConsumer != null) {
+ Entry<Map<String, Metadata>, Map<String,
PlcResponseItem<PlcValue>>> mappedResponse =
plcSubscriber.readResponse(Map.of(tagName, tag), List.of(value.getValue()),
responseMetadata);
+ PlcSubscriptionEvent event = new
DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(receiveTs),
mappedResponse.getValue(), mappedResponse.getKey());
+ tagConsumer.accept(event);
+ }
}
Entry<Map<String, Metadata>, Map<String, PlcResponseItem<PlcValue>>>
mappedResponse = plcSubscriber.readResponse(tagMap, dataValues,
responseMetadata);
@@ -346,6 +355,12 @@ public class OpcuaSubscriptionHandle extends
DefaultPlcSubscriptionHandle {
return new DefaultPlcConsumerRegistration(plcSubscriber, consumer,
this);
}
+ public PlcConsumerRegistration registerTagConsumer(String tagName,
Consumer<PlcSubscriptionEvent> consumer) {
+ logger.info("Registering a new OPCUA subscription consumer for tag
with name " + tagName);
+ tagConsumers.put(tagName, consumer);
+ return new DefaultPlcConsumerRegistration(plcSubscriber, consumer,
this);
+ }
+
public Long getSubscriptionId() {
return subscriptionId;
}