This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch cp-opc-client
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c89256db2378d90f2f742c26bcb5885789463883
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 26 16:58:23 2026 +0800

    spt
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 336 ++++++++++-----------
 1 file changed, 168 insertions(+), 168 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 640336d6f0d..a5443561ddc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -63,193 +63,193 @@ import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
-    @Test
-    public void testOPCUAServerSink() throws Exception {
-        int tcpPort = -1;
-        int httpsPort = -1;
-        try (final SyncConfigNodeIServiceClient client =
-                     (SyncConfigNodeIServiceClient) 
env.getLeaderConfigNodeConnection()) {
+  @Test
+  public void testOPCUAServerSink() throws Exception {
+    int tcpPort = -1;
+    int httpsPort = -1;
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
 
-            TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) 
values (1, 1)", null);
+      TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values 
(1, 1)", null);
 
-            final Map<String, String> sinkAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-            sinkAttributes.put("sink", "opc-ua-sink");
-            sinkAttributes.put("model", "client-server");
-            sinkAttributes.put("opcua.security-policy", "None");
+      sinkAttributes.put("sink", "opc-ua-sink");
+      sinkAttributes.put("model", "client-server");
+      sinkAttributes.put("opcua.security-policy", "None");
 
-            OpcUaClient opcUaClient;
-            DataValue value;
-            while (true) {
-                final int[] ports = EnvUtils.searchAvailablePorts();
-                tcpPort = ports[0];
-                httpsPort = ports[1];
-                sinkAttributes.put("opcua.tcp.port", 
Integer.toString(tcpPort));
-                sinkAttributes.put("https.port", Integer.toString(httpsPort));
+      OpcUaClient opcUaClient;
+      DataValue value;
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        httpsPort = ports[1];
+        sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
-                Assert.assertEquals(
-                        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-                        client
-                                .createPipe(
-                                        new TCreatePipeReq("testPipe", 
sinkAttributes)
-                                                
.setExtractorAttributes(Collections.singletonMap("user", "root"))
-                                                
.setProcessorAttributes(Collections.emptyMap()))
-                                .getCode());
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .createPipe(
+                    new TCreatePipeReq("testPipe", sinkAttributes)
+                        
.setExtractorAttributes(Collections.singletonMap("user", "root"))
+                        .setProcessorAttributes(Collections.emptyMap()))
+                .getCode());
 
-                try {
-                    opcUaClient =
-                            getOpcUaClient(
-                                    "opc.tcp://127.0.0.1:" + tcpPort + 
"/iotdb", SecurityPolicy.None, "root", "root");
-                } catch (final PipeException e) {
-                    if (e.getCause() instanceof ConnectException) {
-                        continue;
-                    } else {
-                        throw e;
-                    }
-                }
-                value =
-                        opcUaClient.readValue(0, TimestampsToReturn.Both, new 
NodeId(2, "root/db/d1/s1")).get();
-                Assert.assertEquals(new Variant(1.0), value.getValue());
-                Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
-                opcUaClient.disconnect().get();
-                break;
-            }
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        value =
+            opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
+        Assert.assertEquals(new Variant(1.0), value.getValue());
+        Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+        opcUaClient.disconnect().get();
+        break;
+      }
 
-            // Create the region first to avoid tsFile parsing
-            TestUtils.executeNonQueries(
-                    env,
-                    Arrays.asList(
-                            "create aligned timeSeries root.db.opc(value 
double, quality boolean, other int32)",
-                            "insert into root.db.opc(time, value, quality, 
other) values (0, 0, true, 1)"),
-                    null);
+      // Create the region first to avoid tsFile parsing
+      TestUtils.executeNonQueries(
+          env,
+          Arrays.asList(
+              "create aligned timeSeries root.db.opc(value double, quality 
boolean, other int32)",
+              "insert into root.db.opc(time, value, quality, other) values (0, 
0, true, 1)"),
+          null);
 
-            while (true) {
-                final int[] ports = EnvUtils.searchAvailablePorts();
-                tcpPort = ports[0];
-                httpsPort = ports[1];
-                sinkAttributes.put("opcua.tcp.port", 
Integer.toString(tcpPort));
-                sinkAttributes.put("https.port", Integer.toString(httpsPort));
-                sinkAttributes.put("with-quality", "true");
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        httpsPort = ports[1];
+        sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
+        sinkAttributes.put("with-quality", "true");
 
-                Assert.assertEquals(
-                        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-                        client
-                                .alterPipe(
-                                        new TAlterPipeReq()
-                                                .setPipeName("testPipe")
-                                                
.setIsReplaceAllConnectorAttributes(true)
-                                                
.setConnectorAttributes(sinkAttributes)
-                                                
.setProcessorAttributes(Collections.emptyMap())
-                                                
.setExtractorAttributes(Collections.emptyMap()))
-                                .getCode());
-                try {
-                    opcUaClient =
-                            getOpcUaClient(
-                                    "opc.tcp://127.0.0.1:" + tcpPort + 
"/iotdb", SecurityPolicy.None, "root", "root");
-                } catch (final PipeException e) {
-                    if (e.getCause() instanceof ConnectException) {
-                        continue;
-                    } else {
-                        throw e;
-                    }
-                }
-                break;
-            }
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .alterPipe(
+                    new TAlterPipeReq()
+                        .setPipeName("testPipe")
+                        .setIsReplaceAllConnectorAttributes(true)
+                        .setConnectorAttributes(sinkAttributes)
+                        .setProcessorAttributes(Collections.emptyMap())
+                        .setExtractorAttributes(Collections.emptyMap()))
+                .getCode());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        break;
+      }
 
