This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e06946fac0 Pipe: Rewrote the OPC UA subscription logic to avoid the
bug of third-party subscription model (#17525)
4e06946fac0 is described below
commit 4e06946fac0cc54d47f5487b2a3b08180f46f2be
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 21 16:46:06 2026 +0800
Pipe: Rewrote the OPC UA subscription logic to avoid the bug of third-party
subscription model (#17525)
* complete
* Debounce
* if
---
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 4 +-
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 13 +-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 156 +++++++++++++++++++--
.../protocol/opcua/server/OpcUaServerBuilder.java | 14 +-
.../pipe/config/constant/PipeSinkConstant.java | 5 +
5 files changed, 174 insertions(+), 18 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 2ebad93348c..5be609aba2f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -111,8 +111,8 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
- "drop pipe a2b_history",
- "drop pipe a2b_realtime",
+ "drop pipe if exists a2b_history",
+ "drop pipe if exists a2b_realtime",
String.format(
"create pipe a2b1 with source ('inclusion'='schema') with sink
('node-urls'='%s')",
receiverDataNode.getIpAndPortString()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 532ebf60159..b5f383619e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -69,6 +69,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -109,6 +111,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -308,6 +311,10 @@ public class OpcUaSink implements PipeConnector {
if (securityPolicies.isEmpty()) {
throw new PipeException("The security policy cannot be empty.");
}
+ final long debounceTimeMs =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY,
SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
+ CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -327,7 +334,8 @@ public class OpcUaSink implements PipeConnector {
.setPassword(password)
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess)
- .setSecurityPolicies(securityPolicies);
+ .setSecurityPolicies(securityPolicies)
+ .setDebounceTimeMs(debounceTimeMs);
final OpcUaServer newServer = builder.build();
nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
@@ -341,7 +349,8 @@ public class OpcUaSink implements PipeConnector {
password,
securityDir,
enableAnonymousAccess,
- securityPolicies);
+ securityPolicies,
+ debounceTimeMs);
return oldValue;
}
} catch (final PipeException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 3c79a3aa304..d1182e59b52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -47,7 +47,7 @@ import
org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
-import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -57,6 +57,7 @@ import
org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,20 +72,34 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcUaNameSpace.class);
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
- private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;
+ // Do not use subscription model because the original subscription model has
some bugs
+ private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new
ConcurrentHashMap<>();
+
+ // Debounce task cache: used to merge updates within a short period of time,
avoiding unnecessary
+ // duplicate pushes
+ private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new
ConcurrentHashMap<>();
+ // Debounce interval in milliseconds: if the same node is updated multiple times
within this interval,
+ // only the last value is pushed. The value is taken from the builder's debounce
time (it can be adjusted
+ // according to the site's delay requirements; the minimum can be set to 1ms)
+ private final long debounceIntervalMs;
+
public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder
builder) {
super(server, NAMESPACE_URI);
this.builder = builder;
+ debounceIntervalMs = builder.getDebounceTimeMs();
- subscriptionModel = new SubscriptionModel(server, this);
- getLifecycleManager().addLifecycle(subscriptionModel);
getLifecycleManager()
.addLifecycle(
new Lifecycle() {
@@ -291,7 +306,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
if (Objects.isNull(measurementNode.getValue())
|| Objects.isNull(measurementNode.getValue().getSourceTime())
|| measurementNode.getValue().getSourceTime().getUtcTime() <
utcTimestamp) {
- measurementNode.setValue(dataValue);
+ notifyNodeValueChange(measurementNode.getNodeId(), dataValue,
measurementNode);
}
} else {
value = values.get(i);
@@ -311,9 +326,11 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
if (Objects.isNull(valueNode.getValue())
|| Objects.isNull(valueNode.getValue().getSourceTime())
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
- valueNode.setValue(
+ notifyNodeValueChange(
+ valueNode.getNodeId(),
new DataValue(
- new Variant(value), currentQuality, new DateTime(timestamp),
new DateTime()));
+ new Variant(value), currentQuality, new DateTime(timestamp),
new DateTime()),
+ valueNode);
}
}
}
@@ -546,24 +563,131 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
}
}
+ /**
+ * When a point's value changes, proactively notify all subscribed clients.
+ *
+ * @param nodeId NodeId of the changing node
+ * @param newValue New value of the node (DataValue object containing value,
status code, and
+ * timestamp)
+ * @param variableNode Corresponding UaVariableNode instance, used to update
the local cached
+ * value of the node
+ */
+ public void notifyNodeValueChange(
+ NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
+ // 1. Update the local cached value of the node
+ variableNode.setValue(newValue);
+
+ // 2. If there are no subscribers, return directly without doing any extra
operations
+ List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
+ if (subscribedItems == null || subscribedItems.isEmpty()) {
+ return;
+ }
+
+ // 3. Debounce + async push: run the expensive push operation
asynchronously, while merging
+ // high-frequency repeated updates
+ debounceTasks.compute(
+ nodeId,
+ (k, oldTask) -> {
+ // If there is already a pending push task, cancel it, we only need
the latest value
+ if (oldTask != null && !oldTask.isDone()) {
+ oldTask.cancel(false);
+ }
+
+ // Submit the push task to Milo's scheduled thread pool, delaying
execution by
+ // debounceIntervalMs milliseconds
+ return getServer()
+ .getScheduledExecutorService()
+ .schedule(
+ () -> {
+ try {
+ // Push the change to all subscribers in a batch; this
time-consuming operation runs
+ // in the thread pool so that it does not block the data update
thread
+ for (DataItem item : subscribedItems) {
+ try {
+ item.setValue(newValue);
+ } catch (Exception e) {
+ // Single client push failure does not affect other
clients
+ LOGGER.warn(
+ "Failed to push value change to client,
nodeId={}", nodeId, e);
+ }
+ }
+ } finally {
+ // Task execution completed, clean up the debounce cache
+ debounceTasks.remove(nodeId);
+ }
+ },
+ debounceIntervalMs,
+ TimeUnit.MILLISECONDS);
+ });
+ }
+
@Override
public void onDataItemsCreated(final List<DataItem> dataItems) {
- subscriptionModel.onDataItemsCreated(dataItems);
+ for (DataItem item : dataItems) {
+ final ReadValueId readValueId = item.getReadValueId();
+ // Only handle Value attribute subscription (align with the original
SubscriptionModel logic,
+ // ignore other attribute subscriptions)
+ if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+ continue;
+ }
+ final NodeId nodeId = readValueId.getNodeId();
+
+ // 1. Add the new subscription item to the subscription mapping
+ nodeSubscriptions.compute(
+ nodeId,
+ (k, existingList) -> {
+ List<DataItem> list =
+ existingList != null ? existingList : new
CopyOnWriteArrayList<>();
+ list.add(item);
+ return list;
+ });
+
+ // 2. [Key optimization] Proactively push the current node's initial
value when the new
+ // subscription item is created.
+ // This eliminates Bad_WaitingForInitialData; there is no need to wait for any polling
+ try {
+ UaVariableNode node = (UaVariableNode)
getNodeManager().getNode(nodeId).orElse(null);
+ if (node != null && node.getValue() != null) {
+ // Immediately push the current value to the new subscriber, the
client will instantly be
+ // able to get the initial data
+ item.setValue(node.getValue());
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to send initial value to new subscription,
nodeId={}", nodeId, e);
+ }
+ }
}
@Override
public void onDataItemsModified(final List<DataItem> dataItems) {
- subscriptionModel.onDataItemsModified(dataItems);
+ // In push mode, a client modifying subscription parameters (e.g. the sampling
interval) has no effect on
+ // our active push, so no additional processing is needed
}
@Override
public void onDataItemsDeleted(final List<DataItem> dataItems) {
- subscriptionModel.onDataItemsDeleted(dataItems);
+ for (DataItem item : dataItems) {
+ final ReadValueId readValueId = item.getReadValueId();
+ if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+ continue;
+ }
+ final NodeId nodeId = readValueId.getNodeId();
+
+ // When the client cancels the subscription, remove this subscription
item from the mapping
+ nodeSubscriptions.computeIfPresent(
+ nodeId,
+ (k, existingList) -> {
+ existingList.remove(item);
+ // Automatically clean up the key when there are no subscribers,
save memory
+ return existingList.isEmpty() ? null : existingList;
+ });
+ }
}
@Override
public void onMonitoringModeChanged(final List<MonitoredItem>
monitoredItems) {
- subscriptionModel.onMonitoringModeChanged(monitoredItems);
+ // In push mode, a monitoring mode change has no effect on active push, so no
additional processing is
+ // needed
}
/////////////////////////////// Conflict detection
///////////////////////////////
@@ -573,8 +697,14 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
final String password,
final String securityDir,
final boolean enableAnonymousAccess,
- final Set<SecurityPolicy> securityPolicies) {
+ final Set<SecurityPolicy> securityPolicies,
+ final long debounceTimeMs) {
builder.checkEquals(
- user, password, Paths.get(securityDir), enableAnonymousAccess,
securityPolicies);
+ user,
+ password,
+ Paths.get(securityDir),
+ enableAnonymousAccess,
+ securityPolicies,
+ debounceTimeMs);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index f029031b617..281d6eae77e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
private boolean enableAnonymousAccess;
private Set<SecurityPolicy> securityPolicies;
private DefaultTrustListManager trustListManager;
+ private long debounceTimeMs;
public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public class OpcUaServerBuilder implements Closeable {
return this;
}
+ public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
+ this.debounceTimeMs = debounceTimeMs;
+ return this;
+ }
+
+ public long getDebounceTimeMs() {
+ return debounceTimeMs;
+ }
+
public OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ public class OpcUaServerBuilder implements Closeable {
final String password,
final Path securityDir,
final boolean enableAnonymousAccess,
- final Set<SecurityPolicy> securityPolicies) {
+ final Set<SecurityPolicy> securityPolicies,
+ final long debounceTimeMs) {
checkEquals("user", this.user, user);
checkEquals("password", this.password, password);
checkEquals(
@@ -323,6 +334,7 @@ public class OpcUaServerBuilder implements Closeable {
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess,
enableAnonymousAccess);
checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
+ checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
}
private void checkEquals(final String attrName, Object thisAttr, Object
thatAttr) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 7e1e38e1490..e6f14911e66 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -242,6 +242,11 @@ public class PipeSinkConstant {
"connector.opcua.timeout-seconds";
public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE =
10L;
+ public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
+ "connector.opcua.debounce-time-ms";
+ public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY =
"sink.opcua.debounce-time-ms";
+ public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE =
50L;
+
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY =
"connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;