This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch ger-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ger-fix by this push:
     new 3235551dd98 f
3235551dd98 is described below

commit 3235551dd98317e8ef9864e4aec32232ec407299
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 22 17:00:53 2026 +0800

    f
---
 .../apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java |  6 +++---
 .../pipe/api/customizer/parameter/PipeParameters.java | 19 ++++++++++++++++++-
 .../sink/protocol/opcua/client/IoTDBOpcUaClient.java  |  4 +++-
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 67060c6002b..1ae4ec03fb2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -94,8 +94,8 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
       final Map<String, String> sinkAttributes = new HashMap<>();
 
       sinkAttributes.put("sink", "opc-ua-sink");
-      sinkAttributes.put("opcua.model", "client-server");
-      sinkAttributes.put("security-policy", "None");
+      sinkAttributes.put("model", "client-server");
+      sinkAttributes.put("opcua.security-policy", "None");
 
       OpcUaClient opcUaClient;
       DataValue value;
@@ -103,7 +103,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
         final int[] ports = EnvUtils.searchAvailablePorts();
         tcpPort = ports[0];
         httpsPort = ports[1];
-        sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
         sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
         Assert.assertEquals(
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 989590f40a6..09b70db1f02 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -92,7 +92,11 @@ public class PipeParameters {
   }
 
   public String getString(final String key) {
-    final String value = attributes.get(key);
+    String value = attributes.get(key);
+    if (Objects.nonNull(value)) {
+      return value;
+    }
+    value = attributes.get(KeyReducer.shallowReduce(key));
     return value != null ? value : attributes.get(KeyReducer.reduce(key));
   }
 
@@ -380,6 +384,19 @@ public class PipeParameters {
       SECOND_PREFIXES.add("opcua.");
     }
 
+    static String shallowReduce(String key) {
+      if (key == null) {
+        return null;
+      }
+      final String lowerCaseKey = key.toLowerCase();
+      for (final String prefix : FIRST_PREFIXES) {
+        if (lowerCaseKey.startsWith(prefix)) {
+          return key.substring(prefix.length());
+        }
+      }
+      return key;
+    }
+
     static String reduce(String key) {
       if (key == null) {
         return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index bf96d988180..c6d8da47878 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -242,7 +242,9 @@ public class IoTDBOpcUaClient {
   }
 
   public void disconnect() throws Exception {
-    client.disconnect().get();
+    if (Objects.nonNull(client)) {
+      client.disconnect().get();
+    }
   }
 
   /////////////////////////////// Getter ///////////////////////////////

Reply via email to