-            TestUtils.executeNonQuery(
-                    env,
-                    "insert into root.db.opc(time, value, quality, other) 
values (1, 1, false, 1)",
-                    null);
+      TestUtils.executeNonQuery(
+          env,
+          "insert into root.db.opc(time, value, quality, other) values (1, 1, 
false, 1)",
+          null);
 
-            long startTime = System.currentTimeMillis();
-            while (true) {
-                try {
-                    value =
-                            opcUaClient.readValue(0, TimestampsToReturn.Both, 
new NodeId(2, "root/db/opc")).get();
-                    Assert.assertEquals(new Variant(1.0), value.getValue());
-                    Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
-                    Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
-                    break;
-                } catch (final Throwable t) {
-                    if (System.currentTimeMillis() - startTime > 10_000L) {
-                        throw t;
-                    }
-                }
-            }
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
 
-            TestUtils.executeNonQuery(
-                    env, "insert into root.db.opc(time, quality) values (2, 
true)", null);
-            TestUtils.executeNonQuery(env, "insert into root.db.opc(time, 
value) values (2, 2)", null);
+      TestUtils.executeNonQuery(
+          env, "insert into root.db.opc(time, quality) values (2, true)", 
null);
+      TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) 
values (2, 2)", null);
 
-            startTime = System.currentTimeMillis();
-            while (true) {
-                try {
-                    value =
-                            opcUaClient.readValue(0, TimestampsToReturn.Both, 
new NodeId(2, "root/db/opc")).get();
-                    Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
-                    Assert.assertEquals(new Variant(2.0), value.getValue());
-                    Assert.assertEquals(StatusCode.UNCERTAIN, 
value.getStatusCode());
-                    break;
-                } catch (final Throwable t) {
-                    if (System.currentTimeMillis() - startTime > 10_000L) {
-                        throw t;
-                    }
-                }
-            }
+      startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
+          Assert.assertEquals(new Variant(2.0), value.getValue());
+          Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
 
-            opcUaClient.disconnect().get();
-            Assert.assertEquals(
-                    TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
+      opcUaClient.disconnect().get();
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
 
-            // Test reconstruction
-            sinkAttributes.put("password", "test");
-            sinkAttributes.put("security-policy", "basic256sha256");
-            Assert.assertEquals(
-                    TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-                    client
-                            .createPipe(
-                                    new TCreatePipeReq("testPipe", 
sinkAttributes)
-                                            
.setExtractorAttributes(Collections.emptyMap())
-                                            
.setProcessorAttributes(Collections.emptyMap()))
-                            .getCode());
+      // Test reconstruction
+      sinkAttributes.put("password", "test");
+      sinkAttributes.put("security-policy", "basic256sha256");
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", sinkAttributes)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setProcessorAttributes(Collections.emptyMap()))
+              .getCode());
 
-            // Banned none, only allows basic256sha256
-            final int finalTcpPort = tcpPort;
-            Assert.assertThrows(
-                    PipeException.class,
-                    () ->
-                            getOpcUaClient(
-                                    "opc.tcp://127.0.0.1:" + finalTcpPort + 
"/iotdb",
-                                    SecurityPolicy.None,
-                                    "root",
-                                    "root"));
+      // Banned none, only allows basic256sha256
+      final int finalTcpPort = tcpPort;
+      Assert.assertThrows(
+          PipeException.class,
+          () ->
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
+                  SecurityPolicy.None,
+                  "root",
+                  "root"));
 
-            // Test conflict
-            sinkAttributes.put("password", "conflict");
-            try {
-                TestUtils.executeNonQuery(
-                        env,
-                        String.format(
-                                "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
-                                tcpPort, httpsPort),
-                        null);
-                Assert.fail();
-            } catch (final Exception e) {
-                Assert.assertEquals(
-                        String.format(
-                                "org.apache.iotdb.jdbc.IoTDBSQLException: 
1107: The existing server with tcp port %s and https port %s's password **** 
conflicts to the new password ****, reject reusing.",
-                                tcpPort, httpsPort),
-                        e.getMessage());
-            }
-        } finally {
-            if (tcpPort >= 0) {
-                final String lockPath = EnvUtils.getLockFilePath(tcpPort);
-                if (!new File(lockPath).delete()) {
-                    System.out.printf("Delete lock file %s failed%n", 
lockPath);
-                }
-            }
+      // Test conflict
+      sinkAttributes.put("password", "conflict");
+      try {
+        TestUtils.executeNonQuery(
+            env,
+            String.format(
+                "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
+                tcpPort, httpsPort),
+            null);
+        Assert.fail();
+      } catch (final Exception e) {
+        Assert.assertEquals(
+            String.format(
+                "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing 
server with tcp port %s and https port %s's password **** conflicts to the new 
password ****, reject reusing.",
+                tcpPort, httpsPort),
+            e.getMessage());
+      }
+    } finally {
+      if (tcpPort >= 0) {
+        final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+        if (!new File(lockPath).delete()) {
+          System.out.printf("Delete lock file %s failed%n", lockPath);
         }
+      }
     }
+  }
 
   private static OpcUaClient getOpcUaClient(
       final String nodeUrl,

Reply via email to