This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch opc-fix-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 099c42f0802f01c45c380ab7d107fcb40a981bb3
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 20 18:00:24 2026 +0800

    Debounce
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 13 +++-
 .../sink/protocol/opcua/server/OpcUaNameSpace.java | 76 +++++++++++++++++-----
 .../protocol/opcua/server/OpcUaServerBuilder.java  | 14 +++-
 .../pipe/config/constant/PipeSinkConstant.java     |  5 ++
 4 files changed, 89 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 532ebf60159..b5f383619e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -69,6 +69,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -109,6 +111,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -308,6 +311,10 @@ public class OpcUaSink implements PipeConnector {
     if (securityPolicies.isEmpty()) {
       throw new PipeException("The security policy cannot be empty.");
     }
+    final long debounceTimeMs =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, 
SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
+            CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -327,7 +334,8 @@ public class OpcUaSink implements PipeConnector {
                                 .setPassword(password)
                                 .setSecurityDir(securityDir)
                                 
.setEnableAnonymousAccess(enableAnonymousAccess)
-                                .setSecurityPolicies(securityPolicies);
+                                .setSecurityPolicies(securityPolicies)
+                                .setDebounceTimeMs(debounceTimeMs);
                         final OpcUaServer newServer = builder.build();
                         nameSpace = new OpcUaNameSpace(newServer, builder);
                         nameSpace.startup();
@@ -341,7 +349,8 @@ public class OpcUaSink implements PipeConnector {
                                 password,
                                 securityDir,
                                 enableAnonymousAccess,
-                                securityPolicies);
+                                securityPolicies,
+                                debounceTimeMs);
                         return oldValue;
                       }
                     } catch (final PipeException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index e1ff701f088..d1182e59b52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -75,6 +75,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
@@ -85,9 +87,18 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
   // Do not use subscription model because the original subscription model has 
some bugs
   private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new 
ConcurrentHashMap<>();
 
+  // Debounce task cache: used to merge updates within a short period of time, 
avoiding unnecessary
+  // duplicate pushes
+  private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new 
ConcurrentHashMap<>();
+  // Debounce interval in milliseconds: when the same node is updated multiple 
times within this
+  // interval, only the last value is pushed (configurable through the 
debounce-time-ms sink
+  // parameter to match your site's latency requirements; it can be set as low as 1ms)
+  private final long debounceIntervalMs;
+
   public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
+    debounceIntervalMs = builder.getDebounceTimeMs();
 
     getLifecycleManager()
         .addLifecycle(
@@ -563,24 +574,51 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
    */
   public void notifyNodeValueChange(
       NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
-    // 1. Update the local value of the node first, to ensure that the latest 
value can be obtained
-    // directly when the client calls the Read service
+    // 1. Update the local cached value of the node
     variableNode.setValue(newValue);
 
-    // 2. Proactively push the change to all subscribed clients
+    // 2. If there are no subscribers, return directly without doing any extra 
operations
     List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
-    if (subscribedItems != null && !subscribedItems.isEmpty()) {
-      for (DataItem item : subscribedItems) {
-        try {
-          // Proactively push, the client will immediately receive the change 
notification, no need
-          // to wait for polling
-          item.setValue(newValue);
-        } catch (Exception e) {
-          // Single client push failure does not affect other clients, just 
log it
-          LOGGER.warn("Failed to push value change to subscription client, 
nodeId={}", nodeId, e);
-        }
-      }
+    if (subscribedItems == null || subscribedItems.isEmpty()) {
+      return;
     }
+
+    // 3. Debounce + async push: run the expensive push operation 
asynchronously, while merging
+    // high-frequency repeated updates
+    debounceTasks.compute(
+        nodeId,
+        (k, oldTask) -> {
+          // If there is already a pending push task, cancel it, we only need 
the latest value
+          if (oldTask != null && !oldTask.isDone()) {
+            oldTask.cancel(false);
+          }
+
+          // Submit the push task to Milo's scheduled thread pool, to run 
after a delay of
+          // debounceIntervalMs
+          return getServer()
+              .getScheduledExecutorService()
+              .schedule(
+                  () -> {
+                    try {
+                      // Batch push changes to all subscribers, this 
time-consuming operation is put
+                      // into the thread pool, not blocking your data update 
thread
+                      for (DataItem item : subscribedItems) {
+                        try {
+                          item.setValue(newValue);
+                        } catch (Exception e) {
+                          // Single client push failure does not affect other 
clients
+                          LOGGER.warn(
+                              "Failed to push value change to client, 
nodeId={}", nodeId, e);
+                        }
+                      }
+                    } finally {
+                      // Task execution completed, clean up the debounce cache
+                      debounceTasks.remove(nodeId);
+                    }
+                  },
+                  debounceIntervalMs,
+                  TimeUnit.MILLISECONDS);
+        });
   }
 
   @Override
@@ -659,8 +697,14 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       final String password,
       final String securityDir,
       final boolean enableAnonymousAccess,
-      final Set<SecurityPolicy> securityPolicies) {
+      final Set<SecurityPolicy> securityPolicies,
+      final long debounceTimeMs) {
     builder.checkEquals(
-        user, password, Paths.get(securityDir), enableAnonymousAccess, 
securityPolicies);
+        user,
+        password,
+        Paths.get(securityDir),
+        enableAnonymousAccess,
+        securityPolicies,
+        debounceTimeMs);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index f029031b617..281d6eae77e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
   private boolean enableAnonymousAccess;
   private Set<SecurityPolicy> securityPolicies;
   private DefaultTrustListManager trustListManager;
+  private long debounceTimeMs;
 
   public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public class OpcUaServerBuilder implements Closeable {
     return this;
   }
 
+  public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
+    this.debounceTimeMs = debounceTimeMs;
+    return this;
+  }
+
+  public long getDebounceTimeMs() {
+    return debounceTimeMs;
+  }
+
   public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ public class OpcUaServerBuilder implements Closeable {
       final String password,
       final Path securityDir,
       final boolean enableAnonymousAccess,
-      final Set<SecurityPolicy> securityPolicies) {
+      final Set<SecurityPolicy> securityPolicies,
+      final long debounceTimeMs) {
     checkEquals("user", this.user, user);
     checkEquals("password", this.password, password);
     checkEquals(
@@ -323,6 +334,7 @@ public class OpcUaServerBuilder implements Closeable {
         
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
     checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, 
enableAnonymousAccess);
     checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
+    checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
   }
 
   private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 7e1e38e1490..e6f14911e66 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -242,6 +242,11 @@ public class PipeSinkConstant {
       "connector.opcua.timeout-seconds";
   public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 
10L;
 
+  public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
+      "connector.opcua.debounce-time-ms";
+  public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = 
"sink.opcua.debounce-time-ms";
+  public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 
50L;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;

Reply via email to