This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 747811016a8 [To dev/1.3] Pipe: Fixed some potential OPC UA problems
(#17393) & Added IT for `` in opc (#17394)
747811016a8 is described below
commit 747811016a854369091fca7e477059df1ea9e04e
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 10:15:03 2026 +0800
[To dev/1.3] Pipe: Fixed some potential OPC UA problems (#17393) & Added IT
for `` in opc (#17394)
* fix
* sptls
* fix
* remove-side
* opti
---
.../it/env/cluster/node/AbstractNodeWrapper.java | 3 +-
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 40 +++++++++++++---------
.../protocol/opcua/server/OpcUaKeyStoreLoader.java | 14 ++++++--
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 7 ++--
.../protocol/opcua/server/OpcUaServerBuilder.java | 1 +
5 files changed, 42 insertions(+), 23 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index e1e013ea961..6c8f2717e71 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -461,7 +461,8 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize()
+ "m",
"-Djdk.nio.maxCachedBufferSize=262144",
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" +
killPoints.toString(),
- "-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
+ "-Dsun.jnu.encoding=UTF-8",
+ "-Dfile.encoding=UTF-8",
"-cp",
server_node_lib_path));
addStartCmdParams(startCmd);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 599a43476c1..33ee72a61e8 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -70,7 +70,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values
(1, 1)", null);
+ TestUtils.executeNonQuery(env, "insert into root.db.d1(time, `1`) values
(1, 1)", null);
final Map<String, String> sinkAttributes = new HashMap<>();
@@ -108,7 +108,9 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
}
}
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/s1")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/`1`"))
+ .get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
opcUaClient.disconnect().get();
@@ -119,12 +121,12 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
TestUtils.executeNonQueries(
env,
Arrays.asList(
- "create aligned timeSeries root.db.opc(value double, quality
boolean, other int32)",
- "create aligned timeSeries root.db.opc1(value double, quality
boolean, other int32)",
- "create aligned timeSeries root.db.opc2(value double, quality
boolean, other int32)",
- "insert into root.db.opc(time, value, quality, other) values (0,
0, true, 1)",
- "insert into root.db.opc1(time, value, quality, other) values
(0, 0, true, 1)",
- "insert into root.db.opc2(time, value, quality, other) values
(0, 0, true, 1)"),
+ "create aligned timeSeries root.db.`123`(value double, quality
boolean, other int32)",
+ "create aligned timeSeries root.db.`1231`(value double, quality
boolean, other int32)",
+ "create aligned timeSeries root.db.`1232`(value double, quality
boolean, other int32)",
+ "insert into root.db.`123`(time, value, quality, other) values
(0, 0, true, 1)",
+ "insert into root.db.`1231`(time, value, quality, other) values
(0, 0, true, 1)",
+ "insert into root.db.`1232`(time, value, quality, other) values
(0, 0, true, 1)"),
null);
while (true) {
@@ -164,23 +166,25 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
TestUtils.executeNonQueries(
env,
Arrays.asList(
- "insert into root.db.opc(time, value, quality, other) values (1,
1, false, 1)",
- "insert into root.db.opc1(time, value, quality, other) values
(1, 1, false, 1)",
- "insert into root.db.opc2(time, value, quality, other) values
(1, 1, false, 1)"),
+ "insert into root.db.`123`(time, value, quality, other) values
(1, 1, false, 1)",
+ "insert into root.db.`1231`(time, value, quality, other) values
(1, 1, false, 1)",
+ "insert into root.db.`1232`(time, value, quality, other) values
(1, 1, false, 1)"),
null);
long startTime = System.currentTimeMillis();
while (true) {
try {
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`123`"))
+ .get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
value =
opcUaClient
- .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc1"))
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`1231`"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -188,7 +192,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
value =
opcUaClient
- .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc2"))
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`1232`"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -202,14 +206,16 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
}
TestUtils.executeNonQuery(
- env, "insert into root.db.opc(time, quality) values (2, true)",
null);
- TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value)
values (2, 2)", null);
+ env, "insert into root.db.`123`(time, quality) values (2, true)",
null);
+ TestUtils.executeNonQuery(env, "insert into root.db.`123`(time, value)
values (2, 2)", null);
startTime = System.currentTimeMillis();
while (true) {
try {
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`123`"))
+ .get();
Assert.assertEquals(new DateTime(timestampToUtc(2)),
value.getSourceTime());
Assert.assertEquals(new Variant(2.0), value.getValue());
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index 56b231fb460..16f176a3c53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
@@ -61,8 +63,8 @@ class OpcUaKeyStoreLoader {
LOGGER.info("Loading KeyStore at {}", serverKeyStore);
if (serverKeyStore.exists()) {
- try {
- keyStore.load(Files.newInputStream(serverKeyStore.toPath()), password);
+ try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
+ keyStore.load(is, password);
} catch (final IOException e) {
LOGGER.warn("Load keyStore failed, the existing keyStore may be stale,
re-constructing...");
FileUtils.deleteFileOrDirectory(serverKeyStore);
@@ -105,7 +107,9 @@ class OpcUaKeyStoreLoader {
keyStore.setKeyEntry(
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[]
{certificate});
- keyStore.store(Files.newOutputStream(serverKeyStore.toPath()), password);
+ try (final OutputStream os =
Files.newOutputStream(serverKeyStore.toPath())) {
+ keyStore.store(os, password);
+ }
}
final Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
@@ -114,6 +118,10 @@ class OpcUaKeyStoreLoader {
final PublicKey serverPublicKey = serverCertificate.getPublicKey();
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey)
serverPrivateKey);
+ } else {
+ throw new Exception(
+ "Invalid keyStore, the serverPrivateKey is "
+ + (serverPrivateKey != null ?
serverPrivateKey.getClass().getSimpleName() : "null"));
}
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 04cc251d655..43144cbdd15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -91,8 +91,11 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
@Override
public void shutdown() {
- getServer().shutdown();
- builder.close();
+ try {
+ getServer().shutdown();
+ } finally {
+ builder.close();
+ }
}
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index 61818ecf852..f029031b617 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -117,6 +117,7 @@ public class OpcUaServerBuilder implements Closeable {
return this;
}
+ // Must be a modifiable set.
public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy>
securityPolicies) {
this.securityPolicies = securityPolicies;
return this;