This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 556cd6628b5 Fix multi InnerTimeJoinNode with different TimePartitions in one FI (#17652)
556cd6628b5 is described below

commit 556cd6628b5e39adfa56959a97099da1a8228572
Author: Weihao Li <[email protected]>
AuthorDate: Thu May 14 14:02:03 2026 +0800

    Fix multi InnerTimeJoinNode with different TimePartitions in one FI (#17652)
---
 .../aggregation/IoTDBAlignByDeviceWildcardIT.java  | 94 ++++++++++++++++++++++
 .../planner/distribution/DistributionPlanner.java  | 24 ++++--
 .../planner/distribution/ExchangeNodeAdder.java    | 83 +++++++++++++++----
 .../planner/plan/node/process/ExchangeNode.java    | 18 ++++-
 4 files changed, 195 insertions(+), 24 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
new file mode 100644
index 00000000000..e2bb07f23be
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAlignByDeviceWildcardIT {
+
+  private static final String[] SQL_LIST =
+      new String[] {
+        "CREATE DATABASE root.min",
+        "CREATE TIMESERIES root.min.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.min.d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.min.d1(time, s1, s2) VALUES(1, 1, 1)",
+        "INSERT INTO root.min.d1(time, s1, s2) VALUES(2, 1, 1)",
+        "FLUSH",
+        "INSERT INTO root.min.d2(time, s1, s2) VALUES(5, 1, 1)",
+        "FLUSH"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDefaultDataRegionGroupNumPerDatabase(1)
+        .setTimePartitionInterval(1)
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SQL_LIST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    // EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWildcardAlignByDeviceWithTimePartitionSplit() {
+    String sql =
+        "SELECT count(s1) FROM root.min.** "
+            + "WHERE s2 is not null and s1 is not null "
+            + "GROUP BY([1, 6), 1ms) ALIGN BY DEVICE";
+    String[] expectedHeader = new String[] {"Time", "Device", "count(s1)"};
+    String[] expectedRows =
+        new String[] {
+          "1,root.min.d1,1,",
+          "2,root.min.d1,1,",
+          "3,root.min.d1,0,",
+          "4,root.min.d1,0,",
+          "5,root.min.d1,0,",
+          "1,root.min.d2,0,",
+          "2,root.min.d2,0,",
+          "3,root.min.d2,0,",
+          "4,root.min.d2,0,",
+          "5,root.min.d2,1,",
+        };
+    resultSetEqualTest(sql, expectedHeader, expectedRows);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 078c2c3e846..317d293b504 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -145,13 +145,23 @@ public class DistributionPlanner {
         ExchangeNode exchangeNode = (ExchangeNode) child;
         TRegionReplicaSet regionOfChild =
             
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
-        MultiChildrenSinkNode newChild =
-            memo.computeIfAbsent(
-                regionOfChild,
-                tRegionReplicaSet ->
-                    needShuffleSinkNode
-                        ? new 
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
-                        : new 
IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+        MultiChildrenSinkNode newChild;
+        if (exchangeNode.isForcedExchange()) {
+          // Keep forced exchange branch isolated: do not merge into shared sink memo.
+          newChild =
+              needShuffleSinkNode
+                  ? new 
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+                  : new 
IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
+        } else {
+          newChild =
+              memo.computeIfAbsent(
+                  regionOfChild,
+                  tRegionReplicaSet ->
+                      needShuffleSinkNode
+                          ? new 
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+                          : new IdentitySinkNode(
+                              
context.queryContext.getQueryId().genPlanNodeId()));
+        }
         newChild.addChild(exchangeNode.getChild());
         newChild.addDownStreamChannelLocation(
             new 
DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 2c3e560159a..db21b6a7af2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -70,17 +70,17 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static com.google.common.collect.ImmutableList.toImmutableList;
-
 public class ExchangeNodeAdder implements PlanVisitor<PlanNode, 
NodeGroupContext> {
 
   private final Analysis analysis;
+  private boolean containsInnerTimeJoinInCurrentSubtree = false;
 
   public ExchangeNodeAdder(Analysis analysis) {
     this.analysis = analysis;
@@ -93,10 +93,7 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
       return node;
     }
     // Visit all the children of current node
-    List<PlanNode> children =
-        node.getChildren().stream()
-            .map(child -> child.accept(this, context))
-            .collect(toImmutableList());
+    List<PlanNode> children = 
visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
 
     // Calculate the node distribution info according to its children
 
@@ -216,7 +213,13 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
 
   @Override
   public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext 
context) {
-    return processMultiChildNode(node, context);
+    List<PlanNode> visitedChildren =
+        visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+    // Force Exchange for multi-child DeviceView if any child subtree contains InnerTimeJoin.
+    if (node.getChildren().size() > 1 && 
containsInnerTimeJoinInCurrentSubtree) {
+      return processMultiChildNodeWithForcedExchange(node, context, 
visitedChildren);
+    }
+    return processMultiChildNode(node, context, visitedChildren);
   }
 
   @Override
@@ -237,7 +240,13 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
 
   @Override
   public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) 
{
-    return processMultiChildNode(node, context);
+    List<PlanNode> visitedChildren =
+        visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+    // Force Exchange if any child subtree contains InnerTimeJoin.
+    if (containsInnerTimeJoinInCurrentSubtree) {
+      return processMultiChildNodeWithForcedExchange(node, context, 
visitedChildren);
+    }
+    return processMultiChildNode(node, context, visitedChildren);
   }
 
   @Override
@@ -438,11 +447,14 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
       return processMultiChildNodeByLocation(node, context);
     }
 
-    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
     List<PlanNode> visitedChildren =
-        node.getChildren().stream()
-            .map(child -> visit(child, context))
-            .collect(Collectors.toList());
+        visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+    return processMultiChildNode(node, context, visitedChildren);
+  }
+
+  private PlanNode processMultiChildNode(
+      MultiChildProcessNode node, NodeGroupContext context, List<PlanNode> 
visitedChildren) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
 
     // DataRegion in which node locates
     TRegionReplicaSet dataRegion;
@@ -556,13 +568,47 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
   }
 
   private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode 
child) {
+    return genExchangeNode(context, child, false);
+  }
+
+  private ExchangeNode genExchangeNode(
+      NodeGroupContext context, PlanNode child, boolean forcedExchange) {
     ExchangeNode exchangeNode = new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
     exchangeNode.setChild(child);
     exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+    exchangeNode.setForcedExchange(forcedExchange);
     context.hasExchangeNode = true;
     return exchangeNode;
   }
 
+  private PlanNode processMultiChildNodeWithForcedExchange(
+      MultiChildProcessNode node, NodeGroupContext context, List<PlanNode> 
visitedChildren) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
+    for (PlanNode child : visitedChildren) {
+      newNode.addChild(genExchangeNode(context, child, true));
+    }
+    context.putNodeDistribution(
+        newNode.getPlanNodeId(),
+        new NodeDistribution(
+            NodeDistributionType.SAME_WITH_SOME_CHILD, 
context.getMostlyUsedDataRegion()));
+    return newNode;
+  }
+
+  private List<PlanNode> visitChildrenAndRecordInnerTimeJoin(
+      List<PlanNode> children, NodeGroupContext context) {
+    List<PlanNode> result = new ArrayList<>(children.size());
+    boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
+    boolean hasInnerTimeJoinInChildren = false;
+    for (PlanNode child : children) {
+      containsInnerTimeJoinInCurrentSubtree = false;
+      PlanNode visitedChild = visit(child, context);
+      hasInnerTimeJoinInChildren |= containsInnerTimeJoinInCurrentSubtree;
+      result.add(visitedChild);
+    }
+    containsInnerTimeJoinInCurrentSubtree = originalTimeJoin || 
hasInnerTimeJoinInChildren;
+    return result;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -596,9 +642,9 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
                       if (region == null
                           && 
context.getNodeDistribution(child.getPlanNodeId()).getType()
                               == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
-                        return 
calculateSchemaRegionByChildren(child.getChildren(), context);
+                        region = 
calculateSchemaRegionByChildren(child.getChildren(), context);
                       }
-                      return region;
+                      return region == null ? DataPartition.NOT_ASSIGNED : 
region;
                     },
                     Collectors.counting()));
 
