This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3a714e7226d Fix flaky table aggregation partition IT (#17653)
3a714e7226d is described below
commit 3a714e7226d34a0e954f7dc1acaab77d3d647859
Author: shuwenwei <[email protected]>
AuthorDate: Wed May 13 14:04:19 2026 +0800
Fix flaky table aggregation partition IT (#17653)
---
...ableAggregationQueryWithNetworkPartitionIT.java | 60 ++++++++++++++++++++++
1 file changed, 60 insertions(+)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
index bb00451ea8f..3e8bc96f1db 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
@@ -38,11 +38,19 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
@@ -55,6 +63,7 @@ public class IoTDBTableAggregationQueryWithNetworkPartitionIT
{
private static final int testReplicationFactor = 3;
private static final long testTimePartitionInterval = 604800000;
private static final int testDataRegionGroupPerDatabase = 4;
+ private static final String[] TARGET_TIME_PARTITIONS = new String[] {"0",
"-1"};
protected static final String DATABASE_NAME = "test";
protected static final String[] createSqls =
new String[] {
@@ -80,6 +89,57 @@ public class
IoTDBTableAggregationQueryWithNetworkPartitionIT {
.setEnableTopologyProbing(true);
EnvFactory.getEnv().initClusterEnvironment(1, 3);
prepareTableData(createSqls);
+ waitTsFilesOnDataNodes();
+ }
+
+ private static void waitTsFilesOnDataNodes()
+ throws IoTDBConnectionException,
+ StatementExecutionException,
+ IOException,
+ InterruptedException {
+ for (int i = 0; i < 30; i++) {
+ boolean allReady = true;
+ for (DataNodeWrapper dataNode :
EnvFactory.getEnv().getDataNodeWrapperList()) {
+ if (!hasTsFilesInTimePartitions(dataNode, DATABASE_NAME,
TARGET_TIME_PARTITIONS)) {
+ allReady = false;
+ break;
+ }
+ }
+ if (allReady) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ try (ITableSession session =
+
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) {
+ session.executeNonQueryStatement("flush");
+ }
+ return;
+ }
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ }
+ Assert.fail("Data is not synchronized to all DataNodes");
+ }
+
+ /**
+  * Checks whether the data directory of a DataNode holds at least one TsFile for every requested
+  * time partition of a database.
+  *
+  * <p>A file is counted when its name ends with {@code .tsfile} and its path contains the
+  * database name as a full path segment; the name of the file's parent directory is taken as its
+  * time partition.
+  *
+  * @param dataNode the DataNode whose {@code data} directory is inspected
+  * @param database the database name that must appear as a path segment
+  * @param timePartitions the time-partition directory names that must all be present
+  * @return {@code true} if every requested time partition contains a matching TsFile;
+  *     {@code false} if the data directory does not exist, a partition is missing, or a file
+  *     vanished while the directory tree was being walked
+  * @throws IOException if walking the directory tree fails for any other reason
+  */
+ private static boolean hasTsFilesInTimePartitions(
+     DataNodeWrapper dataNode, String database, String[] timePartitions) throws IOException {
+   Path dataDir = Paths.get(dataNode.getDataNodeDir(), "data");
+   if (!Files.exists(dataDir)) {
+     return false;
+   }
+
+   // Loop-invariant: the database name must appear as a whole path segment.
+   String databaseSegment = File.separator + database + File.separator;
+   Set<String> existingTimePartitions = new HashSet<>();
+   try (Stream<Path> paths = Files.walk(dataDir)) {
+     paths
+         .filter(path -> path.getFileName().toString().endsWith(".tsfile"))
+         .filter(path -> path.toString().contains(databaseSegment))
+         .filter(path -> path.getParent() != null)
+         .forEach(path -> existingTimePartitions.add(path.getParent().getFileName().toString()));
+   } catch (java.io.UncheckedIOException e) {
+     // Files.walk reports I/O errors hit during traversal as UncheckedIOException, which would
+     // bypass the declared IOException. The directory presumably belongs to a running DataNode
+     // (TODO confirm), so a file may disappear mid-walk; report "not ready" so the caller can
+     // poll again instead of aborting.
+     if (e.getCause() instanceof java.nio.file.NoSuchFileException) {
+       return false;
+     }
+     // Any other traversal failure is surfaced as the checked exception this method declares.
+     throw e.getCause();
+   }
+
+   for (String timePartition : timePartitions) {
+     if (!existingTimePartitions.contains(timePartition)) {
+       return false;
+     }
+   }
+   return true;
 }
@AfterClass