This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5c8e298b6 Pipe CI: always flush for `IoTDBPipeSinkCompressionIT`
with batch mode (#16247)
1d5c8e298b6 is described below
commit 1d5c8e298b67d3136e51c1d606764137250c9dba
Author: VGalaxies <[email protected]>
AuthorDate: Tue Aug 26 14:46:45 2025 +0800
Pipe CI: always flush for `IoTDBPipeSinkCompressionIT` with batch mode
(#16247)
* setup
* more fix
---
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 7e508c4b3eb..1ac37a6c768 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -48,6 +48,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import static org.junit.Assert.fail;
@@ -131,6 +132,12 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
? receiverDataNode.getPipeAirGapReceiverPort()
: receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -173,7 +180,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
receiverEnv,
"select count(*) from root.db.**",
"count(root.db.d1.s1),",
- Collections.singleton("2,"));
+ Collections.singleton("2,"),
+ handleFailure);
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
@@ -193,7 +201,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
receiverEnv,
"select count(*) from root.db.**",
"count(root.db.d1.s1),",
- Collections.singleton("8,"));
+ Collections.singleton("8,"),
+ handleFailure);
}
}
@@ -204,6 +213,12 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -320,7 +335,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
receiverEnv,
"count timeseries root.db.**",
"count(timeseries),",
- Collections.singleton("3,"));
+ Collections.singleton("3,"),
+ handleFailure);
}
}
}