This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch fix-time-partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0711930e2a320df64340c37f7b51e5c2e3ef0b3a
Author: Weihao Li <[email protected]>
AuthorDate: Wed May 13 09:55:40 2026 +0800

    fix
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../aggregation/IoTDBAlignByDeviceWildcardIT.java  | 94 ++++++++++++++++++++++
 .../planner/distribution/ExchangeNodeAdder.java    | 36 +++++++++
 2 files changed, 130 insertions(+)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
new file mode 100644
index 00000000000..e2bb07f23be
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAlignByDeviceWildcardIT {
+
+  private static final String[] SQL_LIST =
+      new String[] {
+        "CREATE DATABASE root.min",
+        "CREATE TIMESERIES root.min.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.min.d1(time, s1, s2) VALUES(1, 1, 1)",
+        "INSERT INTO root.min.d1(time, s1, s2) VALUES(2, 1, 1)",
+        "FLUSH",
+        "INSERT INTO root.min.d2(time, s1, s2) VALUES(5, 1, 1)",
+        "FLUSH"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDefaultDataRegionGroupNumPerDatabase(1)
+        .setTimePartitionInterval(1)
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SQL_LIST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment(); // undo initClusterEnvironment() from setUp()
+  }
+
+  @Test
+  public void testWildcardAlignByDeviceWithTimePartitionSplit() {
+    String sql =
+        "SELECT count(s1) FROM root.min.** "
+            + "WHERE s2 is not null and s1 is not null "
+            + "GROUP BY([1, 6), 1ms) ALIGN BY DEVICE";
+    String[] expectedHeader = new String[] {"Time", "Device", "count(s1)"};
+    String[] expectedRows =
+        new String[] {
+          "1,root.min.d1,1,",
+          "2,root.min.d1,1,",
+          "3,root.min.d1,0,",
+          "4,root.min.d1,0,",
+          "5,root.min.d1,0,",
+          "1,root.min.d2,0,",
+          "2,root.min.d2,0,",
+          "3,root.min.d2,0,",
+          "4,root.min.d2,0,",
+          "5,root.min.d2,1,",
+        };
+    resultSetEqualTest(sql, expectedHeader, expectedRows);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 2c3e560159a..168ca0530a3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -216,6 +216,11 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext
 
   @Override
   public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext context) {
+    // May contain multiple child branches with different InnerTimeJoin timePartitions.
+    // Force Exchange for each branch to isolate them into different fragments/FIs.
+    if (hasDirectInnerTimeJoinChild(node.getChildren())) {
+      return processMultiChildNodeWithForcedExchange(node, context);
+    }
     return processMultiChildNode(node, context);
   }
 
@@ -237,6 +242,11 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext
 
   @Override
   public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
+    // May contain multiple child branches with different InnerTimeJoin timePartitions.
+    // Force Exchange for each branch to isolate them into different fragments/FIs.
+    if (hasDirectInnerTimeJoinChild(node.getChildren())) {
+      return processMergeSortWithForcedExchange(node, context);
+    }
     return processMultiChildNode(node, context);
   }
 
@@ -563,6 +573,32 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext
     return exchangeNode;
   }
 
+  private PlanNode processMultiChildNodeWithForcedExchange(
+      MultiChildProcessNode node, NodeGroupContext context) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
+    List<PlanNode> visitedChildren =
+        node.getChildren().stream()
+            .map(child -> visit(child, context))
+            .collect(Collectors.toList());
+    for (PlanNode child : visitedChildren) {
+      newNode.addChild(genExchangeNode(context, child));
+    }
+    context.putNodeDistribution(
+        newNode.getPlanNodeId(),
+        new NodeDistribution(
+            NodeDistributionType.SAME_WITH_SOME_CHILD, context.getMostlyUsedDataRegion()));
+    return newNode;
+  }
+
+  private boolean hasDirectInnerTimeJoinChild(List<PlanNode> children) {
+    return children.stream().anyMatch(child -> child instanceof InnerTimeJoinNode);
+  }
+
+  private PlanNode processMergeSortWithForcedExchange(
+      MergeSortNode node, NodeGroupContext context) {
+    return processMultiChildNodeWithForcedExchange(node, context);
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {

Reply via email to