This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-many-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/opc-many-fix by this push:
     new 0a1eedad213 fix
0a1eedad213 is described below

commit 0a1eedad213fb81a3c105bfe8990257e02a455f8
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jan 26 17:11:27 2026 +0800

    fix
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     |  2 +-
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 79 +++++++++++++++++-----
 .../sink/protocol/opcua/client/ClientRunner.java   | 51 +++++++++++++-
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 34 +++++++++-
 .../config/executor/ClusterConfigTaskExecutor.java | 23 +++----
 .../pipe/config/constant/PipeSinkConstant.java     |  5 ++
 6 files changed, 161 insertions(+), 33 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 9ac2f4e17b7..6c89dabc63a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -345,7 +345,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
             + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
 
     client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
-    new ClientRunner(client, securityDir, password).run();
+    new ClientRunner(client, securityDir, password, userName, 10).run();
     return client.getClient();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 071db688dd8..554ac3b0df4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -100,6 +100,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
@@ -118,6 +120,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
 
@@ -137,8 +140,11 @@ public class OpcUaSink implements PipeConnector {
 
   private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
       SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
+  private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
+      CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
 
   private String serverKey;
+  private String nodeUrl;
   private boolean isClientServerModel;
   private String databaseName;
   private String placeHolder4NullTag;
@@ -238,8 +244,7 @@ public class OpcUaSink implements PipeConnector {
           "When the OPC UA sink sets 'with-quality' to true, the table model 
data is not supported.");
     }
 
-    final String nodeUrl =
-        parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
+    nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
     if (Objects.isNull(nodeUrl)) {
       customizeServer(parameters);
     } else {
@@ -247,7 +252,7 @@ public class OpcUaSink implements PipeConnector {
         throw new PipeException(
             "When the OPC UA sink points to an outer server, the table model 
data is not supported.");
       }
-      customizeClient(nodeUrl, parameters);
+      customizeClient(parameters);
     }
   }
 
@@ -350,7 +355,7 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
-  private void customizeClient(final String nodeUrl, final PipeParameters 
parameters) {
+  private void customizeClient(final PipeParameters parameters) {
     final SecurityPolicy policy =
         getSecurityPolicy(
             parameters
@@ -380,15 +385,39 @@ public class OpcUaSink implements PipeConnector {
                     + File.separatorChar
                     + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
 
-    client =
-        new IoTDBOpcUaClient(
-            nodeUrl,
-            policy,
-            provider,
-            parameters.getBooleanOrDefault(
-                Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, 
SINK_OPC_UA_HISTORIZING_KEY),
-                CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
-    new ClientRunner(client, securityDir, password).run();
+    final long timeoutSeconds =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, 
SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
+            CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      client =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
+              .compute(
+                  nodeUrl,
+                  (key, oldValue) -> {
+                    if (Objects.isNull(oldValue)) {
+                      final IoTDBOpcUaClient result =
+                          new IoTDBOpcUaClient(
+                              nodeUrl,
+                              policy,
+                              provider,
+                              parameters.getBooleanOrDefault(
+                                  Arrays.asList(
+                                      CONNECTOR_OPC_UA_HISTORIZING_KEY,
+                                      SINK_OPC_UA_HISTORIZING_KEY),
+                                  CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+                      final ClientRunner runner =
+                          new ClientRunner(result, securityDir, password, 
userName, timeoutSeconds);
+                      runner.run();
+                      return new Pair<>(new AtomicInteger(0), result);
+                    }
+                    oldValue.getRight().checkEquals(userName, password, 
securityDir, policy);
+                    return oldValue;
+                  })
+              .getRight();
+      
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet();
+    }
   }
 
   private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
@@ -521,10 +550,6 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    if (Objects.nonNull(client)) {
-      client.disconnect();
-    }
-
     if (serverKey == null) {
       return;
     }
@@ -544,6 +569,26 @@ public class OpcUaSink implements PipeConnector {
         }
       }
     }
+
+    if (nodeUrl == null) {
+      return;
+    }
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      final Pair<AtomicInteger, IoTDBOpcUaClient> pair =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().disconnect();
+        } finally {
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
+        }
+      }
+    }
   }
 
   /////////////////////////////// Getter ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 402091598f1..69fe16f1aaa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -25,15 +25,18 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
 import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.Security;
+import java.util.Objects;
 
 import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
 
@@ -49,14 +52,23 @@ public class ClientRunner {
   private final IoTDBOpcUaClient configurableUaClient;
   private final Path securityDir;
   private final String password;
+  private final long timeoutSeconds;
+
+  // For conflict checking
+  private final String user;
 
   public ClientRunner(
       final IoTDBOpcUaClient configurableUaClient,
       final String securityDir,
-      final String password) {
+      final String password,
+      final String user,
+      final long timeoutSeconds) {
     this.configurableUaClient = configurableUaClient;
     this.securityDir = Paths.get(securityDir);
     this.password = password;
+    this.user = user;
+    this.timeoutSeconds = timeoutSeconds;
+    configurableUaClient.setRunner(this);
   }
 
   private OpcUaClient createClient() throws Exception {
@@ -90,7 +102,9 @@ public class ClientRunner {
                 .setCertificateChain(loader.getClientCertificateChain())
                 .setCertificateValidator(certificateValidator)
                 
.setIdentityProvider(configurableUaClient.getIdentityProvider())
-                .setRequestTimeout(uint(5000))
+                .setRequestTimeout(uint(timeoutSeconds * 1000L))
+                .setConnectTimeout(uint(timeoutSeconds * 1000L))
+                .setMaxResponseMessageSize(uint(0))
                 .build());
   }
 
@@ -109,4 +123,37 @@ public class ClientRunner {
           "Error getting opc client: " + e.getClass().getSimpleName() + ": " + 
e.getMessage(), e);
     }
   }
