This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fix_some_issues_0531
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c64c75561ae13d73d10e688917ce90e402fdec0c
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jun 1 17:17:27 2022 +0800

    fix some issues in MPP query prepared for representation
---
 .../apache/iotdb/db/metadata/path/AlignedPath.java |  5 ++++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  2 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  2 +-
 .../plan/planner/distribution/SourceRewriter.java  | 26 ++++++++++++++----
 .../plan/planner/plan/node/PlanGraphPrinter.java   | 32 ++++++++++++++++++++++
 5 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 4ef32ba2a6..6ca1bd5a2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -311,4 +311,9 @@ public class AlignedPath extends PartialPath {
     }
     return getDevicePath().concatNode(measurementList.get(0));
   }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s", getDevice(), measurementList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 96e9dfdc1c..1b30176d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -838,7 +838,7 @@ public class LocalExecutionPlanner {
                     context
                         .getTypeProvider()
                         // get the type of first inputExpression
-                        .getType(descriptor.getInputExpressions().get(0).toString()),
+                        .getType(inputColumnNames.get(0)),
                     ascending),
                 descriptor.getStep(),
                 inputLocationList));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index ecb7b4e73c..a451623307 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -380,7 +380,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  private void updateTypeProviderByPartialAggregation(
+  public static void updateTypeProviderByPartialAggregation(
       AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
     List<AggregationType> splitAggregations =
         SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 6c20206302..41cda1a735 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
@@ -309,7 +310,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
                       AggregationStep.PARTIAL,
                       descriptor.getInputExpressions()));
             });
-
+    leafAggDescriptorList.forEach(
+        d ->
+            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                d, analysis.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
@@ -482,10 +486,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
                 rootAggDescriptorList.add(
                     new AggregationDescriptor(
                         descriptor.getAggregationType(),
-                        context.isRoot ? AggregationStep.FINAL : AggregationStep.PARTIAL,
+                        context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
                         descriptor.getInputExpressions()));
               });
     }
+    rootAggDescriptorList.forEach(
+        d ->
+            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                d, analysis.getTypeProvider()));
     checkArgument(
         sources.size() > 0, "Aggregation sources should not be empty when distribution planning");
     SeriesAggregationSourceNode seed = sources.get(0);
@@ -629,6 +637,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
         }
         if (keep) {
           descriptorList.add(originalDescriptor);
+          LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+              originalDescriptor, analysis.getTypeProvider());
         }
       }
       handle.setAggregationDescriptorList(descriptorList);
@@ -657,10 +667,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           continue;
         }
         GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
-        descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+        descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(descriptorExpression);
-
         descriptorList.add(descriptor);
+        LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+            descriptor, analysis.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
     }
@@ -705,7 +716,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       boolean isFinal = false;
       source
           .getAggregationDescriptorList()
-          .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+          .forEach(
+              d -> {
+                d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                    d, analysis.getTypeProvider());
+              });
     }
     return sources;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index f6321b7ea5..657cc121c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
 import org.apache.commons.lang3.Validate;
 
@@ -92,6 +93,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("SeriesAggregationScan-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("Series: %s", node.getSeriesPath()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep()));
+    }
     boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id));
     return render(node, boxValue, context);
   }
@@ -102,6 +109,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("AlignedSeriesAggregationScan-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("Series: %s", node.getAlignedPath()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep()));
+    }
     boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id));
     return render(node, boxValue, context);
   }
@@ -149,6 +162,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitGroupByLevel(GroupByLevelNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("GroupByLevel-%s", node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getGroupByLevelDescriptors().size(); i++) {
+      AggregationDescriptor descriptor = node.getGroupByLevelDescriptors().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep()));
+      boxValue.add(String.format("Output-%d: %s", i, descriptor.getOutputColumnNames()));
+    }
     return render(node, boxValue, context);
   }
 
@@ -157,6 +177,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
       SlidingWindowAggregationNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("SlidingWindowAggregation-%s", node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep()));
+    }
     return render(node, boxValue, context);
   }
 
@@ -172,6 +198,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitRowBasedSeriesAggregate(AggregationNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("Aggregation-%s", node.getPlanNodeId().getId()));
+    for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) {
+      AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i);
+      boxValue.add(
+          String.format(
+              "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep()));
+    }
     return render(node, boxValue, context);
   }
 

Reply via email to