This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch fix-time-partition in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0711930e2a320df64340c37f7b51e5c2e3ef0b3a Author: Weihao Li <[email protected]> AuthorDate: Wed May 13 09:55:40 2026 +0800 fix Signed-off-by: Weihao Li <[email protected]> --- .../aggregation/IoTDBAlignByDeviceWildcardIT.java | 94 ++++++++++++++++++++++ .../planner/distribution/ExchangeNodeAdder.java | 36 +++++++++ 2 files changed, 130 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java new file mode 100644 index 00000000000..e2bb07f23be --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it.aggregation; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareData; +import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBAlignByDeviceWildcardIT { + + private static final String[] SQL_LIST = + new String[] { + "CREATE DATABASE root.min", + "CREATE TIMESERIES root.min.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN", + "CREATE TIMESERIES root.min.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN", + "CREATE TIMESERIES root.min.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN", + "CREATE TIMESERIES root.min.d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN", + "INSERT INTO root.min.d1(time, s1, s2) VALUES(1, 1, 1)", + "INSERT INTO root.min.d1(time, s1, s2) VALUES(2, 1, 1)", + "FLUSH", + "INSERT INTO root.min.d2(time, s1, s2) VALUES(5, 1, 1)", + "FLUSH" + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDefaultDataRegionGroupNumPerDatabase(1) + .setTimePartitionInterval(1) + .setEnableSeqSpaceCompaction(false) + .setEnableUnseqSpaceCompaction(false) + .setEnableCrossSpaceCompaction(false); + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(SQL_LIST); + } + + @AfterClass + public static void tearDown() throws Exception { + // EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testWildcardAlignByDeviceWithTimePartitionSplit() { + String sql = + "SELECT count(s1) FROM root.min.** " + + "WHERE s2 is not null and s1 is not null " + + "GROUP BY([1, 6), 1ms) ALIGN BY DEVICE"; + String[] expectedHeader = new String[] {"Time", "Device", "count(s1)"}; + String[] expectedRows = + new String[] { + "1,root.min.d1,1,", + "2,root.min.d1,1,", + "3,root.min.d1,0,", + "4,root.min.d1,0,", + "5,root.min.d1,0,", + "1,root.min.d2,0,", + "2,root.min.d2,0,", + "3,root.min.d2,0,", + "4,root.min.d2,0,", + "5,root.min.d2,1,", + }; + resultSetEqualTest(sql, expectedHeader, expectedRows); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 2c3e560159a..168ca0530a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -216,6 +216,11 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext @Override public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext context) { + // May contain multiple child branches with different InnerTimeJoin timePartitions. + // Force Exchange for each branch to isolate them into different fragments/FIs. + if (hasDirectInnerTimeJoinChild(node.getChildren())) { + return processMultiChildNodeWithForcedExchange(node, context); + } return processMultiChildNode(node, context); } @@ -237,6 +242,11 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext @Override public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) { + // May contain multiple child branches with different InnerTimeJoin timePartitions. + // Force Exchange for each branch to isolate them into different fragments/FIs. + if (hasDirectInnerTimeJoinChild(node.getChildren())) { + return processMergeSortWithForcedExchange(node, context); + } return processMultiChildNode(node, context); } @@ -563,6 +573,32 @@ public class ExchangeNodeAdder implements PlanVisitor<PlanNode, NodeGroupContext return exchangeNode; } + private PlanNode processMultiChildNodeWithForcedExchange( + MultiChildProcessNode node, NodeGroupContext context) { + MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone(); + List<PlanNode> visitedChildren = + node.getChildren().stream() + .map(child -> visit(child, context)) + .collect(Collectors.toList()); + for (PlanNode child : visitedChildren) { + newNode.addChild(genExchangeNode(context, child)); + } + context.putNodeDistribution( + newNode.getPlanNodeId(), + new NodeDistribution( + NodeDistributionType.SAME_WITH_SOME_CHILD, context.getMostlyUsedDataRegion())); + return newNode; + } + + private boolean hasDirectInnerTimeJoinChild(List<PlanNode> children) { + return children.stream().anyMatch(child -> child instanceof InnerTimeJoinNode); + } + + private PlanNode processMergeSortWithForcedExchange( + MergeSortNode node, NodeGroupContext context) { + return processMultiChildNodeWithForcedExchange(node, context); + } + @Override public PlanNode visitSlidingWindowAggregation( SlidingWindowAggregationNode node, NodeGroupContext context) {
