This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eeb91ecd48f8cbce2a22a372496ec39db22cd752 Merge: d734b704d36 88f48151bd8 Author: Chen YZ <[email protected]> AuthorDate: Mon Feb 24 01:15:54 2025 +0800 save .../db/it/udf/IoTDBUserDefinedTableFunctionIT.java | 5 +- .../specification/ParameterSpecification.java | 1 - .../specification/TableParameterSpecification.java | 18 +------ .../process/function/TableFunctionOperator.java | 3 ++ .../config/executor/ClusterConfigTaskExecutor.java | 18 +++++++ .../relational/analyzer/StatementAnalyzer.java | 17 ------- .../tablefunction/TableArgumentAnalysis.java | 23 +-------- .../plan/relational/planner/OrderingScheme.java | 13 +---- .../plan/relational/planner/QueryPlanner.java | 1 - .../plan/relational/planner/RelationPlanner.java | 1 - .../distribute/TableDistributedPlanGenerator.java | 56 ++++++++++++---------- .../rule/ImplementTableFunctionSource.java | 43 +---------------- .../rule/PruneTableFunctionProcessorColumns.java | 1 - .../PruneTableFunctionProcessorSourceColumns.java | 1 - .../relational/planner/node/TableFunctionNode.java | 11 ----- .../planner/node/TableFunctionProcessorNode.java | 18 ------- .../optimizations/LogicalOptimizeFactory.java | 1 - .../planner/optimizations/ParallelizeGrouping.java | 15 ------ .../TransformAggregationToStreamable.java | 5 +- .../optimizations/UnaliasSymbolReferences.java | 3 -- .../sql/ast/TableFunctionTableArgument.java | 15 ++---- .../plan/relational/sql/parser/AstBuilder.java | 17 +------ .../plan/relational/sql/util/SqlFormatter.java | 6 --- .../process/tvf/TableFunctionOperatorTest.java | 5 -- .../builtin/relational/tvf/HOPTableFunction.java | 1 - .../db/relational/grammar/sql/RelationalSql.g4 | 1 - 26 files changed, 68 insertions(+), 231 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 1e953bf8189,1e953bf8189..47e5b9041b0 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@@ -186,6 -186,6 +186,9 @@@ public class TableFunctionOperator impl // if there is no proper column, use pass through column's position count positionCount = passThroughIndexBuilder.getPositionCount(); } ++ if (positionCount == 0) { ++ return null; ++ } blockBuilder.declarePositions(positionCount); if (needPassThrough) { // handle pass through column only if needed diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java index 97ad4a78d8e,508408571bc..c86c8bee957 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java @@@ -39,6 -39,6 +39,7 @@@ import static com.google.common.collect import static java.util.Objects.requireNonNull; public class OrderingScheme { ++ private final List<Symbol> orderBy; private final Map<Symbol, SortOrder> orderings; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 278824a9466,3ac405e9255..8ddc35912bd --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@@ -314,36 -309,6 +314,36 @@@ public class TableDistributedPlanGenera return Collections.singletonList(newTopKNode); } + @Override + public List<PlanNode> visitSortBasedGroup(SortBasedGroupNode node, PlanContext context) { - boolean pushDown = 
context.pushDownAuxSort; ++ boolean pushDown = context.isPushDownGrouping(); + try { - context.setPushDownAuxSort(node.isEnableParalleled()); ++ context.setPushDownGrouping(node.isEnableParalleled()); + if (node.isEnableParalleled()) { + List<PlanNode> result = new ArrayList<>(); + context.setExpectedOrderingScheme(node.getOrderingScheme()); + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + for (PlanNode child : childrenNodes) { + if (canSortEliminated( + node.getOrderingScheme(), nodeOrderingMap.get(child.getPlanNodeId()))) { + result.add(child); + } else { + SortNode subSortNode = + new SortNode( + queryId.genPlanNodeId(), child, node.getOrderingScheme(), false, false); + result.add(subSortNode); + nodeOrderingMap.put(subSortNode.getPlanNodeId(), subSortNode.getOrderingScheme()); + } + } + return result; + } else { + return visitSort(node, context); + } + } finally { - context.pushDownAuxSort = pushDown; ++ context.setPushDownGrouping(pushDown); + } + } + @Override public List<PlanNode> visitSort(SortNode node, PlanContext context) { context.setExpectedOrderingScheme(node.getOrderingScheme()); @@@ -515,82 -482,8 +515,82 @@@ public List<PlanNode> visitDeviceTableScan( final DeviceTableScanNode node, final PlanContext context) { - if (context.isPushDownAuxSort()) { ++ if (context.isPushDownGrouping()) { + return constructDeviceTableScanByTags(node, context); + } else { + return constructDeviceTableScanByRegionReplicaSet(node, context); + } + } + + private List<PlanNode> constructDeviceTableScanByTags( + final DeviceTableScanNode node, final PlanContext context) { + List<PlanNode> result = new ArrayList<>(); + List<DeviceEntry> crossRegionDevices = new ArrayList<>(); + final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new HashMap<>(); + final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>(); + for (final DeviceEntry deviceEntry : node.getDeviceEntries()) { + final List<TRegionReplicaSet> regionReplicaSets 
= + analysis.getDataRegionReplicaSetWithTimeFilter( + node.getQualifiedObjectName().getDatabaseName(), + deviceEntry.getDeviceID(), + node.getTimeFilter()); + regionReplicaSets.forEach( + regionReplicaSet -> + regionDeviceCount.put( + regionReplicaSet, regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1)); + if (regionReplicaSets.size() != 1) { + crossRegionDevices.add(deviceEntry); + continue; + } + final DeviceTableScanNode deviceTableScanNode = + tableScanNodeMap.computeIfAbsent( + regionReplicaSets.get(0), + k -> { + final DeviceTableScanNode scanNode = + new DeviceTableScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + new ArrayList<>(), + node.getIdAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.containsNonAlignedDevice()); + scanNode.setRegionReplicaSet(regionReplicaSets.get(0)); + return scanNode; + }); + deviceTableScanNode.appendDeviceEntry(deviceEntry); + } + result.addAll(tableScanNodeMap.values()); + if (context.hasSortProperty) { + processSortProperty(node, result, context); + } + context.mostUsedRegion = + regionDeviceCount.entrySet().stream() + .max(Comparator.comparingInt(Map.Entry::getValue)) + .map(Map.Entry::getKey) + .orElse(null); + if (!crossRegionDevices.isEmpty()) { + node.setDeviceEntries(crossRegionDevices); + result.add( + new CollectNode( + queryId.genPlanNodeId(), + constructDeviceTableScanByRegionReplicaSet(node, context), + node.getOutputSymbols())); + } + return result; + } + + private List<PlanNode> constructDeviceTableScanByRegionReplicaSet( + final DeviceTableScanNode node, final PlanContext context) { + final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new HashMap<>(); for (final DeviceEntry deviceEntry : node.getDeviceEntries()) { final List<TRegionReplicaSet> 
regionReplicaSets = analysis.getDataRegionReplicaSetWithTimeFilter( @@@ -973,23 -855,16 +973,29 @@@ if (node.getChildren().isEmpty()) { return Collections.singletonList(node); } - boolean canSplitPushDown = - node.isRowSemantic() - || (node.getChild() instanceof SortBasedGroupNode) - && ((SortBasedGroupNode) node.getChild()).isEnableParalleled(); -- List<PlanNode> childrenNodes = node.getChild().accept(this, context); -- if (childrenNodes.size() == 1) { -- node.setChild(childrenNodes.get(0)); - return Collections.singletonList(node); - } else if (!canSplitPushDown) { - } else { -- CollectNode collectNode = -- new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols()); -- childrenNodes.forEach(collectNode::addChild); -- node.setChild(collectNode); - return Collections.singletonList(node); - } else { - return splitForEachChild(node, childrenNodes); ++ boolean pushDown = context.isPushDownGrouping(); ++ try { ++ context.setPushDownGrouping(node.isRowSemantic()); ++ boolean canSplitPushDown = ++ node.isRowSemantic() ++ || (node.getChild() instanceof SortBasedGroupNode) ++ && ((SortBasedGroupNode) node.getChild()).isEnableParalleled(); ++ List<PlanNode> childrenNodes = node.getChild().accept(this, context); ++ if (childrenNodes.size() == 1) { ++ node.setChild(childrenNodes.get(0)); ++ return Collections.singletonList(node); ++ } else if (!canSplitPushDown) { ++ CollectNode collectNode = ++ new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols()); ++ childrenNodes.forEach(collectNode::addChild); ++ node.setChild(collectNode); ++ return Collections.singletonList(node); ++ } else { ++ return splitForEachChild(node, childrenNodes); ++ } ++ } finally { ++ context.setPushDownGrouping(pushDown); } - return Collections.singletonList(node); } private void buildRegionNodeMap( @@@ -1388,7 -1263,6 +1394,7 @@@ final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; boolean hasExchangeNode = false; boolean hasSortProperty 
= false; - boolean pushDownAuxSort = false; ++ boolean pushDownGrouping = false; OrderingScheme expectedOrderingScheme; TRegionReplicaSet mostUsedRegion; @@@ -1409,13 -1283,5 +1415,13 @@@ this.expectedOrderingScheme = expectedOrderingScheme; hasSortProperty = true; } + - public void setPushDownAuxSort(boolean pushDownAuxSort) { - this.pushDownAuxSort = pushDownAuxSort; ++ public void setPushDownGrouping(boolean pushDownGrouping) { ++ this.pushDownGrouping = pushDownGrouping; + } + - public boolean isPushDownAuxSort() { - return pushDownAuxSort; ++ public boolean isPushDownGrouping() { ++ return pushDownGrouping; + } } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index ff6d7514c68,d2d4be0dfba..2ca4b286267 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@@ -48,45 -45,45 +48,6 @@@ import static org.apache.iotdb.db.query * * <p>It rewrites TableFunctionNode with potentially many sources into a TableFunctionProcessorNode. * The new node has one source being a combination of the original sources. -- * -- * <p>The original sources are combined with joins. The join conditions depend on the prune when -- * empty property, and on the co-partitioning of sources. -- * -- * <p>The resulting source should be partitioned and ordered according to combined schemas from the -- * component sources. 
-- * -- * <p>Example transformation for two sources, both with set semantics and KEEP WHEN EMPTY property: -- * -- * <pre>{@code -- * - TableFunction foo -- * - source T1(a1, b1) PARTITION BY a1 ORDER BY b1 -- * - source T2(a2, b2) PARTITION BY a2 -- * }</pre> -- * -- * Is transformed into: -- * -- * <pre>{@code -- * - TableFunctionProcessor foo -- * PARTITION BY (a1, a2), ORDER BY combined_row_number -- * - Project -- * marker_1 <= IF(table1_row_number = combined_row_number, table1_row_number, CAST(null AS bigint)) -- * marker_2 <= IF(table2_row_number = combined_row_number, table2_row_number, CAST(null AS bigint)) -- * - Project -- * combined_row_number <= IF(COALESCE(table1_row_number, BIGINT '-1') > COALESCE(table2_row_number, BIGINT '-1'), table1_row_number, table2_row_number) -- * combined_partition_size <= IF(COALESCE(table1_partition_size, BIGINT '-1') > COALESCE(table2_partition_size, BIGINT '-1'), table1_partition_size, table2_partition_size) -- * - FULL Join -- * [table1_row_number = table2_row_number OR -- * table1_row_number > table2_partition_size AND table2_row_number = BIGINT '1' OR -- * table2_row_number > table1_partition_size AND table1_row_number = BIGINT '1'] -- * - Window [PARTITION BY a1 ORDER BY b1] -- * table1_row_number <= row_number() -- * table1_partition_size <= count() -- * - source T1(a1, b1) -- * - Window [PARTITION BY a2] -- * table2_row_number <= row_number() -- * table2_partition_size <= count() -- * - source T2(a2, b2) -- * }</pre> */ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { @@@ -163,11 -154,9 +123,10 @@@ Optional.ofNullable(sourceProperties.getPassThroughSpecification()), sourceProperties.getRequiredColumns(), sourceProperties.getDataOrganizationSpecification(), + sourceProperties.isRowSemantics(), node.getArguments())); } else { - // TODO(UDF): we dont support multiple source now. + // we don't support multiple source now. 
throw new IllegalArgumentException("table function does not support multiple source now."); } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index b3b8ac1d11c,782a3d64e0e..757ec9236b3 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@@ -122,14 -108,6 +114,10 @@@ public class TableFunctionProcessorNod return properOutputs; } + public boolean isRowSemantic() { + return rowSemantic; + } + - public boolean isPruneWhenEmpty() { - return pruneWhenEmpty; - } - public Optional<TableFunctionNode.PassThroughSpecification> getPassThroughSpecification() { return passThroughSpecification; } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java index 48207824795,00000000000..5777e32fe93 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java @@@ -1,308 -1,0 +1,293 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; - import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.ENABLE; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.PENDING; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.UNABLE; + +/** + * This rule is used to determine whether the SortBasedGroupNode can be parallelized. + * + * <p>Optimization phase: Logical plan planning. + * + * <p>The SortBasedGroupNode can be parallelized if the following conditions are met: + * + * <ul> + * SortingKey is empty and the resulting child node has been pre-grouped. In other words, the + * PartitionKey matches the last descendant that guarantees the data is grouped by PartitionKey. + * For example: + * <li>SortBasedGroupNode[tag1,tag2] -> SortNode[sort=tag1] + * <li>SortBasedGroupNode[tag1,tag2] -> TopKNode[sort=tag1,tag2] + * <li>SortBasedGroupNode[tag1,tag2] -> AggregationNode[group=tag1] + * <li>SortBasedGroupNode[tag1,tag2] -> TableFunctionNode[partition=tag1] + * </ul> + * + * <ul> + * SortingKey is the time column and the last descendant that guarantees the data is grouped by + * PartitionKey is TableDeviceScanNode. For example: + * <li>SortBasedGroupNode[device_id,time] -> ...
-> TableDeviceScanNode + * </ul> + */ +public class ParallelizeGrouping implements PlanOptimizer { + @Override + public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { + if (!(context.getAnalysis().isQuery())) { + return plan; + } + // TODO: remove println + System.out.println("before optimize ParallelizeGrouping =========================="); + PlanGraphPrinter.print(plan); + PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 0)); + System.out.println("after optimize ParallelizeGrouping =========================="); + PlanGraphPrinter.print(res); + return res; + // return plan.accept(new Rewriter(context.getAnalysis()), new Context()); + } + + private static class Rewriter extends PlanVisitor<PlanNode, Context> { + private final Analysis analysis; + + public Rewriter(Analysis analysis) { + this.analysis = analysis; + } + + @Override + public PlanNode visitPlan(PlanNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + return newNode; + } + + /** + * We need to make sure: + * + * <ul> + * <li>(1) All keys in context#orderKey are used for partition. + * <li>(2) childOrderSchema can match the prefix of context#orderKey, so that partition-based + * operation can be pushed down. 
+ * </ul> + */ + private void checkPrefixMatch(Context context, List<Symbol> childOrder) { + if (context.canSkip()) { + return; + } + OrderingScheme prefix = context.sortKey; + if (prefix.getOrderBy().size() != context.partitionKeyCount) { + context.canParalleled = UNABLE; + return; + } + if (prefix.getOrderBy().size() > childOrder.size()) { + context.canParalleled = UNABLE; + return; + } + for (int i = 0; i < prefix.getOrderBy().size(); i++) { + Symbol lhs = prefix.getOrderBy().get(i); + Symbol rhs = childOrder.get(i); + if (!lhs.equals(rhs)) { + context.canParalleled = UNABLE; + return; + } + } + context.canParalleled = ENABLE; + } + + @Override + public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context context) { + checkPrefixMatch(context, node.getOrderingScheme().getOrderBy()); + Context newContext = new Context(node.getOrderingScheme(), node.getPartitionKeyCount()); + SortBasedGroupNode newNode = (SortBasedGroupNode) node.clone(); + newNode.addChild(node.getChild().accept(this, newContext)); + if (newContext.canParalleled.equals(ENABLE)) { + newNode.setEnableParalleled(true); + } + return newNode; + } + + @Override + public PlanNode visitSort(SortNode node, Context context) { + checkPrefixMatch(context, node.getOrderingScheme().getOrderBy()); + return visitPlan(node, context); + } + + @Override + public PlanNode visitStreamSort(StreamSortNode node, Context context) { + checkPrefixMatch(context, node.getOrderingScheme().getOrderBy()); + return visitPlan(node, context); + } + + @Override + public PlanNode visitTopK(TopKNode node, Context context) { + checkPrefixMatch(context, node.getOrderingScheme().getOrderBy()); + return visitPlan(node, context); + } + + @Override + public PlanNode visitJoin(JoinNode node, Context context) { + context.canParalleled = UNABLE; + return visitPlan(node, context); + } + + @Override + public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context context) { + context.canParalleled = UNABLE; + return 
visitPlan(node, context); + } + + @Override + public PlanNode visitSemiJoin(SemiJoinNode node, Context context) { + context.canParalleled = UNABLE; + return visitPlan(node, context); + } + + @Override + public PlanNode visitTableFunctionProcessor(TableFunctionProcessorNode node, Context context) { + if (!context.canSkip()) { + if (node.getChildren().isEmpty()) { + // leaf node + context.canParalleled = UNABLE; + return node; + } + Optional<DataOrganizationSpecification> dataOrganizationSpecification = + node.getDataOrganizationSpecification(); + if (!dataOrganizationSpecification.isPresent()) { + context.canParalleled = UNABLE; + } else { + checkPrefixMatch(context, dataOrganizationSpecification.get().getPartitionBy()); + } + } + return visitPlan(node, context); + } + - @Override - public PlanNode visitProject(ProjectNode node, Context context) { - if (!context.canSkip()) { - OrderingScheme sortKey = context.sortKey; - for (int i = 0; i < sortKey.getOrderBy().size(); i++) { - if (!node.getAssignments().contains(sortKey.getOrderBy().get(i))) { - context.canParalleled = UNABLE; - break; - } - } - } - return visitPlan(node, context); - } - + @Override + public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) { + if (!context.canSkip()) { + OrderingScheme sortKey = context.sortKey; + Map<Symbol, ColumnSchema> tableColumnSchema = + analysis.getTableColumnSchema(node.getQualifiedObjectName()); + // 1. It is possible for the last sort key to be a time column + if (sortKey.getOrderBy().size() > context.partitionKeyCount + 1) { + context.canParalleled = UNABLE; + return node; + } else if (sortKey.getOrderBy().size() == context.partitionKeyCount + 1) { + Symbol lastSymbol = sortKey.getOrderBy().get(context.partitionKeyCount); + if (!tableColumnSchema.containsKey(lastSymbol) + || tableColumnSchema.get(lastSymbol).getColumnCategory() != TIME) { + context.canParalleled = UNABLE; + return node; + } + } + // 2. 
check there are no field in sortKey and all tags in sortKey + Set<Symbol> tagSymbols = + tableColumnSchema.entrySet().stream() + .filter(entry -> entry.getValue().getColumnCategory() == TAG) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + for (int i = 0; i < context.partitionKeyCount; i++) { + Symbol symbol = sortKey.getOrderBy().get(i); + if (!tableColumnSchema.containsKey(symbol)) { + context.canParalleled = UNABLE; + return node; + } + switch (tableColumnSchema.get(symbol).getColumnCategory()) { + case TAG: + tagSymbols.remove(symbol); + break; + case ATTRIBUTE: + // If all tags in partition key, attributes must be the same in one partition. + break; + default: + context.canParalleled = UNABLE; + return node; + } + } + if (!tagSymbols.isEmpty()) { + context.canParalleled = UNABLE; + return node; + } + context.canParalleled = ENABLE; + } + return node; + } + + @Override + public PlanNode visitAggregation(AggregationNode node, Context context) { + checkPrefixMatch(context, node.getGroupingKeys()); + return visitPlan(node, context); + } + + @Override + public PlanNode visitAggregationTableScan(AggregationTableScanNode node, Context context) { + checkPrefixMatch(context, node.getGroupingKeys()); + return node; + } + } + + private static class Context { + private final OrderingScheme sortKey; + private final int partitionKeyCount; + private CanParalleled canParalleled = PENDING; + + private Context(OrderingScheme sortKey, int partitionKeyCount) { + this.sortKey = sortKey; + this.partitionKeyCount = partitionKeyCount; + } + + private boolean canSkip() { + return sortKey == null || canParalleled != PENDING; + } + } + + protected enum CanParalleled { + ENABLE, + UNABLE, + PENDING + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java index 7e9d27253ba,340d9ebf592..0c2a1cd4a77 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java @@@ -124,22 -121,6 +124,25 @@@ public class TransformAggregationToStre return ImmutableList.of(); } + @Override + public List<Symbol> visitTableFunctionProcessor( + TableFunctionProcessorNode node, GroupContext context) { + if (node.getChildren().isEmpty()) { - // leaf node + return ImmutableList.of(); ++ } else if (node.isRowSemantic()) { ++ return visitPlan(node, context); + } ++ ++ // return ImmutableList.of(); + Optional<DataOrganizationSpecification> dataOrganizationSpecification = + node.getDataOrganizationSpecification(); + return dataOrganizationSpecification + .<List<Symbol>>map( + organizationSpecification -> + ImmutableList.copyOf(organizationSpecification.getPartitionBy())) + .orElseGet(ImmutableList::of); + } + @Override public List<Symbol> visitDeviceTableScan(DeviceTableScanNode node, GroupContext context) { Set<Symbol> expectedGroupingKeys = context.groupingKeys;
