This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new eea8a8c2e62 [To dev/1.3] Pipe CI: always flush for
`IoTDBPipeSinkCompressionIT` with batch mode (#16247) (#16269)
eea8a8c2e62 is described below
commit eea8a8c2e621fac47b67714e447468a818c693dc
Author: VGalaxies <[email protected]>
AuthorDate: Tue Aug 26 17:18:36 2025 +0800
[To dev/1.3] Pipe CI: always flush for `IoTDBPipeSinkCompressionIT` with
batch mode (#16247) (#16269)
* Pipe CI: always flush for `IoTDBPipeSinkCompressionIT` with batch mode
(#16247)
* setup
* more fix
* fixup
* fixup
---
.../it/autocreate/IoTDBPipeSinkCompressionIT.java | 25 +++++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
index 59cfa4321e4..7c576696c63 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java
@@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import static org.junit.Assert.fail;
@@ -129,6 +130,12 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
? receiverDataNode.getPipeAirGapReceiverPort()
: receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -170,7 +177,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("2,"));
+ Collections.singleton("2,"),
+ handleFailure);
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
@@ -189,7 +197,8 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("8,"));
+ Collections.singleton("8,"),
+ handleFailure);
}
}
@@ -200,6 +209,12 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
@@ -307,7 +322,11 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualAutoIT {
Assert.assertEquals(3, showPipeResult.size());
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton("3,"));
+ receiverEnv,
+ "count timeseries",
+ "count(timeseries),",
+ Collections.singleton("3,"),
+ handleFailure);
}
}
}