This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 747811016a8 [To dev/1.3] Pipe: Fixed some potential OPC UA problems 
(#17393) & Added IT for `` in opc (#17394)
747811016a8 is described below

commit 747811016a854369091fca7e477059df1ea9e04e
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 10:15:03 2026 +0800

    [To dev/1.3] Pipe: Fixed some potential OPC UA problems (#17393) & Added IT 
for `` in opc (#17394)
    
    * fix
    
    * sptls
    
    * fix
    
    * remove-side
    
    * opti
---
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  3 +-
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 40 +++++++++++++---------
 .../protocol/opcua/server/OpcUaKeyStoreLoader.java | 14 ++++++--
 .../sink/protocol/opcua/server/OpcUaNameSpace.java |  7 ++--
 .../protocol/opcua/server/OpcUaServerBuilder.java  |  1 +
 5 files changed, 42 insertions(+), 23 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index e1e013ea961..6c8f2717e71 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -461,7 +461,8 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
               "-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() 
+ "m",
               "-Djdk.nio.maxCachedBufferSize=262144",
               "-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + 
killPoints.toString(),
-              "-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
+              "-Dsun.jnu.encoding=UTF-8",
+              "-Dfile.encoding=UTF-8",
               "-cp",
               server_node_lib_path));
       addStartCmdParams(startCmd);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 599a43476c1..33ee72a61e8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -70,7 +70,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values 
(1, 1)", null);
+      TestUtils.executeNonQuery(env, "insert into root.db.d1(time, `1`) values 
(1, 1)", null);
 
       final Map<String, String> sinkAttributes = new HashMap<>();
 
@@ -108,7 +108,9 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
           }
         }
         value =
-            opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
+            opcUaClient
+                .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/`1`"))
+                .get();
         Assert.assertEquals(new Variant(1.0), value.getValue());
         Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
         opcUaClient.disconnect().get();
@@ -119,12 +121,12 @@ public class IoTDBPipeOPCUAIT extends 
AbstractPipeSingleIT {
       TestUtils.executeNonQueries(
           env,
           Arrays.asList(
-              "create aligned timeSeries root.db.opc(value double, quality 
boolean, other int32)",
-              "create aligned timeSeries root.db.opc1(value double, quality 
boolean, other int32)",
-              "create aligned timeSeries root.db.opc2(value double, quality 
boolean, other int32)",
-              "insert into root.db.opc(time, value, quality, other) values (0, 
0, true, 1)",
-              "insert into root.db.opc1(time, value, quality, other) values 
(0, 0, true, 1)",
-              "insert into root.db.opc2(time, value, quality, other) values 
(0, 0, true, 1)"),
+              "create aligned timeSeries root.db.`123`(value double, quality 
boolean, other int32)",
+              "create aligned timeSeries root.db.`1231`(value double, quality 
boolean, other int32)",
+              "create aligned timeSeries root.db.`1232`(value double, quality 
boolean, other int32)",
+              "insert into root.db.`123`(time, value, quality, other) values 
(0, 0, true, 1)",
+              "insert into root.db.`1231`(time, value, quality, other) values 
(0, 0, true, 1)",
+              "insert into root.db.`1232`(time, value, quality, other) values 
(0, 0, true, 1)"),
           null);
 
       while (true) {
@@ -164,23 +166,25 @@ public class IoTDBPipeOPCUAIT extends 
AbstractPipeSingleIT {
       TestUtils.executeNonQueries(
           env,
           Arrays.asList(
-              "insert into root.db.opc(time, value, quality, other) values (1, 
1, false, 1)",
-              "insert into root.db.opc1(time, value, quality, other) values 
(1, 1, false, 1)",
-              "insert into root.db.opc2(time, value, quality, other) values 
(1, 1, false, 1)"),
+              "insert into root.db.`123`(time, value, quality, other) values 
(1, 1, false, 1)",
+              "insert into root.db.`1231`(time, value, quality, other) values 
(1, 1, false, 1)",
+              "insert into root.db.`1232`(time, value, quality, other) values 
(1, 1, false, 1)"),
           null);
 
       long startTime = System.currentTimeMillis();
       while (true) {
         try {
           value =
-              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/`123`"))
+                  .get();
           Assert.assertEquals(new Variant(1.0), value.getValue());
           Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
           Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
 
           value =
               opcUaClient
-                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc1"))
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/`1231`"))
                   .get();
           Assert.assertEquals(new Variant(1.0), value.getValue());
           Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -188,7 +192,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
 
           value =
               opcUaClient
-                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc2"))
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/`1232`"))
                   .get();
           Assert.assertEquals(new Variant(1.0), value.getValue());
           Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -202,14 +206,16 @@ public class IoTDBPipeOPCUAIT extends 
AbstractPipeSingleIT {
       }
 
       TestUtils.executeNonQuery(
-          env, "insert into root.db.opc(time, quality) values (2, true)", 
null);
-      TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) 
values (2, 2)", null);
+          env, "insert into root.db.`123`(time, quality) values (2, true)", 
null);
+      TestUtils.executeNonQuery(env, "insert into root.db.`123`(time, value) 
values (2, 2)", null);
 
       startTime = System.currentTimeMillis();
       while (true) {
         try {
           value =
-              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/`123`"))
+                  .get();
           Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
           Assert.assertEquals(new Variant(2.0), value.getValue());
           Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index 56b231fb460..16f176a3c53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.Key;
@@ -61,8 +63,8 @@ class OpcUaKeyStoreLoader {
     LOGGER.info("Loading KeyStore at {}", serverKeyStore);
 
     if (serverKeyStore.exists()) {
-      try {
-        keyStore.load(Files.newInputStream(serverKeyStore.toPath()), password);
+      try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
+        keyStore.load(is, password);
       } catch (final IOException e) {
         LOGGER.warn("Load keyStore failed, the existing keyStore may be stale, 
re-constructing...");
         FileUtils.deleteFileOrDirectory(serverKeyStore);
@@ -105,7 +107,9 @@ class OpcUaKeyStoreLoader {
 
       keyStore.setKeyEntry(
           SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] 
{certificate});
-      keyStore.store(Files.newOutputStream(serverKeyStore.toPath()), password);
+      try (final OutputStream os = 
Files.newOutputStream(serverKeyStore.toPath())) {
+        keyStore.store(os, password);
+      }
     }
 
     final Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
@@ -114,6 +118,10 @@ class OpcUaKeyStoreLoader {
 
       final PublicKey serverPublicKey = serverCertificate.getPublicKey();
       serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) 
serverPrivateKey);
+    } else {
+      throw new Exception(
+          "Invalid keyStore, the serverPrivateKey is "
+              + (serverPrivateKey != null ? 
serverPrivateKey.getClass().getSimpleName() : "null"));
     }
 
     return this;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 04cc251d655..43144cbdd15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -91,8 +91,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
               @Override
               public void shutdown() {
-                getServer().shutdown();
-                builder.close();
+                try {
+                  getServer().shutdown();
+                } finally {
+                  builder.close();
+                }
               }
             });
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index 61818ecf852..f029031b617 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -117,6 +117,7 @@ public class OpcUaServerBuilder implements Closeable {
     return this;
   }
 
+  // Must be a modifiable set.
   public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> 
securityPolicies) {
     this.securityPolicies = securityPolicies;
     return this;

Reply via email to