This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6fab5c3597 Fix some issues in MPP query prepared for representation
(#6130)
6fab5c3597 is described below
commit 6fab5c3597ea845363caee17a300a3374a63ae1d
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Thu Jun 2 11:20:08 2022 +0800
Fix some issues in MPP query prepared for representation (#6130)
---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 2 +-
.../plan/planner/distribution/SourceRewriter.java | 26 +++++++++++---
.../plan/planner/plan/node/PlanGraphPrinter.java | 42 ++++++++++++++++++++--
.../distribution/AggregationDistributionTest.java | 2 +-
5 files changed, 64 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 96e9dfdc1c..1b30176d58 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -838,7 +838,7 @@ public class LocalExecutionPlanner {
context
.getTypeProvider()
// get the type of first inputExpression
-
.getType(descriptor.getInputExpressions().get(0).toString()),
+ .getType(inputColumnNames.get(0)),
ascending),
descriptor.getStep(),
inputLocationList));
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index ecb7b4e73c..a451623307 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -380,7 +380,7 @@ public class LogicalPlanBuilder {
return this;
}
- private void updateTypeProviderByPartialAggregation(
+ public static void updateTypeProviderByPartialAggregation(
AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
List<AggregationType> splitAggregations =
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 6c20206302..41cda1a735 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
@@ -309,7 +310,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
AggregationStep.PARTIAL,
descriptor.getInputExpressions()));
});
-
+ leafAggDescriptorList.forEach(
+ d ->
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ d, analysis.getTypeProvider()));
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
@@ -482,10 +486,14 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
rootAggDescriptorList.add(
new AggregationDescriptor(
descriptor.getAggregationType(),
- context.isRoot ? AggregationStep.FINAL :
AggregationStep.PARTIAL,
+ context.isRoot ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE,
descriptor.getInputExpressions()));
});
}
+ rootAggDescriptorList.forEach(
+ d ->
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ d, analysis.getTypeProvider()));
checkArgument(
sources.size() > 0, "Aggregation sources should not be empty when
distribution planning");
SeriesAggregationSourceNode seed = sources.get(0);
@@ -629,6 +637,8 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
if (keep) {
descriptorList.add(originalDescriptor);
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ originalDescriptor, analysis.getTypeProvider());
}
}
handle.setAggregationDescriptorList(descriptorList);
@@ -657,10 +667,11 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
continue;
}
GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
- descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.PARTIAL);
+ descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE);
descriptor.setInputExpressions(descriptorExpression);
-
descriptorList.add(descriptor);
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ descriptor, analysis.getTypeProvider());
}
handle.setGroupByLevelDescriptors(descriptorList);
}
@@ -705,7 +716,12 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
boolean isFinal = false;
source
.getAggregationDescriptorList()
- .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL :
AggregationStep.PARTIAL));
+ .forEach(
+ d -> {
+ d.setStep(isFinal ? AggregationStep.FINAL :
AggregationStep.PARTIAL);
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ d, analysis.getTypeProvider());
+ });
}
return sources;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index f6321b7ea5..4e3ce79d52 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.commons.lang3.Validate;
@@ -81,7 +82,10 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
public List<String> visitAlignedSeriesScan(AlignedSeriesScanNode node,
GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("AlignedSeriesScan-%s",
node.getPlanNodeId().getId()));
- boxValue.add(String.format("Series: %s", node.getAlignedPath()));
+ boxValue.add(
+ String.format(
+ "Series: %s%s",
+ node.getAlignedPath().getDevice(),
node.getAlignedPath().getMeasurementList()));
boxValue.add(String.format("PartitionId: %s",
node.getRegionReplicaSet().getRegionId().id));
return render(node, boxValue, context);
}
@@ -92,6 +96,12 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("SeriesAggregationScan-%s",
node.getPlanNodeId().getId()));
boxValue.add(String.format("Series: %s", node.getSeriesPath()));
+ for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+ AggregationDescriptor descriptor =
node.getAggregationDescriptorList().get(i);
+ boxValue.add(
+ String.format(
+ "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(),
descriptor.getStep()));
+ }
boxValue.add(String.format("PartitionId: %s",
node.getRegionReplicaSet().getRegionId().id));
return render(node, boxValue, context);
}
@@ -101,7 +111,16 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
AlignedSeriesAggregationScanNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("AlignedSeriesAggregationScan-%s",
node.getPlanNodeId().getId()));
- boxValue.add(String.format("Series: %s", node.getAlignedPath()));
+ boxValue.add(
+ String.format(
+ "Series: %s%s",
+ node.getAlignedPath().getDevice(),
node.getAlignedPath().getMeasurementList()));
+ for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+ AggregationDescriptor descriptor =
node.getAggregationDescriptorList().get(i);
+ boxValue.add(
+ String.format(
+ "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(),
descriptor.getStep()));
+ }
boxValue.add(String.format("PartitionId: %s",
node.getRegionReplicaSet().getRegionId().id));
return render(node, boxValue, context);
}
@@ -149,6 +168,13 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
public List<String> visitGroupByLevel(GroupByLevelNode node, GraphContext
context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("GroupByLevel-%s",
node.getPlanNodeId().getId()));
+ for (int i = 0; i < node.getGroupByLevelDescriptors().size(); i++) {
+ AggregationDescriptor descriptor =
node.getGroupByLevelDescriptors().get(i);
+ boxValue.add(
+ String.format(
+ "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(),
descriptor.getStep()));
+ boxValue.add(String.format("Output-%d: %s", i,
descriptor.getOutputColumnNames()));
+ }
return render(node, boxValue, context);
}
@@ -157,6 +183,12 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
SlidingWindowAggregationNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("SlidingWindowAggregation-%s",
node.getPlanNodeId().getId()));
+ for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+ AggregationDescriptor descriptor =
node.getAggregationDescriptorList().get(i);
+ boxValue.add(
+ String.format(
+ "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(),
descriptor.getStep()));
+ }
return render(node, boxValue, context);
}
@@ -172,6 +204,12 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
public List<String> visitRowBasedSeriesAggregate(AggregationNode node,
GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("Aggregation-%s",
node.getPlanNodeId().getId()));
+ for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+ AggregationDescriptor descriptor =
node.getAggregationDescriptorList().get(i);
+ boxValue.add(
+ String.format(
+ "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(),
descriptor.getStep()));
+ }
return render(node, boxValue, context);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index e47068a7ff..fabc041d97 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -151,7 +151,7 @@ public class AggregationDistributionTest {
.get(0);
aggregationNode
.getAggregationDescriptorList()
- .forEach(d -> Assert.assertEquals(AggregationStep.PARTIAL,
d.getStep()));
+ .forEach(d -> Assert.assertEquals(AggregationStep.INTERMEDIATE,
d.getStep()));
}
@Test