This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryMetricFix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/QueryMetricFix by this push:
new c0c34d29f96 Change OperatorContext
c0c34d29f96 is described below
commit c0c34d29f96c992b78d4b24d2fee1c4e78eb92f0
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Nov 24 11:01:38 2024 +0800
Change OperatorContext
---
.../plan/planner/TableOperatorGenerator.java | 75 ++++++++++++++++------
1 file changed, 54 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index fe0e8671f4a..adce0a3b607 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -383,7 +383,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- TableScanNode.class.getSimpleName());
+ TableScanOperator.class.getSimpleName());
int maxTsBlockLineNum =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
if (context.getTypeProvider().getTemplatedInfo() != null) {
@@ -1132,7 +1132,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- StreamSortNode.class.getSimpleName());
+ TableStreamSortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
int sortItemsCount = node.getOrderingScheme().getOrderBy().size();
@@ -1176,11 +1176,6 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
@Override
public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(), node.getPlanNodeId(), JoinNode.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
Operator leftChild = node.getLeftChild().accept(this, context);
@@ -1221,6 +1216,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TableInnerJoinOperator.class.getSimpleName());
return new TableInnerJoinOperator(
operatorContext,
leftChild,
@@ -1232,6 +1234,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
ASC_TIME_COMPARATOR,
dataTypes);
} else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TableFullOuterJoinOperator.class.getSimpleName());
return new TableFullOuterJoinOperator(
operatorContext,
leftChild,
@@ -1357,25 +1366,28 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
@Override
public Operator visitAggregation(AggregationNode node,
LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- AggregationNode.class.getSimpleName());
+
Operator child = node.getChild().accept(this, context);
if (node.getGroupingKeys().isEmpty()) {
- return planGlobalAggregation(node, child, context.getTypeProvider(), operatorContext);
+ return planGlobalAggregation(node, child, context.getTypeProvider(), context);
}
- return planGroupByAggregation(node, child, context.getTypeProvider(), operatorContext);
+ return planGroupByAggregation(node, child, context.getTypeProvider(), context);
}
private Operator planGlobalAggregation(
- AggregationNode node, Operator child, TypeProvider typeProvider, OperatorContext context) {
-
+ AggregationNode node,
+ Operator child,
+ TypeProvider typeProvider,
+ LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AggregationOperator.class.getSimpleName());
Map<Symbol, AggregationNode.Aggregation> aggregationMap =
node.getAggregations();
ImmutableList.Builder<TableAggregator> aggregatorBuilder = new
ImmutableList.Builder<>();
Map<Symbol, Integer> childLayout =
@@ -1393,7 +1405,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
typeProvider,
true,
null)));
- return new AggregationOperator(context, child, aggregatorBuilder.build());
+ return new AggregationOperator(operatorContext, child, aggregatorBuilder.build());
}
// timeColumnName will only be set for AggTableScan.
@@ -1438,7 +1450,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
AggregationNode node,
Operator child,
TypeProvider typeProvider,
- OperatorContext operatorContext) {
+ LocalExecutionPlanContext context) {
Map<Symbol, Integer> childLayout =
makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());
@@ -1458,6 +1470,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
buildAggregator(
childLayout, k, v, node.getStep(), typeProvider,
true, null)));
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ StreamingAggregationOperator.class.getSimpleName());
return new StreamingAggregationOperator(
operatorContext,
child,
@@ -1499,6 +1518,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
List<Integer> preGroupedChannels = preGroupedChannelsBuilder.build();
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ StreamingHashAggregationOperator.class.getSimpleName());
return new StreamingHashAggregationOperator(
operatorContext,
child,
@@ -1522,6 +1548,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
(k, v) ->
aggregatorBuilder.add(
buildGroupByAggregator(childLayout, k, v, node.getStep(),
typeProvider)));
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ HashAggregationOperator.class.getSimpleName());
return new HashAggregationOperator(
operatorContext,
@@ -1739,7 +1772,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- AggregationTableScanNode.class.getSimpleName());
+ TableAggregationTableScanOperator.class.getSimpleName());
SeriesScanOptions.Builder scanOptionsBuilder =
node.getTimePredicate().isPresent()
? getSeriesScanOptionsBuilder(context,
node.getTimePredicate().get())