This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch cp-opc-client
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f69a7398776259a881b77687af54c1be67afa02
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 28 10:16:54 2026 +0800

    Pipe: Fixed the OPC UA client connection problem (#17083)
    
    * fix
    
    * IT
    
    (cherry picked from commit 82f7ca6dfc54435c8ead7327669c888b1edbeeba)
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 32 +++++++--
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 79 +++++++++++++++++-----
 .../sink/protocol/opcua/client/ClientRunner.java   | 51 +++++++++++++-
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 34 +++++++++-
 .../pipe/config/constant/PipeSinkConstant.java     |  5 ++
 5 files changed, 177 insertions(+), 24 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index a5443561ddc..599a43476c1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -120,7 +120,11 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT 
{
           env,
           Arrays.asList(
               "create aligned timeSeries root.db.opc(value double, quality 
boolean, other int32)",
-              "insert into root.db.opc(time, value, quality, other) values (0, 
0, true, 1)"),
+              "create aligned timeSeries root.db.opc1(value double, quality 
boolean, other int32)",
+              "create aligned timeSeries root.db.opc2(value double, quality 
boolean, other int32)",
+              "insert into root.db.opc(time, value, quality, other) values (0, 
0, true, 1)",
+              "insert into root.db.opc1(time, value, quality, other) values 
(0, 0, true, 1)",
+              "insert into root.db.opc2(time, value, quality, other) values 
(0, 0, true, 1)"),
           null);
 
       while (true) {
@@ -156,9 +160,13 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT 
{
         break;
       }
 
-      TestUtils.executeNonQuery(
+      // Test multiple regions
+      TestUtils.executeNonQueries(
           env,
-          "insert into root.db.opc(time, value, quality, other) values (1, 1, 
false, 1)",
+          Arrays.asList(
+              "insert into root.db.opc(time, value, quality, other) values (1, 
1, false, 1)",
+              "insert into root.db.opc1(time, value, quality, other) values 
(1, 1, false, 1)",
+              "insert into root.db.opc2(time, value, quality, other) values 
(1, 1, false, 1)"),
           null);
 
       long startTime = System.currentTimeMillis();
@@ -169,6 +177,22 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT 
{
           Assert.assertEquals(new Variant(1.0), value.getValue());
           Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
           Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+
+          value =
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc1"))
+                  .get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+
+          value =
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc2"))
+                  .get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
           break;
         } catch (final Throwable t) {
           if (System.currentTimeMillis() - startTime > 10_000L) {
@@ -269,7 +293,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
             + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
 
     client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
-    new ClientRunner(client, securityDir, password).run();
+    new ClientRunner(client, securityDir, password, userName, 10).run();
     return client.getClient();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 6d44322a3d5..e1985061224 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -92,6 +92,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
@@ -109,6 +111,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
 
@@ -126,8 +129,11 @@ public class OpcUaSink implements PipeConnector {
 
   private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
       SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
+  private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
+      CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
 
   private String serverKey;
+  private String nodeUrl;
   private boolean isClientServerModel;
   private @Nullable String valueName;
   private @Nullable String qualityName;
@@ -212,12 +218,11 @@ public class OpcUaSink implements PipeConnector {
                 CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
             .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
 
-    final String nodeUrl =
-        parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
+    nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
     if (Objects.isNull(nodeUrl)) {
       customizeServer(parameters);
     } else {
-      customizeClient(nodeUrl, parameters);
+      customizeClient(parameters);
     }
   }
 
@@ -320,7 +325,7 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
-  private void customizeClient(final String nodeUrl, final PipeParameters 
parameters) {
+  private void customizeClient(final PipeParameters parameters) {
     final SecurityPolicy policy =
         getSecurityPolicy(
             parameters
@@ -350,15 +355,39 @@ public class OpcUaSink implements PipeConnector {
                     + File.separatorChar
                     + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
 
-    client =
-        new IoTDBOpcUaClient(
-            nodeUrl,
-            policy,
-            provider,
-            parameters.getBooleanOrDefault(
-                Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, 
SINK_OPC_UA_HISTORIZING_KEY),
-                CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
-    new ClientRunner(client, securityDir, password).run();
+    final long timeoutSeconds =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, 
SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
+            CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      client =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
+              .compute(
+                  nodeUrl,
+                  (key, oldValue) -> {
+                    if (Objects.isNull(oldValue)) {
+                      final IoTDBOpcUaClient result =
+                          new IoTDBOpcUaClient(
+                              nodeUrl,
+                              policy,
+                              provider,
+                              parameters.getBooleanOrDefault(
+                                  Arrays.asList(
+                                      CONNECTOR_OPC_UA_HISTORIZING_KEY,
+                                      SINK_OPC_UA_HISTORIZING_KEY),
+                                  CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+                      final ClientRunner runner =
+                          new ClientRunner(result, securityDir, password, 
userName, timeoutSeconds);
+                      runner.run();
+                      return new Pair<>(new AtomicInteger(0), result);
+                    }
+                    oldValue.getRight().checkEquals(userName, password, 
securityDir, policy);
+                    return oldValue;
+                  })
+              .getRight();
+      
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet();
+    }
   }
 
   private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
@@ -489,10 +518,6 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    if (Objects.nonNull(client)) {
-      client.disconnect();
-    }
-
     if (serverKey == null) {
       return;
     }
@@ -512,6 +537,26 @@ public class OpcUaSink implements PipeConnector {
         }
       }
     }
+
+    if (nodeUrl == null) {
+      return;
+    }
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      final Pair<AtomicInteger, IoTDBOpcUaClient> pair =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().disconnect();
+        } finally {
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
+        }
+      }
+    }
   }
 
   /////////////////////////////// Getter ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 725ecbbae98..9e3b46d463a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -25,15 +25,18 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
 import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.Security;
+import java.util.Objects;
 
 import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
 
@@ -49,14 +52,23 @@ public class ClientRunner {
   private final IoTDBOpcUaClient configurableUaClient;
   private final Path securityDir;
   private final String password;
+  private final long timeoutSeconds;
+
+  // For conflict checking
+  private final String user;
 
   public ClientRunner(
       final IoTDBOpcUaClient configurableUaClient,
       final String securityDir,
-      final String password) {
+      final String password,
+      final String user,
+      final long timeoutSeconds) {
     this.configurableUaClient = configurableUaClient;
     this.securityDir = Paths.get(securityDir);
     this.password = password;
+    this.user = user;
+    this.timeoutSeconds = timeoutSeconds;
+    configurableUaClient.setRunner(this);
   }
 
   private OpcUaClient createClient() throws Exception {
@@ -90,7 +102,9 @@ public class ClientRunner {
                 .setCertificateChain(loader.getClientCertificateChain())
                 .setCertificateValidator(certificateValidator)
                 
.setIdentityProvider(configurableUaClient.getIdentityProvider())
-                .setRequestTimeout(uint(5000))
+                .setRequestTimeout(uint(timeoutSeconds * 1000L))
+                .setConnectTimeout(uint(timeoutSeconds * 1000L))
+                .setMaxResponseMessageSize(uint(0))
                 .build());
   }
 
@@ -109,4 +123,37 @@ public class ClientRunner {
           "Error getting opc client: " + e.getClass().getSimpleName() + ": " + 
e.getMessage());
     }
   }
+
+  long getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  void checkEquals(
+      final String user,
+      final String password,
+      final Path securityDir,
+      final SecurityPolicy securityPolicy) {
+    checkEquals("user", this.user, user);
+    checkEquals("password", this.password, password);
+    checkEquals(
+        "security dir",
+        
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+        
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+    checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), 
securityPolicy);
+  }
+
+  private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
+    if (!Objects.equals(thisAttr, thatAttr)) {
+      if (attrName.equals("password")) {
+        thisAttr = "****";
+        thatAttr = "****";
+      }
+      throw new PipeException(
+          String.format(
+              "The existing server with nodeUrl %s's %s %s conflicts to the 
new %s %s, reject reusing.",
+              configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, 
thatAttr));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index ce63081c71a..2019c0fe833 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -34,6 +34,7 @@ import org.eclipse.milo.opcua.sdk.core.AccessLevel;
 import org.eclipse.milo.opcua.sdk.core.ValueRanks;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
@@ -55,14 +56,17 @@ import 
org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Predicate;
 
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;
 
 public class IoTDBOpcUaClient {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
@@ -78,6 +82,7 @@ public class IoTDBOpcUaClient {
   private final IdentityProvider identityProvider;
   private OpcUaClient client;
   private final boolean historizing;
+  private ClientRunner runner;
 
   public IoTDBOpcUaClient(
       final String nodeUrl,
@@ -93,7 +98,20 @@ public class IoTDBOpcUaClient {
   public void run(final OpcUaClient client) throws Exception {
     // synchronous connect
     this.client = client;
-    client.connect().get();
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() 
* 1000L) {
+      try {
+        client.connect().get();
+      } catch (final ExecutionException e) {
+        if (e.getCause() instanceof UaException
+            && ((UaException) e.getCause()).getStatusCode().getValue() == 
Bad_Timeout) {
+          Thread.sleep(1000L);
+          continue;
+        }
+        throw e;
+      }
+      break;
+    }
   }
 
   // Only support tree model & client-server
@@ -300,4 +318,18 @@ public class IoTDBOpcUaClient {
         null // notifier
         );
   }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  public void setRunner(ClientRunner runner) {
+    this.runner = runner;
+  }
+
+  public void checkEquals(
+      final String user,
+      final String password,
+      final String securityDir,
+      final SecurityPolicy securityPolicy) {
+    runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index fd4f2917df2..2eaf6f903de 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -226,6 +226,11 @@ public class PipeSinkConstant {
   public static final String SINK_OPC_UA_HISTORIZING_KEY = 
"sink.opcua.historizing";
   public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = 
false;
 
+  public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY = 
"sink.opcua.timeout-seconds";
+  public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY =
+      "connector.opcua.timeout-seconds";
+  public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 
10L;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;

Reply via email to