This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-many-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-many-fix by this push:
new 0a1eedad213 fix
0a1eedad213 is described below
commit 0a1eedad213fb81a3c105bfe8990257e02a455f8
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jan 26 17:11:27 2026 +0800
fix
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 2 +-
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 79 +++++++++++++++++-----
.../sink/protocol/opcua/client/ClientRunner.java | 51 +++++++++++++-
.../protocol/opcua/client/IoTDBOpcUaClient.java | 34 +++++++++-
.../config/executor/ClusterConfigTaskExecutor.java | 23 +++----
.../pipe/config/constant/PipeSinkConstant.java | 5 ++
6 files changed, 161 insertions(+), 33 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 9ac2f4e17b7..6c89dabc63a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -345,7 +345,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
+
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
- new ClientRunner(client, securityDir, password).run();
+ new ClientRunner(client, securityDir, password, userName, 10).run();
return client.getClient();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 071db688dd8..554ac3b0df4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -100,6 +100,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
@@ -118,6 +120,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
@@ -137,8 +140,11 @@ public class OpcUaSink implements PipeConnector {
private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new
ConcurrentHashMap<>();
+ private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
private String serverKey;
+ private String nodeUrl;
private boolean isClientServerModel;
private String databaseName;
private String placeHolder4NullTag;
@@ -238,8 +244,7 @@ public class OpcUaSink implements PipeConnector {
"When the OPC UA sink sets 'with-quality' to true, the table model
data is not supported.");
}
- final String nodeUrl =
- parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY);
+ nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY);
if (Objects.isNull(nodeUrl)) {
customizeServer(parameters);
} else {
@@ -247,7 +252,7 @@ public class OpcUaSink implements PipeConnector {
throw new PipeException(
"When the OPC UA sink points to an outer server, the table model
data is not supported.");
}
- customizeClient(nodeUrl, parameters);
+ customizeClient(parameters);
}
}
@@ -350,7 +355,7 @@ public class OpcUaSink implements PipeConnector {
}
}
- private void customizeClient(final String nodeUrl, final PipeParameters
parameters) {
+ private void customizeClient(final PipeParameters parameters) {
final SecurityPolicy policy =
getSecurityPolicy(
parameters
@@ -380,15 +385,39 @@ public class OpcUaSink implements PipeConnector {
+ File.separatorChar
+
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
- client =
- new IoTDBOpcUaClient(
- nodeUrl,
- policy,
- provider,
- parameters.getBooleanOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY,
SINK_OPC_UA_HISTORIZING_KEY),
- CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
- new ClientRunner(client, securityDir, password).run();
+ final long timeoutSeconds =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY,
SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
+ CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);
+
+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+ client =
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
+ .compute(
+ nodeUrl,
+ (key, oldValue) -> {
+ if (Objects.isNull(oldValue)) {
+ final IoTDBOpcUaClient result =
+ new IoTDBOpcUaClient(
+ nodeUrl,
+ policy,
+ provider,
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_HISTORIZING_KEY,
+ SINK_OPC_UA_HISTORIZING_KEY),
+ CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+ final ClientRunner runner =
+ new ClientRunner(result, securityDir, password,
userName, timeoutSeconds);
+ runner.run();
+ return new Pair<>(new AtomicInteger(0), result);
+ }
+ oldValue.getRight().checkEquals(userName, password,
securityDir, policy);
+ return oldValue;
+ })
+ .getRight();
+
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet();
+ }
}
private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
@@ -521,10 +550,6 @@ public class OpcUaSink implements PipeConnector {
@Override
public void close() throws Exception {
- if (Objects.nonNull(client)) {
- client.disconnect();
- }
-
if (serverKey == null) {
return;
}
@@ -544,6 +569,26 @@ public class OpcUaSink implements PipeConnector {
}
}
}
+
+ if (nodeUrl == null) {
+ return;
+ }
+
+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+ final Pair<AtomicInteger, IoTDBOpcUaClient> pair =
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().disconnect();
+ } finally {
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
+ }
+ }
+ }
}
/////////////////////////////// Getter ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 402091598f1..69fe16f1aaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -25,15 +25,18 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.Security;
+import java.util.Objects;
import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
@@ -49,14 +52,23 @@ public class ClientRunner {
private final IoTDBOpcUaClient configurableUaClient;
private final Path securityDir;
private final String password;
+ private final long timeoutSeconds;
+
+ // For conflict checking
+ private final String user;
public ClientRunner(
final IoTDBOpcUaClient configurableUaClient,
final String securityDir,
- final String password) {
+ final String password,
+ final String user,
+ final long timeoutSeconds) {
this.configurableUaClient = configurableUaClient;
this.securityDir = Paths.get(securityDir);
this.password = password;
+ this.user = user;
+ this.timeoutSeconds = timeoutSeconds;
+ configurableUaClient.setRunner(this);
}
private OpcUaClient createClient() throws Exception {
@@ -90,7 +102,9 @@ public class ClientRunner {
.setCertificateChain(loader.getClientCertificateChain())
.setCertificateValidator(certificateValidator)
.setIdentityProvider(configurableUaClient.getIdentityProvider())
- .setRequestTimeout(uint(5000))
+ .setRequestTimeout(uint(timeoutSeconds * 1000L))
+ .setConnectTimeout(uint(timeoutSeconds * 1000L))
+ .setMaxResponseMessageSize(uint(0))
.build());
}
@@ -109,4 +123,37 @@ public class ClientRunner {
"Error getting opc client: " + e.getClass().getSimpleName() + ": " +
e.getMessage(), e);
}
}
+
+ long getTimeoutSeconds() {
+ return timeoutSeconds;
+ }
+
+ /////////////////////////////// Conflict detection
///////////////////////////////
+
+ void checkEquals(
+ final String user,
+ final String password,
+ final Path securityDir,
+ final SecurityPolicy securityPolicy) {
+ checkEquals("user", this.user, user);
+ checkEquals("password", this.password, password);
+ checkEquals(
+ "security dir",
+
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+ checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(),
securityPolicy);
+ }
+
+ private void checkEquals(final String attrName, Object thisAttr, Object
thatAttr) {
+ if (!Objects.equals(thisAttr, thatAttr)) {
+ if (attrName.equals("password")) {
+ thisAttr = "****";
+ thatAttr = "****";
+ }
+ throw new PipeException(
+ String.format(
+ "The existing server with nodeUrl %s's %s %s conflicts to the
new %s %s, reject reusing.",
+ configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName,
thatAttr));
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index c6d8da47878..977d672df73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -34,6 +34,7 @@ import org.eclipse.milo.opcua.sdk.core.AccessLevel;
import org.eclipse.milo.opcua.sdk.core.ValueRanks;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
@@ -55,14 +56,17 @@ import
org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;
public class IoTDBOpcUaClient {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcUaNameSpace.class);
@@ -78,6 +82,7 @@ public class IoTDBOpcUaClient {
private final IdentityProvider identityProvider;
private OpcUaClient client;
private final boolean historizing;
+ private ClientRunner runner;
public IoTDBOpcUaClient(
final String nodeUrl,
@@ -93,7 +98,20 @@ public class IoTDBOpcUaClient {
public void run(final OpcUaClient client) throws Exception {
// synchronous connect
this.client = client;
- client.connect().get();
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds()
* 1000L) {
+ try {
+ client.connect().get();
+ } catch (final ExecutionException e) {
+ if (e.getCause() instanceof UaException
+ && ((UaException) e.getCause()).getStatusCode().getValue() ==
Bad_Timeout) {
+ Thread.sleep(1000L);
+ continue;
+ }
+ throw e;
+ }
+ break;
+ }
}
// Only support tree model & client-server
@@ -300,4 +318,18 @@ public class IoTDBOpcUaClient {
null // notifier
);
}
+
+ /////////////////////////////// Conflict detection
///////////////////////////////
+
+ public void setRunner(ClientRunner runner) {
+ this.runner = runner;
+ }
+
+ public void checkEquals(
+ final String user,
+ final String password,
+ final String securityDir,
+ final SecurityPolicy securityPolicy) {
+ runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index fac08cfe0e8..bf2ee3d17a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2333,9 +2333,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Construct temporary pipe static meta for validation
final String pipeName = alterPipeStatement.getPipeName();
- final Map<String, String> extractorAttributes;
+ final Map<String, String> sourceAttributes;
final Map<String, String> processorAttributes;
- final Map<String, String> connectorAttributes;
+ final Map<String, String> sinkAttributes;
try {
if (!alterPipeStatement.getSourceAttributes().isEmpty()) {
// We don't allow changing the extractor plugin type
@@ -2347,7 +2347,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new PipeParameters(alterPipeStatement.getSourceAttributes()));
}
if (alterPipeStatement.isReplaceAllSourceAttributes()) {
- extractorAttributes = alterPipeStatement.getSourceAttributes();
+ sourceAttributes = alterPipeStatement.getSourceAttributes();
} else {
final boolean onlyContainsUser =
onlyContainsUser(alterPipeStatement.getSourceAttributes());
@@ -2356,14 +2356,14 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.getSourceParameters()
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeStatement.getSourceAttributes()));
- extractorAttributes =
+ sourceAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
if (onlyContainsUser) {
- checkSourceType(alterPipeStatement.getPipeName(),
extractorAttributes);
+ checkSourceType(alterPipeStatement.getPipeName(),
sourceAttributes);
}
}
} else {
- extractorAttributes =
+ sourceAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
}
@@ -2386,7 +2386,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (!alterPipeStatement.getSinkAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllSinkAttributes()) {
- connectorAttributes = alterPipeStatement.getSinkAttributes();
+ sinkAttributes = alterPipeStatement.getSinkAttributes();
} else {
final boolean onlyContainsUser =
onlyContainsUser(alterPipeStatement.getSinkAttributes());
pipeMetaFromCoordinator
@@ -2394,19 +2394,18 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.getSinkParameters()
.addOrReplaceEquivalentAttributes(
new PipeParameters(alterPipeStatement.getSinkAttributes()));
- connectorAttributes =
+ sinkAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
if (onlyContainsUser) {
- checkSinkType(alterPipeStatement.getPipeName(),
connectorAttributes);
+ checkSinkType(alterPipeStatement.getPipeName(), sinkAttributes);
}
}
} else {
- connectorAttributes =
-
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
+ sinkAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
}
PipeDataNodeAgent.plugin()
- .validate(pipeName, extractorAttributes, processorAttributes,
connectorAttributes);
+ .validate(pipeName, sourceAttributes, processorAttributes,
sinkAttributes);
} catch (final Exception e) {
future.setException(
new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index b27bb59224d..ec9afce7c6f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -229,6 +229,11 @@ public class PipeSinkConstant {
public static final String SINK_OPC_UA_HISTORIZING_KEY =
"sink.opcua.historizing";
public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE =
false;
+ public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY =
"sink.opcua.timeout-seconds";
+ public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY =
+ "connector.opcua.timeout-seconds";
+ public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE =
10L;
+
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY =
"connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;