+
+  long getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  void checkEquals(
+      final String user,
+      final String password,
+      final Path securityDir,
+      final SecurityPolicy securityPolicy) {
+    checkEquals("user", this.user, user);
+    checkEquals("password", this.password, password);
+    checkEquals(
+        "security dir",
+        
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+        
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+    checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), 
securityPolicy);
+  }
+
+  private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
+    if (!Objects.equals(thisAttr, thatAttr)) {
+      if (attrName.equals("password")) {
+        thisAttr = "****";
+        thatAttr = "****";
+      }
+      throw new PipeException(
+          String.format(
+              "The existing server with nodeUrl %s's %s %s conflicts to the 
new %s %s, reject reusing.",
+              configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, 
thatAttr));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index c6d8da47878..977d672df73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -34,6 +34,7 @@ import org.eclipse.milo.opcua.sdk.core.AccessLevel;
 import org.eclipse.milo.opcua.sdk.core.ValueRanks;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
@@ -55,14 +56,17 @@ import 
org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Predicate;
 
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;
 
 public class IoTDBOpcUaClient {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
@@ -78,6 +82,7 @@ public class IoTDBOpcUaClient {
   private final IdentityProvider identityProvider;
   private OpcUaClient client;
   private final boolean historizing;
+  private ClientRunner runner;
 
   public IoTDBOpcUaClient(
       final String nodeUrl,
@@ -93,7 +98,20 @@ public class IoTDBOpcUaClient {
   public void run(final OpcUaClient client) throws Exception {
     // synchronous connect
     this.client = client;
-    client.connect().get();
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() 
* 1000L) {
+      try {
+        client.connect().get();
+      } catch (final ExecutionException e) {
+        if (e.getCause() instanceof UaException
+            && ((UaException) e.getCause()).getStatusCode().getValue() == 
Bad_Timeout) {
+          Thread.sleep(1000L);
+          continue;
+        }
+        throw e;
+      }
+      break;
+    }
   }
 
   // Only support tree model & client-server
@@ -300,4 +318,18 @@ public class IoTDBOpcUaClient {
         null // notifier
         );
   }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  public void setRunner(ClientRunner runner) {
+    this.runner = runner;
+  }
+
+  public void checkEquals(
+      final String user,
+      final String password,
+      final String securityDir,
+      final SecurityPolicy securityPolicy) {
+    runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index fac08cfe0e8..bf2ee3d17a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2333,9 +2333,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     // Construct temporary pipe static meta for validation
     final String pipeName = alterPipeStatement.getPipeName();
-    final Map<String, String> extractorAttributes;
+    final Map<String, String> sourceAttributes;
     final Map<String, String> processorAttributes;
-    final Map<String, String> connectorAttributes;
+    final Map<String, String> sinkAttributes;
     try {
       if (!alterPipeStatement.getSourceAttributes().isEmpty()) {
         // We don't allow changing the extractor plugin type
@@ -2347,7 +2347,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               new PipeParameters(alterPipeStatement.getSourceAttributes()));
         }
         if (alterPipeStatement.isReplaceAllSourceAttributes()) {
-          extractorAttributes = alterPipeStatement.getSourceAttributes();
+          sourceAttributes = alterPipeStatement.getSourceAttributes();
         } else {
           final boolean onlyContainsUser =
               onlyContainsUser(alterPipeStatement.getSourceAttributes());
@@ -2356,14 +2356,14 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               .getSourceParameters()
               .addOrReplaceEquivalentAttributes(
                   new 
PipeParameters(alterPipeStatement.getSourceAttributes()));
-          extractorAttributes =
+          sourceAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
           if (onlyContainsUser) {
-            checkSourceType(alterPipeStatement.getPipeName(), 
extractorAttributes);
+            checkSourceType(alterPipeStatement.getPipeName(), 
sourceAttributes);
           }
         }
       } else {
-        extractorAttributes =
+        sourceAttributes =
             
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
       }
 
@@ -2386,7 +2386,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
       if (!alterPipeStatement.getSinkAttributes().isEmpty()) {
         if (alterPipeStatement.isReplaceAllSinkAttributes()) {
-          connectorAttributes = alterPipeStatement.getSinkAttributes();
+          sinkAttributes = alterPipeStatement.getSinkAttributes();
         } else {
           final boolean onlyContainsUser = 
onlyContainsUser(alterPipeStatement.getSinkAttributes());
           pipeMetaFromCoordinator
@@ -2394,19 +2394,18 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               .getSinkParameters()
               .addOrReplaceEquivalentAttributes(
                   new PipeParameters(alterPipeStatement.getSinkAttributes()));
-          connectorAttributes =
+          sinkAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
           if (onlyContainsUser) {
-            checkSinkType(alterPipeStatement.getPipeName(), 
connectorAttributes);
+            checkSinkType(alterPipeStatement.getPipeName(), sinkAttributes);
           }
         }
       } else {
-        connectorAttributes =
-            
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
+        sinkAttributes = 
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
       }
 
       PipeDataNodeAgent.plugin()
-          .validate(pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
+          .validate(pipeName, sourceAttributes, processorAttributes, 
sinkAttributes);
     } catch (final Exception e) {
       future.setException(
           new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index b27bb59224d..ec9afce7c6f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -229,6 +229,11 @@ public class PipeSinkConstant {
   public static final String SINK_OPC_UA_HISTORIZING_KEY = 
"sink.opcua.historizing";
   public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = 
false;
 
+  public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY = 
"sink.opcua.timeout-seconds";
+  public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY =
+      "connector.opcua.timeout-seconds";
+  public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 
10L;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;

Reply via email to