This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fab5c3597 Fix some issues in MPP query prepared for representation 
(#6130)
6fab5c3597 is described below

commit 6fab5c3597ea845363caee17a300a3374a63ae1d
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Thu Jun 2 11:20:08 2022 +0800

    Fix some issues in MPP query prepared for representation (#6130)
---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  2 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  2 +-
 .../plan/planner/distribution/SourceRewriter.java  | 26 +++++++++++---
 .../plan/planner/plan/node/PlanGraphPrinter.java   | 42 ++++++++++++++++++++--
 .../distribution/AggregationDistributionTest.java  |  2 +-
 5 files changed, 64 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 96e9dfdc1c..1b30176d58 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -838,7 +838,7 @@ public class LocalExecutionPlanner {
                     context
                         .getTypeProvider()
                         // get the type of first inputExpression
-                        
.getType(descriptor.getInputExpressions().get(0).toString()),
+                        .getType(inputColumnNames.get(0)),
                     ascending),
                 descriptor.getStep(),
                 inputLocationList));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index ecb7b4e73c..a451623307 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -380,7 +380,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  private void updateTypeProviderByPartialAggregation(
+  public static void updateTypeProviderByPartialAggregation(
       AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
     List<AggregationType> splitAggregations =
         
SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 6c20206302..41cda1a735 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
@@ -309,7 +310,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                       AggregationStep.PARTIAL,
                       descriptor.getInputExpressions()));
             });
-
+    leafAggDescriptorList.forEach(
+        d ->
+            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                d, analysis.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
@@ -482,10 +486,14 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                 rootAggDescriptorList.add(
                     new AggregationDescriptor(
                         descriptor.getAggregationType(),
-                        context.isRoot ? AggregationStep.FINAL : 
AggregationStep.PARTIAL,
+                        context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
                         descriptor.getInputExpressions()));
               });
     }
+    rootAggDescriptorList.forEach(
+        d ->
+            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                d, analysis.getTypeProvider()));
     checkArgument(
         sources.size() > 0, "Aggregation sources should not be empty when 
distribution planning");
     SeriesAggregationSourceNode seed = sources.get(0);
@@ -629,6 +637,8 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         }
         if (keep) {
           descriptorList.add(originalDescriptor);
+          LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+              originalDescriptor, analysis.getTypeProvider());
         }
       }
       handle.setAggregationDescriptorList(descriptorList);
@@ -657,10 +667,11 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
           continue;
         }
         GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
-        descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.PARTIAL);
+        descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(descriptorExpression);
-
         descriptorList.add(descriptor);
+        LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+            descriptor, analysis.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
     }
@@ -705,7 +716,12 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       boolean isFinal = false;
       source
           .getAggregationDescriptorList()
-          .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : 
AggregationStep.PARTIAL));
+          .forEach(
+              d -> {
+                d.setStep(isFinal ? AggregationStep.FINAL : 
AggregationStep.PARTIAL);
+                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                    d, analysis.getTypeProvider());
+              });
     }
     return sources;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index f6321b7ea5..4e3ce79d52 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
 import org.apache.commons.lang3.Validate;
 
@@ -81,7 +82,10 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitAlignedSeriesScan(AlignedSeriesScanNode node, 
GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("AlignedSeriesScan-%s", 
node.getPlanNodeId().getId()));
-    boxValue.add(String.format("Series: %s", node.getAlignedPath()));
+    boxValue.add(
+        String.format(
+            "Series: %s%s",
+            node.getAlignedPath().getDevice(), 
node.getAlignedPath().getMeasurementList()));
     boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
     return render(node, boxValue, context);
   }
@@ -92,6 +96,12 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("SeriesAggregationScan-%s", 
node.getPlanNodeId().getId()));
     boxValue.add(String.format("Series: %s", node.getSeriesPath()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = 
node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
+    }
     boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
     return render(node, boxValue, context);
   }
@@ -101,7 +111,16 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
       AlignedSeriesAggregationScanNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("AlignedSeriesAggregationScan-%s", 
node.getPlanNodeId().getId()));
-    boxValue.add(String.format("Series: %s", node.getAlignedPath()));
+    boxValue.add(
+        String.format(
+            "Series: %s%s",
+            node.getAlignedPath().getDevice(), 
node.getAlignedPath().getMeasurementList()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = 
node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
+    }
     boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
     return render(node, boxValue, context);
   }
@@ -149,6 +168,13 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitGroupByLevel(GroupByLevelNode node, GraphContext 
context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("GroupByLevel-%s", 
node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getGroupByLevelDescriptors().size(); i++) {
+      AggregationDescriptor descriptor = 
node.getGroupByLevelDescriptors().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
+      boxValue.add(String.format("Output-%d: %s", i, 
descriptor.getOutputColumnNames()));
+    }
     return render(node, boxValue, context);
   }
 
@@ -157,6 +183,12 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
       SlidingWindowAggregationNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("SlidingWindowAggregation-%s", 
node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = 
node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
+    }
     return render(node, boxValue, context);
   }
 
@@ -172,6 +204,12 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitRowBasedSeriesAggregate(AggregationNode node, 
GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("Aggregation-%s", 
node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = 
node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
+    }
     return render(node, boxValue, context);
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index e47068a7ff..fabc041d97 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -151,7 +151,7 @@ public class AggregationDistributionTest {
                 .get(0);
     aggregationNode
         .getAggregationDescriptorList()
-        .forEach(d -> Assert.assertEquals(AggregationStep.PARTIAL, 
d.getStep()));
+        .forEach(d -> Assert.assertEquals(AggregationStep.INTERMEDIATE, 
d.getStep()));
   }
 
   @Test

Reply via email to