This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 68fc992af53 Pipe: Fixed some potential OPC UA problems & Added IT for
`` in opc (#17393)
68fc992af53 is described below
commit 68fc992af536b9ba10965d440bb3d1b699ec820b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 10:14:53 2026 +0800
Pipe: Fixed some potential OPC UA problems & Added IT for `` in opc (#17393)
* fix
* Update OpcUaKeyStoreLoader.java
* fix
* remove-side
* opti
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 40 +++++++++++++---------
.../protocol/opcua/server/OpcUaKeyStoreLoader.java | 14 ++++++--
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 7 ++--
.../protocol/opcua/server/OpcUaServerBuilder.java | 1 +
4 files changed, 40 insertions(+), 22 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index d0c8186b561..b1d0a4dda73 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -89,7 +89,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values
(1, 1)", null);
+ TestUtils.executeNonQuery(env, "insert into root.db.d1(time, `1`) values
(1, 1)", null);
final Map<String, String> sinkAttributes = new HashMap<>();
@@ -127,7 +127,9 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
}
}
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/s1")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/`1`"))
+ .get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
opcUaClient.disconnect().get();
@@ -138,12 +140,12 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
TestUtils.executeNonQueries(
env,
Arrays.asList(
- "create aligned timeSeries root.db.opc(value double, quality
boolean, other int32)",
- "create aligned timeSeries root.db.opc1(value double, quality
boolean, other int32)",
- "create aligned timeSeries root.db.opc2(value double, quality
boolean, other int32)",
- "insert into root.db.opc(time, value, quality, other) values (0,
0, true, 1)",
- "insert into root.db.opc1(time, value, quality, other) values
(0, 0, true, 1)",
- "insert into root.db.opc2(time, value, quality, other) values
(0, 0, true, 1)"),
+ "create aligned timeSeries root.db.`123`(value double, quality
boolean, other int32)",
+ "create aligned timeSeries root.db.`1231`(value double, quality
boolean, other int32)",
+ "create aligned timeSeries root.db.`1232`(value double, quality
boolean, other int32)",
+ "insert into root.db.`123`(time, value, quality, other) values
(0, 0, true, 1)",
+ "insert into root.db.`1231`(time, value, quality, other) values
(0, 0, true, 1)",
+ "insert into root.db.`1232`(time, value, quality, other) values
(0, 0, true, 1)"),
null);
while (true) {
@@ -183,23 +185,25 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
TestUtils.executeNonQueries(
env,
Arrays.asList(
- "insert into root.db.opc(time, value, quality, other) values (1,
1, false, 1)",
- "insert into root.db.opc1(time, value, quality, other) values
(1, 1, false, 1)",
- "insert into root.db.opc2(time, value, quality, other) values
(1, 1, false, 1)"),
+ "insert into root.db.`123`(time, value, quality, other) values
(1, 1, false, 1)",
+ "insert into root.db.`1231`(time, value, quality, other) values
(1, 1, false, 1)",
+ "insert into root.db.`1232`(time, value, quality, other) values
(1, 1, false, 1)"),
null);
long startTime = System.currentTimeMillis();
while (true) {
try {
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`123`"))
+ .get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
value =
opcUaClient
- .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc1"))
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`1231`"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -207,7 +211,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
value =
opcUaClient
- .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc2"))
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`1232`"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -221,14 +225,16 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
}
TestUtils.executeNonQuery(
- env, "insert into root.db.opc(time, quality) values (2, true)",
null);
- TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value)
values (2, 2)", null);
+ env, "insert into root.db.`123`(time, quality) values (2, true)",
null);
+ TestUtils.executeNonQuery(env, "insert into root.db.`123`(time, value)
values (2, 2)", null);
startTime = System.currentTimeMillis();
while (true) {
try {
value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/opc")).get();
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/`123`"))
+ .get();
Assert.assertEquals(new DateTime(timestampToUtc(2)),
value.getSourceTime());
Assert.assertEquals(new Variant(2.0), value.getValue());
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index 56b231fb460..16f176a3c53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
@@ -61,8 +63,8 @@ class OpcUaKeyStoreLoader {
LOGGER.info("Loading KeyStore at {}", serverKeyStore);
if (serverKeyStore.exists()) {
- try {
- keyStore.load(Files.newInputStream(serverKeyStore.toPath()), password);
+ try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
+ keyStore.load(is, password);
} catch (final IOException e) {
LOGGER.warn("Load keyStore failed, the existing keyStore may be stale,
re-constructing...");
FileUtils.deleteFileOrDirectory(serverKeyStore);
@@ -105,7 +107,9 @@ class OpcUaKeyStoreLoader {
keyStore.setKeyEntry(
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[]
{certificate});
- keyStore.store(Files.newOutputStream(serverKeyStore.toPath()), password);
+ try (final OutputStream os =
Files.newOutputStream(serverKeyStore.toPath())) {
+ keyStore.store(os, password);
+ }
}
final Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
@@ -114,6 +118,10 @@ class OpcUaKeyStoreLoader {
final PublicKey serverPublicKey = serverCertificate.getPublicKey();
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey)
serverPrivateKey);
+ } else {
+ throw new Exception(
+ "Invalid keyStore, the serverPrivateKey is "
+ + (serverPrivateKey != null ?
serverPrivateKey.getClass().getSimpleName() : "null"));
}
return this;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 0969cea574d..3c79a3aa304 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -95,8 +95,11 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
@Override
public void shutdown() {
- getServer().shutdown();
- builder.close();
+ try {
+ getServer().shutdown();
+ } finally {
+ builder.close();
+ }
}
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index 61818ecf852..f029031b617 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -117,6 +117,7 @@ public class OpcUaServerBuilder implements Closeable {
return this;
}
+ // Must be a modifiable set.
public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy>
securityPolicies) {
this.securityPolicies = securityPolicies;
return this;