This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch calc_commons in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e78fb691dc7acb56330740a32ad2e721e18508ee Author: shuwenwei <[email protected]> AuthorDate: Fri Apr 17 10:46:38 2026 +0800 add ITableOperatorGeneratorContext --- .../planner/ITableOperatorGeneratorContext.java | 30 ++ .../plan/planner/TableOperatorGenerator.java | 401 ++++++--------------- .../planner/DataNodeTableOperatorGenerator.java | 128 ++----- .../plan/planner/LocalExecutionPlanContext.java | 3 +- 4 files changed, 188 insertions(+), 374 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/ITableOperatorGeneratorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/ITableOperatorGeneratorContext.java new file mode 100644 index 00000000000..0948e39cb6a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/ITableOperatorGeneratorContext.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.calc_commons.plan.planner; + +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; + +import java.time.ZoneId; + +public interface ITableOperatorGeneratorContext { + TypeProvider getTypeProvider(); + + ZoneId getZoneId(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/TableOperatorGenerator.java index 87e4266c04e..59da75bfe1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/plan/planner/TableOperatorGenerator.java @@ -161,7 +161,6 @@ import org.apache.iotdb.db.node_commons.plan.relational.sql.ast.Literal; import org.apache.iotdb.db.node_commons.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.node_commons.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; -import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.utils.datastructure.SortKey; import org.apache.iotdb.udf.api.relational.TableFunction; @@ -234,8 +233,8 @@ import static org.apache.tsfile.read.common.type.StringType.STRING; import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; /** This Visitor is responsible for transferring Table PlanNode Tree to Table Operator Tree. */ -public abstract class TableOperatorGenerator - implements ICoreQueryPlanVisitor<Operator, LocalExecutionPlanContext> { +public abstract class TableOperatorGenerator<C extends ITableOperatorGeneratorContext> + implements ICoreQueryPlanVisitor<Operator, C> { protected final Metadata metadata; @@ -244,7 +243,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) { + public Operator visitPlan(PlanNode node, C context) { throw new UnsupportedOperationException("should call the concrete visitXX() method"); } @@ -278,7 +277,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) { + public Operator visitFilter(FilterNode node, C context) { TypeProvider typeProvider = context.getTypeProvider(); Optional<Expression> predicate = Optional.of(node.getPredicate()); Operator inputOperator = node.getChild().accept(this, context); @@ -302,7 +301,7 @@ public abstract class TableOperatorGenerator List<TSDataType> inputDataTypes, Map<Symbol, List<InputLocation>> inputLocations, PlanNodeId planNodeId, - LocalExecutionPlanContext context) { + C context) { final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes); @@ -365,12 +364,7 @@ public abstract class TableOperatorGenerator } final OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - planNodeId, - FilterAndProjectOperator.class.getSimpleName()); + addOperatorContext(context, planNodeId, FilterAndProjectOperator.class.getSimpleName()); // Project expressions don't contain Non-Mappable UDF, TransformOperator is not needed return new FilterAndProjectOperator( @@ -387,7 +381,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitProject(ProjectNode node, LocalExecutionPlanContext context) { + public Operator visitProject(ProjectNode node, C context) { TypeProvider typeProvider = context.getTypeProvider(); Optional<Expression> predicate; Operator inputOperator; @@ -426,7 +420,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitGapFill(GapFillNode node, LocalExecutionPlanContext context) { + public Operator visitGapFill(GapFillNode node, C context) { Operator child = node.getChild().accept(this, context); List<TSDataType> inputDataTypes = getOutputColumnTypes(node.getChild(), context.getTypeProvider()); @@ -434,12 +428,8 @@ public abstract class TableOperatorGenerator if (node.getGapFillGroupingKeys().isEmpty()) { // without group keys if (node.getMonthDuration() == 0) { // without month interval OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - GapFillWoGroupWoMoOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), GapFillWoGroupWoMoOperator.class.getSimpleName()); return new GapFillWoGroupWoMoOperator( operatorContext, child, @@ -450,12 +440,8 @@ public abstract class TableOperatorGenerator node.getNonMonthDuration()); } else { // with month interval OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - GapFillWoGroupWMoOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), GapFillWoGroupWMoOperator.class.getSimpleName()); return new GapFillWoGroupWMoOperator( operatorContext, child, @@ -474,12 +460,8 @@ public abstract class TableOperatorGenerator node.getGapFillGroupingKeys(), node, inputDataTypes, groupingKeysIndexSet); if (node.getMonthDuration() == 0) { // without month interval OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - GapFillWGroupWoMoOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), GapFillWGroupWoMoOperator.class.getSimpleName()); return new GapFillWGroupWoMoOperator( operatorContext, child, @@ -492,12 +474,8 @@ public abstract class TableOperatorGenerator node.getNonMonthDuration()); } else { // with month interval OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - GapFillWGroupWMoOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), GapFillWGroupWMoOperator.class.getSimpleName()); return new GapFillWGroupWMoOperator( operatorContext, child, @@ -514,7 +492,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitPreviousFill(PreviousFillNode node, LocalExecutionPlanContext context) { + public Operator visitPreviousFill(PreviousFillNode node, C context) { Operator child = node.getChild().accept(this, context); List<TSDataType> inputDataTypes = @@ -533,12 +511,8 @@ public abstract class TableOperatorGenerator if (node.getGroupingKeys().isPresent()) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - PreviousFillWithGroupOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), PreviousFillWithGroupOperator.class.getSimpleName()); return new PreviousFillWithGroupOperator( operatorContext, fillArray, @@ -549,12 +523,8 @@ public abstract class TableOperatorGenerator inputDataTypes); } else { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableFillOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableFillOperator.class.getSimpleName()); return new TableFillOperator(operatorContext, fillArray, child, helperColumnIndex); } } @@ -598,7 +568,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitLinearFill(LinearFillNode node, LocalExecutionPlanContext context) { + public Operator visitLinearFill(LinearFillNode node, C context) { Operator child = node.getChild().accept(this, context); List<TSDataType> inputDataTypes = @@ -609,12 +579,10 @@ public abstract class TableOperatorGenerator if (node.getGroupingKeys().isPresent()) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableLinearFillWithGroupOperator.class.getSimpleName()); + addOperatorContext( + context, + node.getPlanNodeId(), + TableLinearFillWithGroupOperator.class.getSimpleName()); return new TableLinearFillWithGroupOperator( operatorContext, fillArray, @@ -625,26 +593,17 @@ public abstract class TableOperatorGenerator inputDataTypes); } else { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableLinearFillOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableLinearFillOperator.class.getSimpleName()); return new TableLinearFillOperator(operatorContext, fillArray, child, helperColumnIndex); } } @Override - public Operator visitValueFill(ValueFillNode node, LocalExecutionPlanContext context) { + public Operator visitValueFill(ValueFillNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableFillOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), TableFillOperator.class.getSimpleName()); List<TSDataType> inputDataTypes = getOutputColumnTypes(node.getChild(), context.getTypeProvider()); int inputColumnCount = inputDataTypes.size(); @@ -657,10 +616,7 @@ public abstract class TableOperatorGenerator } private IFill[] getValueFill( - int inputColumnCount, - List<TSDataType> inputDataTypes, - Literal filledValue, - LocalExecutionPlanContext context) { + int inputColumnCount, List<TSDataType> inputDataTypes, Literal filledValue, C context) { IFill[] constantFill = new IFill[inputColumnCount]; for (int i = 0; i < inputColumnCount; i++) { switch (inputDataTypes.get(i)) { @@ -747,47 +703,32 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) { + public Operator visitLimit(LimitNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - LimitOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), LimitOperator.class.getSimpleName()); return new LimitOperator(operatorContext, node.getCount(), child); } @Override - public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) { + public Operator visitOffset(OffsetNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - OffsetOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), OffsetOperator.class.getSimpleName()); return new OffsetOperator(operatorContext, node.getCount(), child); } @Override - public Operator visitOutput(OutputNode node, LocalExecutionPlanContext context) { + public Operator visitOutput(OutputNode node, C context) { return node.getChild().accept(this, context); } @Override - public Operator visitCollect(CollectNode node, LocalExecutionPlanContext context) { + public Operator visitCollect(CollectNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - CollectOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), CollectOperator.class.getSimpleName()); List<Operator> children = new ArrayList<>(node.getChildren().size()); for (PlanNode child : node.getChildren()) { children.add(this.process(child, context)); @@ -796,14 +737,10 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) { + public Operator visitMergeSort(MergeSortNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableMergeSortOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableMergeSortOperator.class.getSimpleName()); List<Operator> children = new ArrayList<>(node.getChildren().size()); for (PlanNode child : node.getChildren()) { children.add(this.process(child, context)); @@ -829,14 +766,9 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitSort(SortNode node, LocalExecutionPlanContext context) { + public Operator visitSort(SortNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableSortOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), TableSortOperator.class.getSimpleName()); List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); @@ -862,15 +794,13 @@ public abstract class TableOperatorGenerator protected abstract String getSortTmpDir(OperatorContext operatorContext); + protected abstract OperatorContext addOperatorContext( + C context, PlanNodeId planNodeId, String operatorType); + @Override - public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) { + public Operator visitTopK(TopKNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableTopKOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), TableTopKOperator.class.getSimpleName()); List<Operator> children = new ArrayList<>(node.getChildren().size()); for (PlanNode child : node.getChildren()) { children.add(this.process(child, context)); @@ -929,14 +859,10 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext context) { + public Operator visitStreamSort(StreamSortNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableStreamSortOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableStreamSortOperator.class.getSimpleName()); List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); @@ -968,7 +894,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitGroup(GroupNode node, LocalExecutionPlanContext context) { + public Operator visitGroup(GroupNode node, C context) { if (node.getPartitionKeyCount() == 0) { SortNode sortNode = new SortNode( @@ -988,7 +914,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) { + public Operator visitJoin(JoinNode node, C context) { List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); Operator leftChild = node.getLeftChild().accept(this, context); @@ -1023,12 +949,10 @@ public abstract class TableOperatorGenerator // cross join does not need time column if (node.isCrossJoin()) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - SimpleNestedLoopCrossJoinOperator.class.getSimpleName()); + addOperatorContext( + context, + node.getPlanNodeId(), + SimpleNestedLoopCrossJoinOperator.class.getSimpleName()); return new SimpleNestedLoopCrossJoinOperator( operatorContext, leftChild, @@ -1094,12 +1018,10 @@ public abstract class TableOperatorGenerator if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AsofMergeSortInnerJoinOperator.class.getSimpleName()); + addOperatorContext( + context, + node.getPlanNodeId(), + AsofMergeSortInnerJoinOperator.class.getSimpleName()); return new AsofMergeSortInnerJoinOperator( operatorContext, leftChild, @@ -1116,12 +1038,8 @@ public abstract class TableOperatorGenerator dataTypes); } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AsofMergeSortLeftJoinOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), AsofMergeSortLeftJoinOperator.class.getSimpleName()); return new AsofMergeSortLeftJoinOperator( operatorContext, leftChild, @@ -1143,12 +1061,8 @@ public abstract class TableOperatorGenerator if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MergeSortInnerJoinOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MergeSortInnerJoinOperator.class.getSimpleName()); return new MergeSortInnerJoinOperator( operatorContext, leftChild, @@ -1161,12 +1075,8 @@ public abstract class TableOperatorGenerator dataTypes); } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MergeSortFullOuterJoinOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MergeSortFullOuterJoinOperator.class.getSimpleName()); return new MergeSortFullOuterJoinOperator( operatorContext, leftChild, @@ -1180,12 +1090,8 @@ public abstract class TableOperatorGenerator joinKeyTypes.stream().map(this::buildUpdateLastRowFunction).collect(Collectors.toList())); } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MergeSortLeftJoinOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MergeSortLeftJoinOperator.class.getSimpleName()); return new MergeSortLeftJoinOperator( operatorContext, leftChild, @@ -1251,7 +1157,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext context) { + public Operator visitSemiJoin(SemiJoinNode node, C context) { List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); Operator leftChild = node.getLeftChild().accept(this, context); @@ -1287,12 +1193,8 @@ public abstract class TableOperatorGenerator context.getTypeProvider().getTableModelType(node.getFilteringSourceJoinSymbol())); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MergeSortSemiJoinOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MergeSortSemiJoinOperator.class.getSimpleName()); return new MergeSortSemiJoinOperator( operatorContext, leftChild, @@ -1315,36 +1217,27 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitEnforceSingleRow( - EnforceSingleRowNode node, LocalExecutionPlanContext context) { + public Operator visitEnforceSingleRow(EnforceSingleRowNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - EnforceSingleRowOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), EnforceSingleRowOperator.class.getSimpleName()); return new EnforceSingleRowOperator(operatorContext, child); } @Override - public Operator visitAssignUniqueId(AssignUniqueId node, LocalExecutionPlanContext context) { + public Operator visitAssignUniqueId(AssignUniqueId node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - EnforceSingleRowOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), EnforceSingleRowOperator.class.getSimpleName()); return new AssignUniqueIdOperator(operatorContext, child); } @Override - public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) { + public Operator visitAggregation(AggregationNode node, C context) { Operator child = node.getChild().accept(this, context); @@ -1356,17 +1249,10 @@ public abstract class TableOperatorGenerator } private Operator planGlobalAggregation( - AggregationNode node, - Operator child, - TypeProvider typeProvider, - LocalExecutionPlanContext context) { + AggregationNode node, Operator child, TypeProvider typeProvider, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AggregationOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), AggregationOperator.class.getSimpleName()); Map<Symbol, AggregationNode.Aggregation> aggregationMap = node.getAggregations(); ImmutableList.Builder<TableAggregator> aggregatorBuilder = new ImmutableList.Builder<>(); Map<Symbol, Integer> childLayout = @@ -1438,10 +1324,7 @@ public abstract class TableOperatorGenerator } protected Operator planGroupByAggregation( - AggregationNode node, - Operator child, - TypeProvider typeProvider, - LocalExecutionPlanContext context) { + AggregationNode node, Operator child, TypeProvider typeProvider, C context) { Map<Symbol, Integer> childLayout = makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); @@ -1470,12 +1353,8 @@ public abstract class TableOperatorGenerator Collections.emptySet()))); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - StreamingAggregationOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), StreamingAggregationOperator.class.getSimpleName()); return new StreamingAggregationOperator( operatorContext, child, @@ -1518,12 +1397,10 @@ public abstract class TableOperatorGenerator List<Integer> preGroupedChannels = preGroupedChannelsBuilder.build(); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - StreamingHashAggregationOperator.class.getSimpleName()); + addOperatorContext( + context, + node.getPlanNodeId(), + StreamingHashAggregationOperator.class.getSimpleName()); return new StreamingHashAggregationOperator( operatorContext, child, @@ -1548,12 +1425,8 @@ public abstract class TableOperatorGenerator aggregatorBuilder.add( buildGroupByAggregator(childLayout, k, v, node.getStep(), typeProvider))); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - HashAggregationOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), HashAggregationOperator.class.getSimpleName()); return new HashAggregationOperator( operatorContext, @@ -1627,8 +1500,7 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitTableFunctionProcessor( - TableFunctionProcessorNode node, LocalExecutionPlanContext context) { + public Operator visitTableFunctionProcessor(TableFunctionProcessorNode node, C context) { TableFunction tableFunction = metadata.getTableFunction(node.getName()); TableFunctionProcessorProvider processorProvider = tableFunction.getProcessorProvider(node.getTableFunctionHandle()); @@ -1639,22 +1511,14 @@ public abstract class TableOperatorGenerator .map(InternalTypeManager::getTSDataType) .collect(Collectors.toList()); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableFunctionLeafOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableFunctionLeafOperator.class.getSimpleName()); return new TableFunctionLeafOperator(operatorContext, processorProvider, outputDataTypes); } else { Operator operator = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableFunctionOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableFunctionOperator.class.getSimpleName()); List<TSDataType> inputDataTypes = node.getChild().getOutputSymbols().stream() @@ -1732,15 +1596,10 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitPatternRecognition( - PatternRecognitionNode node, LocalExecutionPlanContext context) { + public Operator visitPatternRecognition(PatternRecognitionNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - PatternRecognitionOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), PatternRecognitionOperator.class.getSimpleName()); Operator child = node.getChild().accept(this, context); @@ -2097,15 +1956,11 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitMarkDistinct(MarkDistinctNode node, LocalExecutionPlanContext context) { + public Operator visitMarkDistinct(MarkDistinctNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MarkDistinctOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MarkDistinctOperator.class.getSimpleName()); TypeProvider typeProvider = context.getTypeProvider(); Map<Symbol, Integer> childLayout = @@ -2122,16 +1977,12 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitWindowFunction(WindowNode node, LocalExecutionPlanContext context) { + public Operator visitWindowFunction(WindowNode node, C context) { TypeProvider typeProvider = context.getTypeProvider(); Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableWindowOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableWindowOperator.class.getSimpleName()); Map<Symbol, Integer> childLayout = makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); @@ -2286,18 +2137,14 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitUnion(UnionNode node, LocalExecutionPlanContext context) { + public Operator visitUnion(UnionNode node, C context) { List<Operator> children = node.getChildren().stream() .map(child -> child.accept(this, context)) .collect(Collectors.toList()); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MappingCollectOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName()); int size = children.size(); List<List<Integer>> mappings = new ArrayList<>(size); @@ -2316,14 +2163,10 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitValuesNode(ValuesNode node, LocalExecutionPlanContext context) { + public Operator visitValuesNode(ValuesNode node, C context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MappingCollectOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName()); // Currently we only support empty values operator assert node.getRowCount() == 0; @@ -2331,15 +2174,11 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitRowNumber(RowNumberNode node, LocalExecutionPlanContext context) { + public Operator visitRowNumber(RowNumberNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MappingCollectOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName()); List<Symbol> partitionBySymbols = node.getPartitionBy(); Map<Symbol, Integer> childLayout = @@ -2372,15 +2211,11 @@ public abstract class TableOperatorGenerator } @Override - public Operator visitTopKRanking(TopKRankingNode node, LocalExecutionPlanContext context) { + public Operator visitTopKRanking(TopKRankingNode node, C context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - MappingCollectOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName()); List<Symbol> partitionBySymbols = node.getSpecification().getPartitionBy(); Map<Symbol, Integer> childLayout = @@ -2438,5 +2273,5 @@ public abstract class TableOperatorGenerator Optional.empty()); } - protected abstract SessionInfo getSessionInfo(LocalExecutionPlanContext context); + protected abstract SessionInfo getSessionInfo(C context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 5d33c254838..48ba79cb922 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.node_commons.common.SessionInfo; import org.apache.iotdb.db.node_commons.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.node_commons.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.node_commons.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.node_commons.plan.relational.planner.Symbol; import org.apache.iotdb.db.node_commons.plan.relational.planner.node.AggregationNode; @@ -195,7 +196,8 @@ import static org.apache.iotdb.db.utils.constant.SqlConstant.MIN; import static org.apache.iotdb.db.utils.constant.SqlConstant.SUM; import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; -public class DataNodeTableOperatorGenerator extends TableOperatorGenerator +public class DataNodeTableOperatorGenerator + extends TableOperatorGenerator<LocalExecutionPlanContext> implements PlanVisitor<Operator, LocalExecutionPlanContext> { private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER = @@ -217,15 +219,18 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator + File.separator; } + @Override + protected OperatorContext addOperatorContext( + LocalExecutionPlanContext context, PlanNodeId planNodeId, String operatorType) { + return context + .getDriverContext() + .addOperatorContext(context.getNextOperatorId(), planNodeId, operatorType); + } + @Override public Operator visitCteScan(CteScanNode node, LocalExecutionPlanContext context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - CteScanOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), CteScanOperator.class.getSimpleName()); return new CteScanOperator( operatorContext, node.getPlanNodeId(), @@ -237,12 +242,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanContext context) { context.addExchangeSumNum(1); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - IdentitySinkOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), IdentitySinkOperator.class.getSimpleName()); String downStreamPlanNodeId = node.getDownStreamChannelLocationList().stream() .map(DownStreamChannelLocation::getRemotePlanNodeId) @@ -280,12 +281,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator @Override public Operator visitTableExchange(ExchangeNode node, LocalExecutionPlanContext context) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - ExchangeOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), ExchangeOperator.class.getSimpleName()); FragmentInstanceId localInstanceId = context.getInstanceContext().getId(); FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId(); @@ -377,12 +373,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator // can be obtained, so an empty result set is returned. if (!containsFieldColumn || node.getDeviceEntries().isEmpty()) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - EmptyDataOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), EmptyDataOperator.class.getSimpleName()); return new EmptyDataOperator(operatorContext); } String treePrefixPath = DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable); @@ -447,10 +439,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator boolean isSingleColumn = measurementSchemas.size() == 1; - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), className); + OperatorContext operatorContext = addOperatorContext(context, node.getPlanNodeId(), className); Set<String> allSensors = new HashSet<>(measurementColumnNames); @@ -1037,10 +1026,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator seriesScanOptions.setTTLForTableView(viewTTL); seriesScanOptions.setIsTableViewForTreeModel(node instanceof TreeDeviceViewScanNode); - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), className); + OperatorContext operatorContext = addOperatorContext(context, node.getPlanNodeId(), className); int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); if (context.getTypeProvider().getTemplatedInfo() != null) { @@ -1085,12 +1071,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator TreeDeviceViewScanNode node, LocalExecutionPlanContext context) { if (node.getDeviceEntries().isEmpty() || node.getTreeDBName() == null) { OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - EmptyDataOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), EmptyDataOperator.class.getSimpleName()); return new EmptyDataOperator(operatorContext); } throw new IllegalArgumentException("Valid TreeDeviceViewScanNode is not expected here."); @@ -1138,12 +1120,10 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitInformationSchemaTableScan( final InformationSchemaTableScanNode node, final LocalExecutionPlanContext context) { final OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - InformationSchemaTableScanOperator.class.getSimpleName()); + addOperatorContext( + context, + node.getPlanNodeId(), + InformationSchemaTableScanOperator.class.getSimpleName()); final List<TSDataType> dataTypes = node.getOutputSymbols().stream() @@ -1160,12 +1140,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitCountMerge( final CountSchemaMergeNode node, final LocalExecutionPlanContext context) { final OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - CountMergeOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), CountMergeOperator.class.getSimpleName()); final List<Operator> children = new ArrayList<>(node.getChildren().size()); for (final PlanNode child : node.getChildren()) { children.add(this.process(child, context)); @@ -1177,12 +1152,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitTableDeviceFetch( final TableDeviceFetchNode node, final LocalExecutionPlanContext context) { final OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - SchemaQueryScanOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -1202,12 +1173,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator final SchemaQueryScanOperator<IDeviceSchemaInfo> operator = new SchemaQueryScanOperator<>( node.getPlanNodeId(), - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - SchemaQueryScanOperator.class.getSimpleName()), + addOperatorContext( + context, node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()), SchemaSourceFactory.getTableDeviceQuerySource( node.getDatabase(), table, @@ -1236,12 +1203,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator final List<LeafColumnTransformer> filterLeafColumnTransformerList = new ArrayList<>(); return new SchemaCountOperator<>( node.getPlanNodeId(), - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - SchemaCountOperator.class.getSimpleName()), + addOperatorContext( + context, node.getPlanNodeId(), SchemaCountOperator.class.getSimpleName()), SchemaSourceFactory.getTableDeviceQuerySource( database, table, @@ -1532,9 +1495,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator } final OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), className); + addOperatorContext(context, node.getPlanNodeId(), className); SeriesScanOptions seriesScanOptions = buildSeriesScanOptions( context, @@ -1883,12 +1844,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitExplainAnalyze(ExplainAnalyzeNode node, LocalExecutionPlanContext context) { Operator operator = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - ExplainAnalyzeOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), ExplainAnalyzeOperator.class.getSimpleName()); return new ExplainAnalyzeOperator( operatorContext, operator, node.getQueryId(), node.isVerbose(), node.getTimeout()); } @@ -1912,12 +1869,8 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator Operator operator = childNode.accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableCopyToOperator.class.getSimpleName()); + addOperatorContext( + context, node.getPlanNodeId(), TableCopyToOperator.class.getSimpleName()); return new TableCopyToOperator( operatorContext, operator, @@ -1931,12 +1884,7 @@ public class DataNodeTableOperatorGenerator extends TableOperatorGenerator public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) { Operator child = node.getChild().accept(this, context); OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - TableIntoOperator.class.getSimpleName()); + addOperatorContext(context, node.getPlanNodeId(), TableIntoOperator.class.getSimpleName()); PartialPath targetTable = new PartialPath(node.getTable(), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index 1f930344ca7..a410817decd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.db.calc_commons.execution.operator.Operator; +import org.apache.iotdb.db.calc_commons.plan.planner.ITableOperatorGeneratorContext; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.node_commons.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.node_commons.plan.planner.plan.node.PlanNodeId; @@ -64,7 +65,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; // Attention: We should use thread-safe data structure for members that are shared by all pipelines -public class LocalExecutionPlanContext { +public class LocalExecutionPlanContext implements ITableOperatorGeneratorContext { private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanContext.class); // Save operators in this pipeline, a new one will be created when creating another pipeline
