This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-sep
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-sep by this push:
     new bdf53c15c42 may-comp
bdf53c15c42 is described below

commit bdf53c15c427de77df3ec964efc8bf096c5f2b52
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 13 16:21:26 2026 +0800

    may-comp
---
 .../manual/enhanced/IoTDBPipeAutoDropIT.java       | 95 +++++++++-------------
 1 file changed, 40 insertions(+), 55 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index 6ab974c2f1e..6902542d882 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,6 +51,7 @@ import java.util.function.Consumer;
 
 import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2DualTableManualEnhanced.class})
@@ -70,48 +73,30 @@ public class IoTDBPipeAutoDropIT extends AbstractPipeTableModelDualManualIT {
           TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
         };
 
-    final String receiverIp = receiverDataNode.getIp();
-    final int receiverPort = receiverDataNode.getPort();
-
-    try (final SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-
-      TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
-      TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
-
-      final Map<String, String> extractorAttributes = new HashMap<>();
-      final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      extractorAttributes.put("mode.snapshot", "true");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("user", "root");
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
-
-      final TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
+    // Create an ordinary full sync pipe
+    // The database & table name will be converted to lower case
+    final String sql =
+        String.format("create pipe a2b ('node-urls'='%s')", receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
 
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+    TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          TableModelUtils.getQueryCountSql("test"),
-          "_col0,",
-          Collections.singleton("100,"),
-          "test",
-          handleFailure);
-    }
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        TableModelUtils.getQueryCountSql("test"),
+        "_col0,",
+        Collections.singleton("100,"),
+        "test",
+        handleFailure);
 
-    try (final Connection connection = makeItCloseQuietly(senderEnv.getConnection());
+    try (final Connection connection =
+            makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT));
         final Statement statement = makeItCloseQuietly(connection.createStatement()); ) {
       ResultSet result = statement.executeQuery("show pipes");
       await()
@@ -124,9 +109,9 @@ public class IoTDBPipeAutoDropIT extends AbstractPipeTableModelDualManualIT {
                 try {
                   int pipeNum = 0;
                   while (result.next()) {
-                    if (!result
-                        .getString(ColumnHeaderConstant.ID)
-                        .contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+                    final String pipeName = result.getString(ColumnHeaderConstant.ID);
+                    if (!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+                        && pipeName.endsWith("_history")) {
                       pipeNum++;
                     }
                   }
@@ -157,25 +142,25 @@ public class IoTDBPipeAutoDropIT extends AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("mode.snapshot", "true");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("start-time", "0");
-      extractorAttributes.put("end-time", "49");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("mode.snapshot", "true");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("start-time", "0");
+      sourceAttributes.put("end-time", "49");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Reply via email to