@@ -656,6 +702,13 @@ public class ExchangeNodeAdder implements 
PlanVisitor<PlanNode, NodeGroupContext
   }
 
   public PlanNode visit(PlanNode node, NodeGroupContext context) {
-    return node.accept(this, context);
+    boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
+    containsInnerTimeJoinInCurrentSubtree = false;
+    PlanNode visitedNode = node.accept(this, context);
+    containsInnerTimeJoinInCurrentSubtree =
+        originalTimeJoin
+            || containsInnerTimeJoinInCurrentSubtree
+            || node instanceof InnerTimeJoinNode;
+    return visitedNode;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
index 084141e90a7..0902f52995c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
@@ -51,6 +51,9 @@ public class ExchangeNode extends SingleChildProcessNode {
   /** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
   private int indexOfUpstreamSinkHandle = 0;
 
+  /** Planner-only flag: this exchange is forced and should keep independent upstream sink. */
+  private transient boolean forcedExchange = false;
+
   public ExchangeNode(PlanNodeId id) {
     super(id);
   }
@@ -75,6 +78,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     ExchangeNode node = new ExchangeNode(getPlanNodeId());
     node.setOutputColumnNames(outputColumnNames);
     node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
+    node.setForcedExchange(forcedExchange);
     return node;
   }
 
@@ -171,6 +175,14 @@ public class ExchangeNode extends SingleChildProcessNode {
     this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
   }
 
+  public boolean isForcedExchange() {
+    return forcedExchange;
+  }
+
+  public void setForcedExchange(boolean forcedExchange) {
+    this.forcedExchange = forcedExchange;
+  }
+
   public TEndPoint getUpstreamEndpoint() {
     return upstreamEndpoint;
   }
@@ -197,11 +209,13 @@ public class ExchangeNode extends SingleChildProcessNode {
     ExchangeNode that = (ExchangeNode) o;
     return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
         && Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
-        && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
+        && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId)
+        && forcedExchange == that.forcedExchange;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), upstreamEndpoint, 
upstreamInstanceId, upstreamPlanNodeId);
+    return Objects.hash(
+        super.hashCode(), upstreamEndpoint, upstreamInstanceId, 
upstreamPlanNodeId, forcedExchange);
   }
 }

Reply via email to