This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch cp-opc-client
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b6cf835cc2929460b327f2995c48abda4cffa323
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 23 09:46:25 2026 +0800

    Optimized the logger when table does not exist in DN heartbeat && Pipe: 
Fixed the OPC UA Sink key getter logic and potentail NPE when closing client && 
Load: Fixed the missing schema writing for "root" table (#17063)
    
    * root-fix
    
    * f
    
    * fix
    
    * rest
    
    * spls
    
    * gsa
    
    * fix
    
    (cherry picked from commit 5101489d4126b40120d72d49ee5a58edd23aec22)
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 313 ++++++++++++---------
 .../api/customizer/parameter/PipeParameters.java   |  23 +-
 .../protocol/opcua/client/IoTDBOpcUaClient.java    |   4 +-
 3 files changed, 211 insertions(+), 129 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 4b4c69b951d..640336d6f0d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -49,6 +49,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.io.File;
+import java.net.ConnectException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -61,134 +63,193 @@ import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
-  @Test
-  public void testOPCUAServerSink() throws Exception {
-    try (final SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
-
-      TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values 
(1,1)", null);
-
-      final Map<String, String> sinkAttributes = new HashMap<>();
-
-      sinkAttributes.put("sink", "opc-ua-sink");
-      sinkAttributes.put("opcua.model", "client-server");
-      sinkAttributes.put("security-policy", "None");
-
-      final int[] ports = EnvUtils.searchAvailablePorts();
-      final int tcpPort = ports[0];
-      final int httpsPort = ports[1];
-      sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
-      sinkAttributes.put("https.port", Integer.toString(httpsPort));
-
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", sinkAttributes)
-                      .setExtractorAttributes(Collections.singletonMap("user", 
"root"))
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
-
-      final OpcUaClient opcUaClient =
-          getOpcUaClient(
-              "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
-      DataValue value =
-          opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
-      Assert.assertEquals(new Variant(1.0), value.getValue());
-      Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
-
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .alterPipe(
-                  new TAlterPipeReq()
-                      .setPipeName("testPipe")
-                      .setIsReplaceAllConnectorAttributes(false)
-                      
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
-                      .setProcessorAttributes(Collections.emptyMap())
-                      .setExtractorAttributes(Collections.emptyMap()))
-              .getCode());
-
-      TestUtils.executeNonQuery(
-          env,
-          "insert into root.db.opc(time, value, quality, other) values (1, 1, 
false, 1)",
-          null);
-
-      long startTime = System.currentTimeMillis();
-      while (true) {
-        try {
-          value =
-              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
-          Assert.assertEquals(new Variant(1.0), value.getValue());
-          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
-          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
-          break;
-        } catch (final Throwable t) {
-          if (System.currentTimeMillis() - startTime > 10_000L) {
-            throw t;
-          }
-        }
-      }
-
-      TestUtils.executeNonQuery(
-          env, "insert into root.db.opc(time, quality) values (2, true)", 
null);
-      TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) 
values (2, 2)", null);
-
-      startTime = System.currentTimeMillis();
-      while (true) {
-        try {
-          value =
-              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
-          Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
-          Assert.assertEquals(new Variant(2.0), value.getValue());
-          Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
-          break;
-        } catch (final Throwable t) {
-          if (System.currentTimeMillis() - startTime > 10_000L) {
-            throw t;
-          }
+    @Test
+    public void testOPCUAServerSink() throws Exception {
+        int tcpPort = -1;
+        int httpsPort = -1;
+        try (final SyncConfigNodeIServiceClient client =
+                     (SyncConfigNodeIServiceClient) 
env.getLeaderConfigNodeConnection()) {
+
+            TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) 
values (1, 1)", null);
+
+            final Map<String, String> sinkAttributes = new HashMap<>();
+
+            sinkAttributes.put("sink", "opc-ua-sink");
+            sinkAttributes.put("model", "client-server");
+            sinkAttributes.put("opcua.security-policy", "None");
+
+            OpcUaClient opcUaClient;
+            DataValue value;
+            while (true) {
+                final int[] ports = EnvUtils.searchAvailablePorts();
+                tcpPort = ports[0];
+                httpsPort = ports[1];
+                sinkAttributes.put("opcua.tcp.port", 
Integer.toString(tcpPort));
+                sinkAttributes.put("https.port", Integer.toString(httpsPort));
+
+                Assert.assertEquals(
+                        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+                        client
+                                .createPipe(
+                                        new TCreatePipeReq("testPipe", 
sinkAttributes)
+                                                
.setExtractorAttributes(Collections.singletonMap("user", "root"))
+                                                
.setProcessorAttributes(Collections.emptyMap()))
+                                .getCode());
+
+                try {
+                    opcUaClient =
+                            getOpcUaClient(
+                                    "opc.tcp://127.0.0.1:" + tcpPort + 
"/iotdb", SecurityPolicy.None, "root", "root");
+                } catch (final PipeException e) {
+                    if (e.getCause() instanceof ConnectException) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
+                }
+                value =
+                        opcUaClient.readValue(0, TimestampsToReturn.Both, new 
NodeId(2, "root/db/d1/s1")).get();
+                Assert.assertEquals(new Variant(1.0), value.getValue());
+                Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+                opcUaClient.disconnect().get();
+                break;
+            }
+
+            // Create the region first to avoid tsFile parsing
+            TestUtils.executeNonQueries(
+                    env,
+                    Arrays.asList(
+                            "create aligned timeSeries root.db.opc(value 
double, quality boolean, other int32)",
+                            "insert into root.db.opc(time, value, quality, 
other) values (0, 0, true, 1)"),
+                    null);
+
+            while (true) {
+                final int[] ports = EnvUtils.searchAvailablePorts();
+                tcpPort = ports[0];
+                httpsPort = ports[1];
+                sinkAttributes.put("opcua.tcp.port", 
Integer.toString(tcpPort));
+                sinkAttributes.put("https.port", Integer.toString(httpsPort));
+                sinkAttributes.put("with-quality", "true");
+
+                Assert.assertEquals(
+                        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+                        client
+                                .alterPipe(
+                                        new TAlterPipeReq()
+                                                .setPipeName("testPipe")
+                                                
.setIsReplaceAllConnectorAttributes(true)
+                                                
.setConnectorAttributes(sinkAttributes)
+                                                
.setProcessorAttributes(Collections.emptyMap())
+                                                
.setExtractorAttributes(Collections.emptyMap()))
+                                .getCode());
+                try {
+                    opcUaClient =
+                            getOpcUaClient(
+                                    "opc.tcp://127.0.0.1:" + tcpPort + 
"/iotdb", SecurityPolicy.None, "root", "root");
+                } catch (final PipeException e) {
+                    if (e.getCause() instanceof ConnectException) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
+                }
+                break;
+            }
+
+            TestUtils.executeNonQuery(
+                    env,
+                    "insert into root.db.opc(time, value, quality, other) 
values (1, 1, false, 1)",
+                    null);
+
+            long startTime = System.currentTimeMillis();
+            while (true) {
+                try {
+                    value =
+                            opcUaClient.readValue(0, TimestampsToReturn.Both, 
new NodeId(2, "root/db/opc")).get();
+                    Assert.assertEquals(new Variant(1.0), value.getValue());
+                    Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+                    Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+                    break;
+                } catch (final Throwable t) {
+                    if (System.currentTimeMillis() - startTime > 10_000L) {
+                        throw t;
+                    }
+                }
+            }
+
+            TestUtils.executeNonQuery(
+                    env, "insert into root.db.opc(time, quality) values (2, 
true)", null);
+            TestUtils.executeNonQuery(env, "insert into root.db.opc(time, 
value) values (2, 2)", null);
+
+            startTime = System.currentTimeMillis();
+            while (true) {
+                try {
+                    value =
+                            opcUaClient.readValue(0, TimestampsToReturn.Both, 
new NodeId(2, "root/db/opc")).get();
+                    Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
+                    Assert.assertEquals(new Variant(2.0), value.getValue());
+                    Assert.assertEquals(StatusCode.UNCERTAIN, 
value.getStatusCode());
+                    break;
+                } catch (final Throwable t) {
+                    if (System.currentTimeMillis() - startTime > 10_000L) {
+                        throw t;
+                    }
+                }
+            }
+
+            opcUaClient.disconnect().get();
+            Assert.assertEquals(
+                    TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
+
+            // Test reconstruction
+            sinkAttributes.put("password", "test");
+            sinkAttributes.put("security-policy", "basic256sha256");
+            Assert.assertEquals(
+                    TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+                    client
+                            .createPipe(
+                                    new TCreatePipeReq("testPipe", 
sinkAttributes)
+                                            
.setExtractorAttributes(Collections.emptyMap())
+                                            
.setProcessorAttributes(Collections.emptyMap()))
+                            .getCode());
+
+            // Banned none, only allows basic256sha256
+            final int finalTcpPort = tcpPort;
+            Assert.assertThrows(
+                    PipeException.class,
+                    () ->
+                            getOpcUaClient(
+                                    "opc.tcp://127.0.0.1:" + finalTcpPort + 
"/iotdb",
+                                    SecurityPolicy.None,
+                                    "root",
+                                    "root"));
+
+            // Test conflict
+            sinkAttributes.put("password", "conflict");
+            try {
+                TestUtils.executeNonQuery(
+                        env,
+                        String.format(
+                                "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
+                                tcpPort, httpsPort),
+                        null);
+                Assert.fail();
+            } catch (final Exception e) {
+                Assert.assertEquals(
+                        String.format(
+                                "org.apache.iotdb.jdbc.IoTDBSQLException: 
1107: The existing server with tcp port %s and https port %s's password **** 
conflicts to the new password ****, reject reusing.",
+                                tcpPort, httpsPort),
+                        e.getMessage());
+            }
+        } finally {
+            if (tcpPort >= 0) {
+                final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+                if (!new File(lockPath).delete()) {
+                    System.out.printf("Delete lock file %s failed%n", 
lockPath);
+                }
+            }
         }
-      }
-
-      opcUaClient.disconnect().get();
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
-
-      // Test reconstruction
-      sinkAttributes.put("password", "test");
-      sinkAttributes.put("security-policy", "basic256sha256");
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", sinkAttributes)
-                      .setExtractorAttributes(Collections.emptyMap())
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
-
-      // Banned none, only allows basic256sha256
-      Assert.assertThrows(
-          PipeException.class,
-          () ->
-              getOpcUaClient(
-                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
-                  SecurityPolicy.None,
-                  "root",
-                  "root"));
-
-      // Test conflict
-      sinkAttributes.put("password", "conflict");
-      try {
-        TestUtils.executeNonQuery(
-            env, "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict')", null);
-        Assert.fail();
-      } catch (final Exception e) {
-        Assert.assertEquals(
-            "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing 
server with tcp port 12686 and https port 8443's password **** conflicts to the 
new password ****, reject reusing.",
-            e.getMessage());
-      }
     }
-  }
 
   private static OpcUaClient getOpcUaClient(
       final String nodeUrl,
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 3dcb2d19b0a..a06630cb2a9 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -59,7 +59,9 @@ public class PipeParameters {
   }
 
   public boolean hasAttribute(final String key) {
-    return attributes.containsKey(key) || 
attributes.containsKey(KeyReducer.reduce(key));
+    return attributes.containsKey(key)
+        || attributes.containsKey(KeyReducer.shallowReduce(key))
+        || attributes.containsKey(KeyReducer.reduce(key));
   }
 
   public boolean hasAnyAttributes(final String... keys) {
@@ -76,7 +78,11 @@ public class PipeParameters {
   }
 
   public String getString(final String key) {
-    final String value = attributes.get(key);
+    String value = attributes.get(key);
+    if (Objects.nonNull(value)) {
+      return value;
+    }
+    value = attributes.get(KeyReducer.shallowReduce(key));
     return value != null ? value : attributes.get(KeyReducer.reduce(key));
   }
 
@@ -361,6 +367,19 @@ public class PipeParameters {
       PREFIXES.add("sink.");
     }
 
+    static String shallowReduce(String key) {
+      if (key == null) {
+        return null;
+      }
+      final String lowerCaseKey = key.toLowerCase();
+      for (final String prefix : FIRST_PREFIXES) {
+        if (lowerCaseKey.startsWith(prefix)) {
+          return key.substring(prefix.length());
+        }
+      }
+      return key;
+    }
+
     static String reduce(final String key) {
       if (key == null) {
         return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 94889be906e..ce63081c71a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -242,7 +242,9 @@ public class IoTDBOpcUaClient {
   }
 
   public void disconnect() throws Exception {
-    client.disconnect().get();
+    if (Objects.nonNull(client)) {
+      client.disconnect().get();
+    }
   }
 
   /////////////////////////////// Getter ///////////////////////////////

Reply via email to