This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e06946fac0 Pipe: Rewrote the OPC UA subscription logic to avoid the 
bug of third-party subscription model (#17525)
4e06946fac0 is described below

commit 4e06946fac0cc54d47f5487b2a3b08180f46f2be
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 21 16:46:06 2026 +0800

    Pipe: Rewrote the OPC UA subscription logic to avoid the bug of third-party 
subscription model (#17525)
    
    * complete
    
    * Debounce
    
    * if
---
 .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java |   4 +-
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |  13 +-
 .../sink/protocol/opcua/server/OpcUaNameSpace.java | 156 +++++++++++++++++++--
 .../protocol/opcua/server/OpcUaServerBuilder.java  |  14 +-
 .../pipe/config/constant/PipeSinkConstant.java     |   5 +
 5 files changed, 174 insertions(+), 18 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 2ebad93348c..5be609aba2f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -111,8 +111,8 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualTreeModelAutoIT {
     TestUtils.executeNonQueries(
         senderEnv,
         Arrays.asList(
-            "drop pipe a2b_history",
-            "drop pipe a2b_realtime",
+            "drop pipe if exists a2b_history",
+            "drop pipe if exists a2b_realtime",
             String.format(
                 "create pipe a2b1 with source ('inclusion'='schema') with sink 
('node-urls'='%s')",
                 receiverDataNode.getIpAndPortString()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 532ebf60159..b5f383619e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -69,6 +69,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -109,6 +111,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -308,6 +311,10 @@ public class OpcUaSink implements PipeConnector {
     if (securityPolicies.isEmpty()) {
       throw new PipeException("The security policy cannot be empty.");
     }
+    final long debounceTimeMs =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, 
SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
+            CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -327,7 +334,8 @@ public class OpcUaSink implements PipeConnector {
                                 .setPassword(password)
                                 .setSecurityDir(securityDir)
                                 
.setEnableAnonymousAccess(enableAnonymousAccess)
-                                .setSecurityPolicies(securityPolicies);
+                                .setSecurityPolicies(securityPolicies)
+                                .setDebounceTimeMs(debounceTimeMs);
                         final OpcUaServer newServer = builder.build();
                         nameSpace = new OpcUaNameSpace(newServer, builder);
                         nameSpace.startup();
@@ -341,7 +349,8 @@ public class OpcUaSink implements PipeConnector {
                                 password,
                                 securityDir,
                                 enableAnonymousAccess,
-                                securityPolicies);
+                                securityPolicies,
+                                debounceTimeMs);
                         return oldValue;
                       }
                     } catch (final PipeException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 3c79a3aa304..d1182e59b52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -47,7 +47,7 @@ import 
org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
-import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -57,6 +57,7 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,20 +72,34 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
-  private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
+  // Do not use subscription model because the original subscription model has 
some bugs
+  private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new 
ConcurrentHashMap<>();
+
+  // Debounce task cache: used to merge updates within a short period of time, 
avoiding unnecessary
+  // duplicate pushes
+  private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new 
ConcurrentHashMap<>();
+  // Debounce interval: within 10ms, the same node is updated multiple times, 
and only the last one
+  // will be pushed (can be adjusted according to your site delay 
requirements, the minimum can be
+  // set to 1ms)
+  private final long debounceIntervalMs;
+
   public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
+    debounceIntervalMs = builder.getDebounceTimeMs();
 
-    subscriptionModel = new SubscriptionModel(server, this);
-    getLifecycleManager().addLifecycle(subscriptionModel);
     getLifecycleManager()
         .addLifecycle(
             new Lifecycle() {
@@ -291,7 +306,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         if (Objects.isNull(measurementNode.getValue())
             || Objects.isNull(measurementNode.getValue().getSourceTime())
             || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
-          measurementNode.setValue(dataValue);
+          notifyNodeValueChange(measurementNode.getNodeId(), dataValue, 
measurementNode);
         }
       } else {
         value = values.get(i);
@@ -311,9 +326,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       if (Objects.isNull(valueNode.getValue())
           || Objects.isNull(valueNode.getValue().getSourceTime())
           || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
-        valueNode.setValue(
+        notifyNodeValueChange(
+            valueNode.getNodeId(),
             new DataValue(
-                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
+                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()),
+            valueNode);
       }
     }
   }
@@ -546,24 +563,131 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
+  /**
+   * On point value changing, notify all subscribed clients proactively
+   *
+   * @param nodeId NodeId of the changing node
+   * @param newValue New value of the node (DataValue object containing value, 
status code, and
+   *     timestamp)
+   * @param variableNode Corresponding UaVariableNode instance, used to update 
the local cached
+   *     value of the node
+   */
+  public void notifyNodeValueChange(
+      NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
+    // 1. Update the local cached value of the node
+    variableNode.setValue(newValue);
+
+    // 2. If there are no subscribers, return directly without doing any extra 
operations
+    List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
+    if (subscribedItems == null || subscribedItems.isEmpty()) {
+      return;
+    }
+
+    // 2. Debounce+Async Push: Asynchronously push the expensive push 
operation, while merging
+    // high-frequency repeated updates
+    debounceTasks.compute(
+        nodeId,
+        (k, oldTask) -> {
+          // If there is already a pending push task, cancel it, we only need 
the latest value
+          if (oldTask != null && !oldTask.isDone()) {
+            oldTask.cancel(false);
+          }
+
+          // Submit the push task to the Milo's scheduled thread pool, delay 
DEBOUNCE_INTERVAL_MS
+          // execution
+          return getServer()
+              .getScheduledExecutorService()
+              .schedule(
+                  () -> {
+                    try {
+                      // Batch push changes to all subscribers, this 
time-consuming operation is put
+                      // into the thread pool, not blocking your data update 
thread
+                      for (DataItem item : subscribedItems) {
+                        try {
+                          item.setValue(newValue);
+                        } catch (Exception e) {
+                          // Single client push failure does not affect other 
clients
+                          LOGGER.warn(
+                              "Failed to push value change to client, 
nodeId={}", nodeId, e);
+                        }
+                      }
+                    } finally {
+                      // Task execution completed, clean up the debounce cache
+                      debounceTasks.remove(nodeId);
+                    }
+                  },
+                  debounceIntervalMs,
+                  TimeUnit.MILLISECONDS);
+        });
+  }
+
   @Override
   public void onDataItemsCreated(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsCreated(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      // Only handle Value attribute subscription (align with the original 
SubscriptionModel logic,
+      // ignore other attribute subscriptions)
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // 1. Add the new subscription item to the subscription mapping
+      nodeSubscriptions.compute(
+          nodeId,
+          (k, existingList) -> {
+            List<DataItem> list =
+                existingList != null ? existingList : new 
CopyOnWriteArrayList<>();
+            list.add(item);
+            return list;
+          });
+
+      // 2. 【Key Optimization】Proactively push the current node's initial 
value when the new
+      // subscription item is created
+      // Eliminate Bad_WaitingForInitialData, no need to wait for any polling
+      try {
+        UaVariableNode node = (UaVariableNode) 
getNodeManager().getNode(nodeId).orElse(null);
+        if (node != null && node.getValue() != null) {
+          // Immediately push the current value to the new subscriber, the 
client will instantly be
+          // able to get the initial data
+          item.setValue(node.getValue());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Failed to send initial value to new subscription, 
nodeId={}", nodeId, e);
+      }
+    }
   }
 
   @Override
   public void onDataItemsModified(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsModified(dataItems);
+    // Push mode, client modifies subscription parameters (e.g. sampling 
interval) has no effect on
+    // our active push, no additional processing is needed
   }
 
   @Override
   public void onDataItemsDeleted(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsDeleted(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // When the client cancels the subscription, remove this subscription 
item from the mapping
+      nodeSubscriptions.computeIfPresent(
+          nodeId,
+          (k, existingList) -> {
+            existingList.remove(item);
+            // Automatically clean up the key when there are no subscribers, 
save memory
+            return existingList.isEmpty() ? null : existingList;
+          });
+    }
   }
 
   @Override
   public void onMonitoringModeChanged(final List<MonitoredItem> 
monitoredItems) {
-    subscriptionModel.onMonitoringModeChanged(monitoredItems);
+    // Push mode, monitoring mode change has no effect on active push, no 
additional processing is
+    // needed
   }
 
   /////////////////////////////// Conflict detection 
///////////////////////////////
@@ -573,8 +697,14 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       final String password,
       final String securityDir,
       final boolean enableAnonymousAccess,
-      final Set<SecurityPolicy> securityPolicies) {
+      final Set<SecurityPolicy> securityPolicies,
+      final long debounceTimeMs) {
     builder.checkEquals(
-        user, password, Paths.get(securityDir), enableAnonymousAccess, 
securityPolicies);
+        user,
+        password,
+        Paths.get(securityDir),
+        enableAnonymousAccess,
+        securityPolicies,
+        debounceTimeMs);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index f029031b617..281d6eae77e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
   private boolean enableAnonymousAccess;
   private Set<SecurityPolicy> securityPolicies;
   private DefaultTrustListManager trustListManager;
+  private long debounceTimeMs;
 
   public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public class OpcUaServerBuilder implements Closeable {
     return this;
   }
 
+  public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
+    this.debounceTimeMs = debounceTimeMs;
+    return this;
+  }
+
+  public long getDebounceTimeMs() {
+    return debounceTimeMs;
+  }
+
   public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ public class OpcUaServerBuilder implements Closeable {
       final String password,
       final Path securityDir,
       final boolean enableAnonymousAccess,
-      final Set<SecurityPolicy> securityPolicies) {
+      final Set<SecurityPolicy> securityPolicies,
+      final long debounceTimeMs) {
     checkEquals("user", this.user, user);
     checkEquals("password", this.password, password);
     checkEquals(
@@ -323,6 +334,7 @@ public class OpcUaServerBuilder implements Closeable {
         
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
     checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, 
enableAnonymousAccess);
     checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
+    checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
   }
 
   private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 7e1e38e1490..e6f14911e66 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -242,6 +242,11 @@ public class PipeSinkConstant {
       "connector.opcua.timeout-seconds";
   public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 
10L;
 
+  public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
+      "connector.opcua.debounce-time-ms";
+  public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = 
"sink.opcua.debounce-time-ms";
+  public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 
50L;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;

Reply via email to