This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch cp-opc-client in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6cf835cc2929460b327f2995c48abda4cffa323 Author: Caideyipi <[email protected]> AuthorDate: Fri Jan 23 09:46:25 2026 +0800 Optimized the logger when table does not exist in DN heartbeat && Pipe: Fixed the OPC UA Sink key getter logic and potentail NPE when closing client && Load: Fixed the missing schema writing for "root" table (#17063) * root-fix * f * fix * rest * spls * gsa * fix (cherry picked from commit 5101489d4126b40120d72d49ee5a58edd23aec22) --- .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 313 ++++++++++++--------- .../api/customizer/parameter/PipeParameters.java | 23 +- .../protocol/opcua/client/IoTDBOpcUaClient.java | 4 +- 3 files changed, 211 insertions(+), 129 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 4b4c69b951d..640336d6f0d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -49,6 +49,8 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import java.io.File; +import java.net.ConnectException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -61,134 +63,193 @@ import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { - @Test - public void testOPCUAServerSink() throws Exception { - try (final SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { - - TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values (1,1)", null); - - final Map<String, String> sinkAttributes = new HashMap<>(); - - sinkAttributes.put("sink", "opc-ua-sink"); - sinkAttributes.put("opcua.model", "client-server"); - sinkAttributes.put("security-policy", "None"); - - final int[] ports = EnvUtils.searchAvailablePorts(); - final int tcpPort = ports[0]; - final int httpsPort = ports[1]; - sinkAttributes.put("tcp.port", Integer.toString(tcpPort)); - sinkAttributes.put("https.port", Integer.toString(httpsPort)); - - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(Collections.singletonMap("user", "root")) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); - - final OpcUaClient opcUaClient = - getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); - DataValue value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); - Assert.assertEquals(new Variant(1.0), value.getValue()); - Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); - - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .alterPipe( - new TAlterPipeReq() - .setPipeName("testPipe") - .setIsReplaceAllConnectorAttributes(false) - .setConnectorAttributes(Collections.singletonMap("with-quality", "true")) - .setProcessorAttributes(Collections.emptyMap()) - .setExtractorAttributes(Collections.emptyMap())) - .getCode()); - - TestUtils.executeNonQuery( - env, - "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", - null); - - long startTime = System.currentTimeMillis(); - while (true) { - try { - value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); - Assert.assertEquals(new Variant(1.0), value.getValue()); - Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); - Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); - break; - } catch (final Throwable t) { - if (System.currentTimeMillis() - startTime > 10_000L) { - throw t; - } - } - } - - TestUtils.executeNonQuery( - env, "insert into root.db.opc(time, quality) values (2, true)", null); - TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); - - startTime = System.currentTimeMillis(); - while (true) { - try { - value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); - Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); - Assert.assertEquals(new Variant(2.0), value.getValue()); - Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); - break; - } catch (final Throwable t) { - if (System.currentTimeMillis() - startTime > 10_000L) { - throw t; - } + @Test + public void testOPCUAServerSink() throws Exception { + int tcpPort = -1; + int httpsPort = -1; + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { + + TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null); + + final Map<String, String> sinkAttributes = new HashMap<>(); + + sinkAttributes.put("sink", "opc-ua-sink"); + sinkAttributes.put("model", "client-server"); + sinkAttributes.put("opcua.security-policy", "None"); + + OpcUaClient opcUaClient; + DataValue value; + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.singletonMap("user", "root")) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); + + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + opcUaClient.disconnect().get(); + break; + } + + // Create the region first to avoid tsFile parsing + TestUtils.executeNonQueries( + env, + Arrays.asList( + "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", + "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)"), + null); + + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + sinkAttributes.put("with-quality", "true"); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .alterPipe( + new TAlterPipeReq() + .setPipeName("testPipe") + .setIsReplaceAllConnectorAttributes(true) + .setConnectorAttributes(sinkAttributes) + .setProcessorAttributes(Collections.emptyMap()) + .setExtractorAttributes(Collections.emptyMap())) + .getCode()); + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + break; + } + + TestUtils.executeNonQuery( + env, + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + null); + + long startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } + + TestUtils.executeNonQuery( + env, "insert into root.db.opc(time, quality) values (2, true)", null); + TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); + + startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); + Assert.assertEquals(new Variant(2.0), value.getValue()); + Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } + + opcUaClient.disconnect().get(); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); + + // Test reconstruction + sinkAttributes.put("password", "test"); + sinkAttributes.put("security-policy", "basic256sha256"); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.emptyMap()) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); + + // Banned none, only allows basic256sha256 + final int finalTcpPort = tcpPort; + Assert.assertThrows( + PipeException.class, + () -> + getOpcUaClient( + "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb", + SecurityPolicy.None, + "root", + "root")); + + // Test conflict + sinkAttributes.put("password", "conflict"); + try { + TestUtils.executeNonQuery( + env, + String.format( + "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')", + tcpPort, httpsPort), + null); + Assert.fail(); + } catch (final Exception e) { + Assert.assertEquals( + String.format( + "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.", + tcpPort, httpsPort), + e.getMessage()); + } + } finally { + if (tcpPort >= 0) { + final String lockPath = EnvUtils.getLockFilePath(tcpPort); + if (!new File(lockPath).delete()) { + System.out.printf("Delete lock file %s failed%n", lockPath); + } + } } - } - - opcUaClient.disconnect().get(); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); - - // Test reconstruction - sinkAttributes.put("password", "test"); - sinkAttributes.put("security-policy", "basic256sha256"); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(Collections.emptyMap()) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); - - // Banned none, only allows basic256sha256 - Assert.assertThrows( - PipeException.class, - () -> - getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", - SecurityPolicy.None, - "root", - "root")); - - // Test conflict - sinkAttributes.put("password", "conflict"); - try { - TestUtils.executeNonQuery( - env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null); - Assert.fail(); - } catch (final Exception e) { - Assert.assertEquals( - "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.", - e.getMessage()); - } } - } private static OpcUaClient getOpcUaClient( final String nodeUrl, diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 3dcb2d19b0a..a06630cb2a9 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -59,7 +59,9 @@ public class PipeParameters { } public boolean hasAttribute(final String key) { - return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key)); + return attributes.containsKey(key) + || attributes.containsKey(KeyReducer.shallowReduce(key)) + || attributes.containsKey(KeyReducer.reduce(key)); } public boolean hasAnyAttributes(final String... keys) { @@ -76,7 +78,11 @@ public class PipeParameters { } public String getString(final String key) { - final String value = attributes.get(key); + String value = attributes.get(key); + if (Objects.nonNull(value)) { + return value; + } + value = attributes.get(KeyReducer.shallowReduce(key)); return value != null ? value : attributes.get(KeyReducer.reduce(key)); } @@ -361,6 +367,19 @@ public class PipeParameters { PREFIXES.add("sink."); } + static String shallowReduce(String key) { + if (key == null) { + return null; + } + final String lowerCaseKey = key.toLowerCase(); + for (final String prefix : FIRST_PREFIXES) { + if (lowerCaseKey.startsWith(prefix)) { + return key.substring(prefix.length()); + } + } + return key; + } + static String reduce(final String key) { if (key == null) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java index 94889be906e..ce63081c71a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -242,7 +242,9 @@ public class IoTDBOpcUaClient { } public void disconnect() throws Exception { - client.disconnect().get(); + if (Objects.nonNull(client)) { + client.disconnect().get(); + } } /////////////////////////////// Getter ///////////////////////////////
