This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-sep
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-sep by this push:
new bdf53c15c42 may-comp
bdf53c15c42 is described below
commit bdf53c15c427de77df3ec964efc8bf096c5f2b52
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 13 16:21:26 2026 +0800
may-comp
---
.../manual/enhanced/IoTDBPipeAutoDropIT.java | 95 +++++++++-------------
1 file changed, 40 insertions(+), 55 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index 6ab974c2f1e..6902542d882 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +51,7 @@ import java.util.function.Consumer;
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTableManualEnhanced.class})
@@ -70,48 +73,30 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};
- final String receiverIp = receiverDataNode.getIp();
- final int receiverPort = receiverDataNode.getPort();
-
- try (final SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
-
- TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
- TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
-
- final Map<String, String> extractorAttributes = new HashMap<>();
- final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- extractorAttributes.put("mode.snapshot", "true");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("user", "root");
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
-
- final TSStatus status =
- client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
+ // Create an ordinary full sync pipe
+ // The database & table name will be converted to lower case
+ final String sql =
+ String.format("create pipe a2b ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- TableModelUtils.getQueryCountSql("test"),
- "_col0,",
- Collections.singleton("100,"),
- "test",
- handleFailure);
- }
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ TableModelUtils.getQueryCountSql("test"),
+ "_col0,",
+ Collections.singleton("100,"),
+ "test",
+ handleFailure);
- try (final Connection connection =
makeItCloseQuietly(senderEnv.getConnection());
+ try (final Connection connection =
+
makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT));
final Statement statement =
makeItCloseQuietly(connection.createStatement()); ) {
ResultSet result = statement.executeQuery("show pipes");
await()
@@ -124,9 +109,9 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
try {
int pipeNum = 0;
while (result.next()) {
- if (!result
- .getString(ColumnHeaderConstant.ID)
- .contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ final String pipeName =
result.getString(ColumnHeaderConstant.ID);
+ if
(!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+ && pipeName.endsWith("_history")) {
pipeNum++;
}
}
@@ -157,25 +142,25 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
- final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
- extractorAttributes.put("mode.snapshot", "true");
- extractorAttributes.put("capture.table", "true");
- extractorAttributes.put("start-time", "0");
- extractorAttributes.put("end-time", "49");
- extractorAttributes.put("user", "root");
+ sourceAttributes.put("mode.snapshot", "true");
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("start-time", "0");
+ sourceAttributes.put("end-time", "49");
+ sourceAttributes.put("user", "root");
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("sink.batch.enable", "false");
+ sinkAttributes.put("sink.ip", receiverIp);
+ sinkAttributes.put("sink.port", Integer.toString(receiverPort));
final TSStatus status =
client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
+ new TCreatePipeReq("p1", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());