This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch opc-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-fix by this push:
new b0b689c317f Debounce
b0b689c317f is described below
commit b0b689c317f1a0b8f591eb009f6a94eefee21c41
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 20 18:00:24 2026 +0800
Debounce
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 13 +++-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 76 +++++++++++++++++-----
.../protocol/opcua/server/OpcUaServerBuilder.java | 14 +++-
.../pipe/config/constant/PipeSinkConstant.java | 5 ++
4 files changed, 89 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index e1985061224..79a5ecb434e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -63,6 +63,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -101,6 +103,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -278,6 +281,10 @@ public class OpcUaSink implements PipeConnector {
if (securityPolicies.isEmpty()) {
throw new PipeException("The security policy cannot be empty.");
}
+ final long debounceTimeMs =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY,
SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
+ CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -297,7 +304,8 @@ public class OpcUaSink implements PipeConnector {
.setPassword(password)
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess)
- .setSecurityPolicies(securityPolicies);
+ .setSecurityPolicies(securityPolicies)
+ .setDebounceTimeMs(debounceTimeMs);
final OpcUaServer newServer = builder.build();
nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
@@ -311,7 +319,8 @@ public class OpcUaSink implements PipeConnector {
password,
securityDir,
enableAnonymousAccess,
- securityPolicies);
+ securityPolicies,
+ debounceTimeMs);
return oldValue;
}
} catch (final PipeException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 4a1a3aead1f..3a42ec87969 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -72,6 +72,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcUaNameSpace.class);
@@ -81,9 +83,18 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
// Do not use subscription model because the original subscription model has
some bugs
private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new
ConcurrentHashMap<>();
+ // Debounce task cache: used to merge updates within a short period of time,
avoiding unnecessary
+ // duplicate pushes
+ private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new
ConcurrentHashMap<>();
+ // Debounce interval: within 10ms, the same node is updated multiple times,
and only the last one
+ // will be pushed (can be adjusted according to your site delay
requirements, the minimum can be
+ // set to 1ms)
+ private final long debounceIntervalMs;
+
public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder
builder) {
super(server, NAMESPACE_URI);
this.builder = builder;
+ debounceIntervalMs = builder.getDebounceTimeMs();
getLifecycleManager()
.addLifecycle(
@@ -468,24 +479,51 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
*/
public void notifyNodeValueChange(
NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
- // 1. Update the local value of the node first, to ensure that the latest
value can be obtained
- // directly when the client calls the Read service
+ // 1. Update the local cached value of the node
variableNode.setValue(newValue);
- // 2. Proactively push the change to all subscribed clients
+ // 2. If there are no subscribers, return directly without doing any extra
operations
List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
- if (subscribedItems != null && !subscribedItems.isEmpty()) {
- for (DataItem item : subscribedItems) {
- try {
- // Proactively push, the client will immediately receive the change
notification, no need
- // to wait for polling
- item.setValue(newValue);
- } catch (Exception e) {
- // Single client push failure does not affect other clients, just
log it
- LOGGER.warn("Failed to push value change to subscription client,
nodeId={}", nodeId, e);
- }
- }
+ if (subscribedItems == null || subscribedItems.isEmpty()) {
+ return;
}
+
+ // 2. Debounce+Async Push: Asynchronously push the expensive push
operation, while merging
+ // high-frequency repeated updates
+ debounceTasks.compute(
+ nodeId,
+ (k, oldTask) -> {
+ // If there is already a pending push task, cancel it, we only need
the latest value
+ if (oldTask != null && !oldTask.isDone()) {
+ oldTask.cancel(false);
+ }
+
+ // Submit the push task to the Milo's scheduled thread pool, delay
DEBOUNCE_INTERVAL_MS
+ // execution
+ return getServer()
+ .getScheduledExecutorService()
+ .schedule(
+ () -> {
+ try {
+ // Batch push changes to all subscribers, this
time-consuming operation is put
+ // into the thread pool, not blocking your data update
thread
+ for (DataItem item : subscribedItems) {
+ try {
+ item.setValue(newValue);
+ } catch (Exception e) {
+ // Single client push failure does not affect other
clients
+ LOGGER.warn(
+ "Failed to push value change to client,
nodeId={}", nodeId, e);
+ }
+ }
+ } finally {
+ // Task execution completed, clean up the debounce cache
+ debounceTasks.remove(nodeId);
+ }
+ },
+ debounceIntervalMs,
+ TimeUnit.MILLISECONDS);
+ });
}
@Override
@@ -564,8 +602,14 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
final String password,
final String securityDir,
final boolean enableAnonymousAccess,
- final Set<SecurityPolicy> securityPolicies) {
+ final Set<SecurityPolicy> securityPolicies,
+ final long debounceTimeMs) {
builder.checkEquals(
- user, password, Paths.get(securityDir), enableAnonymousAccess,
securityPolicies);
+ user,
+ password,
+ Paths.get(securityDir),
+ enableAnonymousAccess,
+ securityPolicies,
+ debounceTimeMs);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index f029031b617..281d6eae77e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
private boolean enableAnonymousAccess;
private Set<SecurityPolicy> securityPolicies;
private DefaultTrustListManager trustListManager;
+ private long debounceTimeMs;
public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public class OpcUaServerBuilder implements Closeable {
return this;
}
+ public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
+ this.debounceTimeMs = debounceTimeMs;
+ return this;
+ }
+
+ public long getDebounceTimeMs() {
+ return debounceTimeMs;
+ }
+
public OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ public class OpcUaServerBuilder implements Closeable {
final String password,
final Path securityDir,
final boolean enableAnonymousAccess,
- final Set<SecurityPolicy> securityPolicies) {
+ final Set<SecurityPolicy> securityPolicies,
+ final long debounceTimeMs) {
checkEquals("user", this.user, user);
checkEquals("password", this.password, password);
checkEquals(
@@ -323,6 +334,7 @@ public class OpcUaServerBuilder implements Closeable {
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess,
enableAnonymousAccess);
checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
+ checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
}
private void checkEquals(final String attrName, Object thisAttr, Object
thatAttr) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 2eaf6f903de..e04f2578daa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -231,6 +231,11 @@ public class PipeSinkConstant {
"connector.opcua.timeout-seconds";
public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE =
10L;
+ public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
+ "connector.opcua.debounce-time-ms";
+ public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY =
"sink.opcua.debounce-time-ms";
+ public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE =
50L;
+
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY =
"connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;