This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch opc-fix-master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 099c42f0802f01c45c380ab7d107fcb40a981bb3 Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 20 18:00:24 2026 +0800 Debounce --- .../db/pipe/sink/protocol/opcua/OpcUaSink.java | 13 +++- .../sink/protocol/opcua/server/OpcUaNameSpace.java | 76 +++++++++++++++++----- .../protocol/opcua/server/OpcUaServerBuilder.java | 14 +++- .../pipe/config/constant/PipeSinkConstant.java | 5 ++ 4 files changed, 89 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index 532ebf60159..b5f383619e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -69,6 +69,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY; @@ -109,6 +111,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY; @@ -308,6 +311,10 @@ public class OpcUaSink implements PipeConnector { if (securityPolicies.isEmpty()) { throw new PipeException("The security policy cannot be empty."); } + final long debounceTimeMs = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY), + CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE); synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; @@ -327,7 +334,8 @@ public class OpcUaSink implements PipeConnector { .setPassword(password) .setSecurityDir(securityDir) .setEnableAnonymousAccess(enableAnonymousAccess) - .setSecurityPolicies(securityPolicies); + .setSecurityPolicies(securityPolicies) + .setDebounceTimeMs(debounceTimeMs); final OpcUaServer newServer = builder.build(); nameSpace = new OpcUaNameSpace(newServer, builder); nameSpace.startup(); @@ -341,7 +349,8 @@ public class OpcUaSink implements PipeConnector { password, securityDir, enableAnonymousAccess, - securityPolicies); + securityPolicies, + debounceTimeMs); return oldValue; } } catch (final PipeException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index e1ff701f088..d1182e59b52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -75,6 +75,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { @@ -85,9 +87,18 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { // Do not use subscription model because the original subscription model has some bugs private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new ConcurrentHashMap<>(); + // Debounce task cache: used to merge updates within a short period of time, avoiding unnecessary + // duplicate pushes + private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new ConcurrentHashMap<>(); + // Debounce interval: within 10ms, the same node is updated multiple times, and only the last one + // will be pushed (can be adjusted according to your site delay requirements, the minimum can be + // set to 1ms) + private final long debounceIntervalMs; + public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); this.builder = builder; + debounceIntervalMs = builder.getDebounceTimeMs(); getLifecycleManager() .addLifecycle( @@ -563,24 +574,51 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { */ public void notifyNodeValueChange( NodeId nodeId, DataValue newValue, UaVariableNode variableNode) { - // 1. Update the local value of the node first, to ensure that the latest value can be obtained - // directly when the client calls the Read service + // 1. Update the local cached value of the node variableNode.setValue(newValue); - // 2. Proactively push the change to all subscribed clients + // 2. If there are no subscribers, return directly without doing any extra operations List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId); - if (subscribedItems != null && !subscribedItems.isEmpty()) { - for (DataItem item : subscribedItems) { - try { - // Proactively push, the client will immediately receive the change notification, no need - // to wait for polling - item.setValue(newValue); - } catch (Exception e) { - // Single client push failure does not affect other clients, just log it - LOGGER.warn("Failed to push value change to subscription client, nodeId={}", nodeId, e); - } - } + if (subscribedItems == null || subscribedItems.isEmpty()) { + return; } + + // 2. Debounce+Async Push: Asynchronously push the expensive push operation, while merging + // high-frequency repeated updates + debounceTasks.compute( + nodeId, + (k, oldTask) -> { + // If there is already a pending push task, cancel it, we only need the latest value + if (oldTask != null && !oldTask.isDone()) { + oldTask.cancel(false); + } + + // Submit the push task to the Milo's scheduled thread pool, delay DEBOUNCE_INTERVAL_MS + // execution + return getServer() + .getScheduledExecutorService() + .schedule( + () -> { + try { + // Batch push changes to all subscribers, this time-consuming operation is put + // into the thread pool, not blocking your data update thread + for (DataItem item : subscribedItems) { + try { + item.setValue(newValue); + } catch (Exception e) { + // Single client push failure does not affect other clients + LOGGER.warn( + "Failed to push value change to client, nodeId={}", nodeId, e); + } + } + } finally { + // Task execution completed, clean up the debounce cache + debounceTasks.remove(nodeId); + } + }, + debounceIntervalMs, + TimeUnit.MILLISECONDS); + }); } @Override @@ -659,8 +697,14 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { final String password, final String securityDir, final boolean enableAnonymousAccess, - final Set<SecurityPolicy> securityPolicies) { + final Set<SecurityPolicy> securityPolicies, + final long debounceTimeMs) { builder.checkEquals( - user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies); + user, + password, + Paths.get(securityDir), + enableAnonymousAccess, + securityPolicies, + debounceTimeMs); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java index f029031b617..281d6eae77e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java @@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable { private boolean enableAnonymousAccess; private Set<SecurityPolicy> securityPolicies; private DefaultTrustListManager trustListManager; + private long debounceTimeMs; public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; @@ -123,6 +124,15 @@ public class OpcUaServerBuilder implements Closeable { return this; } + public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) { + this.debounceTimeMs = debounceTimeMs; + return this; + } + + public long getDebounceTimeMs() { + return debounceTimeMs; + } + public OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { @@ -314,7 +324,8 @@ public class OpcUaServerBuilder implements Closeable { final String password, final Path securityDir, final boolean enableAnonymousAccess, - final Set<SecurityPolicy> securityPolicies) { + final Set<SecurityPolicy> securityPolicies, + final long debounceTimeMs) { checkEquals("user", this.user, user); checkEquals("password", this.password, password); checkEquals( @@ -323,6 +334,7 @@ public class OpcUaServerBuilder implements Closeable { FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess); checkEquals("securityPolicies", this.securityPolicies, securityPolicies); + checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs); } private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 7e1e38e1490..e6f14911e66 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -242,6 +242,11 @@ public class PipeSinkConstant { "connector.opcua.timeout-seconds"; public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L; + public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY = + "connector.opcua.debounce-time-ms"; + public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = "sink.opcua.debounce-time-ms"; + public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 50L; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;
