This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch optimizeLast in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bbc91a12b7831c445687e6d91d738f6bd5aa8091 Author: Weihao Li <[email protected]> AuthorDate: Wed Mar 4 14:35:24 2026 +0800 FE & UT Signed-off-by: Weihao Li <[email protected]> --- .../distribute/TableDistributedPlanGenerator.java | 299 +++++++++++++++++---- .../AlignedAggregationTreeDeviceViewScanNode.java | 113 ++++++++ ...onAlignedAggregationTreeDeviceViewScanNode.java | 113 ++++++++ .../PushAggregationIntoTableScan.java | 4 - .../plan/relational/analyzer/TreeViewTest.java | 82 +++++- .../planner/assertions/PlanMatchPattern.java | 34 +++ 6 files changed, 579 insertions(+), 66 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f..e6bbf92b4a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; @@ -72,6 +73,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -1188,70 +1190,19 @@ public class TableDistributedPlanGenerator @Override public List<PlanNode> visitAggregationTableScan( AggregationTableScanNode node, PlanContext context) { - String dbName = - node instanceof AggregationTreeDeviceViewScanNode - ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName() - : node.getQualifiedObjectName().getDatabaseName(); + String dbName = node.getQualifiedObjectName().getDatabaseName(); DataPartition dataPartition = analysis.getDataPartitionInfo(); if (dbName == null || dataPartition == null) { node.setRegionReplicaSet(NOT_ASSIGNED); return Collections.singletonList(node); } - boolean needSplit = false; - List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>(); - if (dataPartition != null) { - Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap = - dataPartition.getDataPartitionMap().get(dbName); - if (seriesSlotMap == null) { - throw new SemanticException( - String.format("Given queried database: %s is not exist!", dbName)); - } - Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new HashMap<>(); - for (DeviceEntry deviceEntry : node.getDeviceEntries()) { - List<TRegionReplicaSet> regionReplicaSets = - getDeviceReplicaSets( - dataPartition, - seriesSlotMap, - deviceEntry.getDeviceID(), - node.getTimeFilter(), - cachedSeriesSlotWithRegions); - if (regionReplicaSets.size() > 1) { - needSplit = true; - context.deviceCrossRegion = true; - queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache()); - } - regionReplicaSetsList.add(regionReplicaSets); - } - } - - if (regionReplicaSetsList.isEmpty()) { - regionReplicaSetsList = Collections.singletonList(Collections.singletonList(NOT_ASSIGNED)); - } + AggregationDistributionInfo distributionInfo = + prepareAggregationDistribution(node, dbName, dataPartition, context); Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new HashMap<>(); - // Step is SINGLE and device data in more than one region, we need to final aggregate the result - // from different region here, so split - // this node into two-stage - needSplit = needSplit && node.getStep() == SINGLE; - AggregationNode finalAggregation = null; - if (needSplit) { - Pair<AggregationNode, AggregationTableScanNode> splitResult = - split(node, symbolAllocator, queryId); - finalAggregation = splitResult.left; - AggregationTableScanNode partialAggregation = splitResult.right; - - // cover case: complete push-down + group by + streamable - if (!context.hasSortProperty && finalAggregation.isStreamable()) { - OrderingScheme expectedOrderingSchema = - constructOrderingSchema(node.getPreGroupedSymbols()); - context.setExpectedOrderingScheme(expectedOrderingSchema); - } - - buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, partialAggregation); - } else { - buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node); - } + buildRegionNodeMap( + node, distributionInfo.regionReplicaSetsList, regionNodeMap, distributionInfo.templateNode); List<PlanNode> resultTableScanNodeList = new ArrayList<>(); TRegionReplicaSet mostUsedDataRegion = null; @@ -1276,6 +1227,165 @@ public class TableDistributedPlanGenerator processSortProperty(node, resultTableScanNodeList, context); } + if (distributionInfo.needSplit) { + AggregationNode finalAggregation = distributionInfo.finalAggregation; + if (resultTableScanNodeList.size() == 1) { + finalAggregation.setChild(resultTableScanNodeList.get(0)); + } else if (resultTableScanNodeList.size() > 1) { + OrderingScheme childOrdering = + nodeOrderingMap.get(resultTableScanNodeList.get(0).getPlanNodeId()); + finalAggregation.setChild( + mergeChildrenViaCollectOrMergeSort(childOrdering, resultTableScanNodeList)); + } else { + throw new IllegalStateException("List<PlanNode>.size should >= 1, but now is 0"); + } + resultTableScanNodeList = Collections.singletonList(finalAggregation); + } + + return resultTableScanNodeList; + } + + @Override + public List<PlanNode> visitAggregationTreeDeviceViewScan( + AggregationTreeDeviceViewScanNode node, PlanContext context) { + String dbName = node.getTreeDBName(); + DataPartition dataPartition = analysis.getDataPartitionInfo(); + if (dbName == null || dataPartition == null) { + node.setRegionReplicaSet(NOT_ASSIGNED); + return Collections.singletonList(node); + } + + AggregationDistributionInfo distributionInfo = + prepareAggregationDistribution(node, dbName, dataPartition, context); + + List<List<TRegionReplicaSet>> regionReplicaSetsList = distributionInfo.regionReplicaSetsList; + AggregationTableScanNode templateNode = distributionInfo.templateNode; + AggregationNode finalAggregation = distributionInfo.finalAggregation; + boolean needSplit = distributionInfo.needSplit; + + Map< + TRegionReplicaSet, + Pair< + AlignedAggregationTreeDeviceViewScanNode, + NonAlignedAggregationTreeDeviceViewScanNode>> + tableScanNodeMap = new HashMap<>(); + + for (int i = 0; i < regionReplicaSetsList.size(); i++) { + DeviceEntry deviceEntry = node.getDeviceEntries().get(i); + List<TRegionReplicaSet> regionReplicaSets = regionReplicaSetsList.get(i); + + for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) { + boolean aligned = deviceEntry instanceof AlignedDeviceEntry; + Pair<AlignedAggregationTreeDeviceViewScanNode, NonAlignedAggregationTreeDeviceViewScanNode> + pair = tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new Pair<>(null, null)); + + if (aligned && pair.left == null) { + AlignedAggregationTreeDeviceViewScanNode scanNode = + new AlignedAggregationTreeDeviceViewScanNode( + queryId.genPlanNodeId(), + templateNode.getQualifiedObjectName(), + templateNode.getOutputSymbols(), + templateNode.getAssignments(), + new ArrayList<>(), + templateNode.getTagAndAttributeIndexMap(), + templateNode.getScanOrder(), + templateNode.getTimePredicate().orElse(null), + templateNode.getPushDownPredicate(), + templateNode.getPushDownLimit(), + templateNode.getPushDownOffset(), + templateNode.isPushLimitToEachDevice(), + templateNode.containsNonAlignedDevice(), + templateNode.getProjection(), + templateNode.getAggregations(), + templateNode.getGroupingSets(), + templateNode.getPreGroupedSymbols(), + templateNode.getStep(), + templateNode.getGroupIdSymbol(), + node.getTreeDBName(), + node.getMeasurementColumnNameMap()); + scanNode.setRegionReplicaSet(regionReplicaSet); + pair.left = scanNode; + } + + if (!aligned && pair.right == null) { + NonAlignedAggregationTreeDeviceViewScanNode scanNode = + new NonAlignedAggregationTreeDeviceViewScanNode( + queryId.genPlanNodeId(), + templateNode.getQualifiedObjectName(), + templateNode.getOutputSymbols(), + templateNode.getAssignments(), + new ArrayList<>(), + templateNode.getTagAndAttributeIndexMap(), + templateNode.getScanOrder(), + templateNode.getTimePredicate().orElse(null), + templateNode.getPushDownPredicate(), + templateNode.getPushDownLimit(), + templateNode.getPushDownOffset(), + templateNode.isPushLimitToEachDevice(), + templateNode.containsNonAlignedDevice(), + templateNode.getProjection(), + templateNode.getAggregations(), + templateNode.getGroupingSets(), + templateNode.getPreGroupedSymbols(), + templateNode.getStep(), + templateNode.getGroupIdSymbol(), + node.getTreeDBName(), + node.getMeasurementColumnNameMap()); + scanNode.setRegionReplicaSet(regionReplicaSet); + pair.right = scanNode; + } + + if (aligned) { + pair.left.appendDeviceEntry(deviceEntry); + } else { + pair.right.appendDeviceEntry(deviceEntry); + } + } + } + + if (tableScanNodeMap.isEmpty()) { + node.setRegionReplicaSet(NOT_ASSIGNED); + return Collections.singletonList(node); + } + + List<PlanNode> resultTableScanNodeList = new ArrayList<>(); + TRegionReplicaSet mostUsedDataRegion = null; + int maxDeviceEntrySizeOfTableScan = 0; + for (Map.Entry< + TRegionReplicaSet, + Pair< + AlignedAggregationTreeDeviceViewScanNode, + NonAlignedAggregationTreeDeviceViewScanNode>> + entry : topology.filterReachableCandidates(tableScanNodeMap.entrySet())) { + TRegionReplicaSet regionReplicaSet = entry.getKey(); + Pair<AlignedAggregationTreeDeviceViewScanNode, NonAlignedAggregationTreeDeviceViewScanNode> + pair = entry.getValue(); + int currentDeviceEntrySize = 0; + + if (pair.left != null) { + currentDeviceEntrySize += pair.left.getDeviceEntries().size(); + resultTableScanNodeList.add(pair.left); + } + + if (pair.right != null) { + currentDeviceEntrySize += pair.right.getDeviceEntries().size(); + resultTableScanNodeList.add(pair.right); + } + + if (mostUsedDataRegion == null || currentDeviceEntrySize > maxDeviceEntrySizeOfTableScan) { + mostUsedDataRegion = regionReplicaSet; + maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize; + } + } + if (mostUsedDataRegion == null) { + throw new RootFIPlacementException(tableScanNodeMap.keySet()); + } + context.mostUsedRegion = mostUsedDataRegion; + + if (context.hasSortProperty) { + processSortProperty(node, resultTableScanNodeList, context); + } + if (needSplit) { if (resultTableScanNodeList.size() == 1) { finalAggregation.setChild(resultTableScanNodeList.get(0)); @@ -1293,6 +1403,83 @@ public class TableDistributedPlanGenerator return resultTableScanNodeList; } + private static class AggregationDistributionInfo { + private final List<List<TRegionReplicaSet>> regionReplicaSetsList; + private final AggregationTableScanNode templateNode; + private final AggregationNode finalAggregation; + private final boolean needSplit; + + AggregationDistributionInfo( + List<List<TRegionReplicaSet>> regionReplicaSetsList, + AggregationTableScanNode templateNode, + AggregationNode finalAggregation, + boolean needSplit) { + this.regionReplicaSetsList = regionReplicaSetsList; + this.templateNode = templateNode; + this.finalAggregation = finalAggregation; + this.needSplit = needSplit; + } + } + + private AggregationDistributionInfo prepareAggregationDistribution( + AggregationTableScanNode node, + String dbName, + DataPartition dataPartition, + PlanContext context) { + boolean needSplit = false; + List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>(); + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap = + dataPartition.getDataPartitionMap().get(dbName); + if (seriesSlotMap == null) { + throw new SemanticException( + String.format("Given queried database: %s is not exist!", dbName)); + } + + Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new HashMap<>(); + for (DeviceEntry deviceEntry : node.getDeviceEntries()) { + List<TRegionReplicaSet> regionReplicaSets = + getDeviceReplicaSets( + dataPartition, + seriesSlotMap, + deviceEntry.getDeviceID(), + node.getTimeFilter(), + cachedSeriesSlotWithRegions); + if (regionReplicaSets.size() > 1) { + needSplit = true; + context.deviceCrossRegion = true; + queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache()); + } + regionReplicaSetsList.add(regionReplicaSets); + } + + if (regionReplicaSetsList.isEmpty()) { + regionReplicaSetsList = Collections.singletonList(Collections.singletonList(NOT_ASSIGNED)); + } + + AggregationTableScanNode templateNode = node; + AggregationNode finalAggregation = null; + // Step is SINGLE and device data in more than one region, we need to final aggregate the result + // from different region here, so split this node into two-stage + needSplit = needSplit && node.getStep() == SINGLE; + if (needSplit) { + Pair<AggregationNode, AggregationTableScanNode> splitResult = + split(node, symbolAllocator, queryId); + finalAggregation = splitResult.left; + templateNode = splitResult.right; + + // cover case: complete push-down + group by + streamable + if (!context.hasSortProperty && finalAggregation.isStreamable()) { + OrderingScheme expectedOrderingSchema = + constructOrderingSchema(node.getPreGroupedSymbols()); + context.setExpectedOrderingScheme(expectedOrderingSchema); + } + } + + return new AggregationDistributionInfo( + regionReplicaSetsList, templateNode, finalAggregation, needSplit); + } + private List<TRegionReplicaSet> getDeviceReplicaSets( DataPartition dataPartition, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java new file mode 100644 index 00000000000..9c879717321 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class AlignedAggregationTreeDeviceViewScanNode extends AggregationTreeDeviceViewScanNode { + + public AlignedAggregationTreeDeviceViewScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + List<DeviceEntry> deviceEntries, + Map<Symbol, Integer> tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Assignments projection, + Map<Symbol, AggregationNode.Aggregation> aggregations, + AggregationNode.GroupingSetDescriptor groupingSets, + List<Symbol> preGroupedSymbols, + AggregationNode.Step step, + Optional<Symbol> groupIdSymbol, + String treeDBName, + Map<String, String> measurementColumnNameMap) { + super( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + treeDBName, + measurementColumnNameMap); + } + + @Override + public AlignedAggregationTreeDeviceViewScanNode clone() { + return new AlignedAggregationTreeDeviceViewScanNode( + getPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + getTreeDBName(), + getMeasurementColumnNameMap()); + } + + @Override + public String toString() { + return "AlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java new file mode 100644 index 00000000000..2649023b906 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class NonAlignedAggregationTreeDeviceViewScanNode extends AggregationTreeDeviceViewScanNode { + + public NonAlignedAggregationTreeDeviceViewScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + List<DeviceEntry> deviceEntries, + Map<Symbol, Integer> tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Assignments projection, + Map<Symbol, AggregationNode.Aggregation> aggregations, + AggregationNode.GroupingSetDescriptor groupingSets, + List<Symbol> preGroupedSymbols, + AggregationNode.Step step, + Optional<Symbol> groupIdSymbol, + String treeDBName, + Map<String, String> measurementColumnNameMap) { + super( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + treeDBName, + measurementColumnNameMap); + } + + @Override + public NonAlignedAggregationTreeDeviceViewScanNode clone() { + return new NonAlignedAggregationTreeDeviceViewScanNode( + getPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + getTreeDBName(), + getMeasurementColumnNameMap()); + } + + @Override + public String toString() { + return "NonAlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index a47794fd329..3bcb6f1935b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -105,10 +105,6 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { return node; } - if (tableScanNode.containsNonAlignedDevice()) { - return node; - } - PushDownLevel pushDownLevel = calculatePushDownLevel( node.getAggregations().values(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java index 351bcb7f605..1dcd7d5487a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java @@ -140,7 +140,7 @@ public class TreeViewTest { public void aggregationQueryTest() { PlanTester planTester = new PlanTester(); - // has non-aligned DeviceEntry, no push-down + // has non-aligned DeviceEntry LogicalQueryPlan logicalQueryPlan = planTester.createPlan( "select tag1, count(s1) from " @@ -149,14 +149,83 @@ public class TreeViewTest { PlanMatchPattern expectedPlanPattern = output( aggregation( - ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("s1"))), - treeDeviceViewTableScan( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, - ImmutableList.of("tag1", "s1"), + ImmutableList.of("tag1", "count_0"), ImmutableSet.of("tag1", "s1")))); assertPlan(logicalQueryPlan, expectedPlanPattern); - // only aligned DeviceEntry, do push-down + assertPlan( + planTester.getFragmentPlan(0), + output( + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_1"))), + FINAL, + mergeSort(exchange(), exchange(), exchange(), exchange())))); + + assertPlan( + planTester.getFragmentPlan(1), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + true))); + assertPlan( + planTester.getFragmentPlan(2), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + false))); + assertPlan( + planTester.getFragmentPlan(3), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + true))); + assertPlan( + planTester.getFragmentPlan(4), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + false))); + + // only aligned DeviceEntry logicalQueryPlan = planTester.createPlan( "select tag1, count(s1) from " @@ -199,7 +268,8 @@ public class TreeViewTest { PARTIAL, DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, ImmutableList.of("tag1", "count_0"), - ImmutableSet.of("tag1", "s1")))); + ImmutableSet.of("tag1", "s1"), + true))); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java index 03f79fd2ec2..2ca57e5296a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; @@ -43,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; @@ -429,6 +431,38 @@ public final class PlanMatchPattern { return result; } + public static PlanMatchPattern aggregationTreeDeviceViewTableScan( + GroupingSetDescriptor groupingSets, + List<String> preGroupedSymbols, + Optional<Symbol> groupId, + AggregationNode.Step step, + String expectedTableName, + List<String> outputSymbols, + Set<String> assignmentsKeys, + boolean aligned) { + PlanMatchPattern result = + aligned + ? node(AlignedAggregationTreeDeviceViewScanNode.class) + : node(NonAlignedAggregationTreeDeviceViewScanNode.class); + + result.with( + new AggregationDeviceTableScanMatcher( + groupingSets, + preGroupedSymbols, + ImmutableList.of(), + groupId, + step, + expectedTableName, + Optional.empty(), + outputSymbols, + assignmentsKeys)); + + outputSymbols.forEach( + outputSymbol -> + result.withAlias(outputSymbol, new ColumnReference(expectedTableName, outputSymbol))); + return result; + } + // Attention: Now we only pass aliases according to outputSymbols, but we don't verify the output // column if exists in Table and their order because there maybe partial Agg-result. public static PlanMatchPattern aggregationTableScan(
