This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 20804df90e [IOTDB-3334] Fix mismatched data columns and headers bug in AlignByDevice query (#6104)
20804df90e is described below

commit 20804df90ebfdaeb2997c09acf12184ef34f1983
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Jun 1 12:31:30 2022 +0800

    [IOTDB-3334] Fix mismatched data columns and headers bug in AlignByDevice query (#6104)
---
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |  22 +++
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |   8 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 177 +++++++++++++++++----
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  62 ++++++--
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  14 +-
 5 files changed, 226 insertions(+), 57 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index 96189b8551..1ca7540395 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -91,6 +91,28 @@ public class MetaUtils {
     return result;
   }
 
+  public static List<PartialPath> groupAlignedSeries(List<PartialPath> 
fullPaths) {
+    List<PartialPath> result = new ArrayList<>();
+    Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
+    for (PartialPath path : fullPaths) {
+      MeasurementPath measurementPath = (MeasurementPath) path;
+      if (!measurementPath.isUnderAlignedEntity()) {
+        result.add(measurementPath);
+      } else {
+        String deviceName = measurementPath.getDevice();
+        if (!deviceToAlignedPathMap.containsKey(deviceName)) {
+          AlignedPath alignedPath = new AlignedPath(measurementPath);
+          deviceToAlignedPathMap.put(deviceName, alignedPath);
+        } else {
+          AlignedPath alignedPath = deviceToAlignedPathMap.get(deviceName);
+          alignedPath.addMeasurement(measurementPath);
+        }
+      }
+    }
+    result.addAll(deviceToAlignedPathMap.values());
+    return result;
+  }
+
   @TestOnly
   public static List<String> getMultiFullPaths(IMNode node) {
     if (node == null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index cb63f3e607..8418241072 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -229,10 +229,10 @@ public class Analyzer {
                   .distinct()
                   .collect(Collectors.toList());
           for (String deviceName : deviceToMeasurementsMap.keySet()) {
-            List<String> measurementsUnderDeivce =
+            List<String> measurementsUnderDevice =
                 new ArrayList<>(deviceToMeasurementsMap.get(deviceName));
             List<Integer> indexes = new ArrayList<>();
-            for (String measurement : measurementsUnderDeivce) {
+            for (String measurement : measurementsUnderDevice) {
               indexes.add(
                   allMeasurements.indexOf(measurement) + 1); // add 1 to skip 
the device column
             }
@@ -249,8 +249,8 @@ public class Analyzer {
           Map<String, Set<Expression>> deviceToAggregationTransformExpressions 
= new HashMap<>();
           for (String deviceName : deviceToTransformExpressions.keySet()) {
             Set<Expression> transformExpressions = 
deviceToTransformExpressions.get(deviceName);
-            Set<Expression> aggregationExpressions = new HashSet<>();
-            Set<Expression> aggregationTransformExpressions = new HashSet<>();
+            Set<Expression> aggregationExpressions = new LinkedHashSet<>();
+            Set<Expression> aggregationTransformExpressions = new 
LinkedHashSet<>();
 
             boolean isHasRawDataInputAggregation = false;
             if (queryStatement.isAggregationQuery()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d789e58892..ecb7b4e73c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -63,6 +63,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
@@ -87,6 +88,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 
 public class LogicalPlanBuilder {
@@ -115,7 +117,7 @@ public class LogicalPlanBuilder {
         sourceExpressions.stream()
             .map(expression -> ((TimeSeriesOperand) expression).getPath())
             .collect(Collectors.toList());
-    List<PartialPath> groupedPaths = 
MetaUtils.groupAlignedPaths(selectedPaths);
+    List<PartialPath> groupedPaths = 
MetaUtils.groupAlignedSeries(selectedPaths);
     for (PartialPath path : groupedPaths) {
       if (path instanceof MeasurementPath) { // non-aligned series
         SeriesScanNode seriesScanNode =
@@ -160,46 +162,147 @@ public class LogicalPlanBuilder {
 
   public LogicalPlanBuilder planAggregationSource(
       Set<Expression> sourceExpressions,
+      AggregationStep curStep,
       OrderBy scanOrder,
       Filter timeFilter,
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
       Map<Expression, Set<Expression>> groupByLevelExpressions,
       TypeProvider typeProvider) {
-    AggregationStep curStep =
-        (groupByLevelExpressions != null
-                || (groupByTimeParameter != null && 
groupByTimeParameter.hasOverlap()))
-            ? AggregationStep.PARTIAL
-            : AggregationStep.SINGLE;
 
-    List<PlanNode> sourceNodeList = new ArrayList<>();
     boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
     for (Expression sourceExpression : sourceExpressions) {
-      AggregationType aggregationFunction =
-          AggregationType.valueOf(
-              ((FunctionExpression) 
sourceExpression).getFunctionName().toUpperCase());
+      createAggregationDescriptor(
+          (FunctionExpression) sourceExpression,
+          curStep,
+          scanOrder,
+          needCheckAscending,
+          typeProvider,
+          ascendingAggregations,
+          descendingAggregations);
+    }
+
+    List<PlanNode> sourceNodeList =
+        constructSourceNodeFromAggregationDescriptors(
+            ascendingAggregations,
+            descendingAggregations,
+            scanOrder,
+            timeFilter,
+            groupByTimeParameter);
+
+    return convergeAggregationSource(
+        sourceNodeList,
+        curStep,
+        scanOrder,
+        groupByTimeParameter,
+        aggregationExpressions,
+        groupByLevelExpressions);
+  }
+
+  public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
+      Set<Expression> sourceExpressions,
+      AggregationStep curStep,
+      OrderBy scanOrder,
+      Filter timeFilter,
+      GroupByTimeParameter groupByTimeParameter,
+      Set<Expression> aggregationExpressions,
+      List<Integer> measurementIndexes,
+      Map<Expression, Set<Expression>> groupByLevelExpressions,
+      TypeProvider typeProvider) {
+    checkArgument(
+        sourceExpressions.size() == measurementIndexes.size(),
+        "Each aggregate should correspond to a column of output.");
+
+    boolean needCheckAscending = groupByTimeParameter == null;
+    Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
+    Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
+    Map<AggregationDescriptor, Integer> aggregationToMeasurementIndexMap = new 
HashMap<>();
+
+    int index = 0;
+    for (Expression sourceExpression : sourceExpressions) {
       AggregationDescriptor aggregationDescriptor =
-          new AggregationDescriptor(
-              aggregationFunction, curStep, sourceExpression.getExpressions());
-      if (curStep.isOutputPartial()) {
-        updateTypeProviderByPartialAggregation(aggregationDescriptor, 
typeProvider);
-      }
-      PartialPath selectPath =
-          ((TimeSeriesOperand) 
sourceExpression.getExpressions().get(0)).getPath();
-      if (!needCheckAscending
-          || SchemaUtils.isConsistentWithScanOrder(aggregationFunction, 
scanOrder)) {
-        ascendingAggregations
-            .computeIfAbsent(selectPath, key -> new ArrayList<>())
-            .add(aggregationDescriptor);
-      } else {
-        descendingAggregations
-            .computeIfAbsent(selectPath, key -> new ArrayList<>())
-            .add(aggregationDescriptor);
-      }
+          createAggregationDescriptor(
+              (FunctionExpression) sourceExpression,
+              curStep,
+              scanOrder,
+              needCheckAscending,
+              typeProvider,
+              ascendingAggregations,
+              descendingAggregations);
+      aggregationToMeasurementIndexMap.put(aggregationDescriptor, 
measurementIndexes.get(index));
+      index++;
+    }
+
+    List<PlanNode> sourceNodeList =
+        constructSourceNodeFromAggregationDescriptors(
+            ascendingAggregations,
+            descendingAggregations,
+            scanOrder,
+            timeFilter,
+            groupByTimeParameter);
+
+    if (!curStep.isOutputPartial()) {
+      // update measurementIndexes
+      measurementIndexes.clear();
+      measurementIndexes.addAll(
+          sourceNodeList.stream()
+              .map(
+                  planNode ->
+                      ((SeriesAggregationSourceNode) 
planNode).getAggregationDescriptorList())
+              .flatMap(List::stream)
+              .map(aggregationToMeasurementIndexMap::get)
+              .collect(Collectors.toList()));
+    }
+
+    return convergeAggregationSource(
+        sourceNodeList,
+        curStep,
+        scanOrder,
+        groupByTimeParameter,
+        aggregationExpressions,
+        groupByLevelExpressions);
+  }
+
+  private AggregationDescriptor createAggregationDescriptor(
+      FunctionExpression sourceExpression,
+      AggregationStep curStep,
+      OrderBy scanOrder,
+      boolean needCheckAscending,
+      TypeProvider typeProvider,
+      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
+    AggregationType aggregationFunction =
+        
AggregationType.valueOf(sourceExpression.getFunctionName().toUpperCase());
+    AggregationDescriptor aggregationDescriptor =
+        new AggregationDescriptor(aggregationFunction, curStep, 
sourceExpression.getExpressions());
+    if (curStep.isOutputPartial()) {
+      updateTypeProviderByPartialAggregation(aggregationDescriptor, 
typeProvider);
+    }
+    PartialPath selectPath =
+        ((TimeSeriesOperand) 
sourceExpression.getExpressions().get(0)).getPath();
+    if (!needCheckAscending
+        || SchemaUtils.isConsistentWithScanOrder(aggregationFunction, 
scanOrder)) {
+      ascendingAggregations
+          .computeIfAbsent(selectPath, key -> new ArrayList<>())
+          .add(aggregationDescriptor);
+    } else {
+      descendingAggregations
+          .computeIfAbsent(selectPath, key -> new ArrayList<>())
+          .add(aggregationDescriptor);
     }
+    return aggregationDescriptor;
+  }
 
+  private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
+      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+      OrderBy scanOrder,
+      Filter timeFilter,
+      GroupByTimeParameter groupByTimeParameter) {
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations 
=
         MetaUtils.groupAlignedAggregations(ascendingAggregations);
     for (Map.Entry<PartialPath, List<AggregationDescriptor>> 
pathAggregationsEntry :
@@ -223,11 +326,20 @@ public class LogicalPlanBuilder {
                 pathAggregationsEntry.getKey(),
                 pathAggregationsEntry.getValue(),
                 scanOrder,
-                groupByTimeParameter,
+                null,
                 timeFilter));
       }
     }
+    return sourceNodeList;
+  }
 
+  private LogicalPlanBuilder convergeAggregationSource(
+      List<PlanNode> sourceNodeList,
+      AggregationStep curStep,
+      OrderBy scanOrder,
+      GroupByTimeParameter groupByTimeParameter,
+      Set<Expression> aggregationExpressions,
+      Map<Expression, Set<Expression>> groupByLevelExpressions) {
     if (curStep.isOutputPartial()) {
       if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
         curStep =
@@ -264,6 +376,7 @@ public class LogicalPlanBuilder {
     } else {
       this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
     }
+
     return this;
   }
 
@@ -419,7 +532,7 @@ public class LogicalPlanBuilder {
         scanOrder);
   }
 
-  private PlanNode createAggregationScanNode(
+  private SeriesAggregationSourceNode createAggregationScanNode(
       PartialPath selectPath,
       List<AggregationDescriptor> aggregationDescriptorList,
       OrderBy scanOrder,
@@ -486,9 +599,9 @@ public class LogicalPlanBuilder {
   }
 
   public LogicalPlanBuilder planTransform(
-      Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId) 
{
+      Set<Expression> transformExpressions, boolean isGroupByTime, ZoneId 
zoneId) {
     boolean needTransform = false;
-    for (Expression expression : selectExpressions) {
+    for (Expression expression : transformExpressions) {
       if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
         needTransform = true;
         break;
@@ -502,7 +615,7 @@ public class LogicalPlanBuilder {
         new TransformNode(
             context.getQueryId().genPlanNodeId(),
             this.getRoot(),
-            selectExpressions.toArray(new Expression[0]),
+            transformExpressions.toArray(new Expression[0]),
             isGroupByTime,
             zoneId);
     return this;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index b6fa90d6c2..f29a6b17bd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner;
 import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
@@ -141,6 +142,7 @@ public class LogicalPlanner {
                       analysis.getDeviceToQueryFilter() != null
                           ? analysis.getDeviceToQueryFilter().get(deviceName)
                           : null,
+                      
analysis.getDeviceToMeasurementIndexesMap().get(deviceName),
                       context));
           deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
         }
@@ -164,6 +166,7 @@ public class LogicalPlanner {
                     analysis.getAggregationTransformExpressions(),
                     analysis.getTransformExpressions(),
                     analysis.getQueryFilter(),
+                    null,
                     context));
       }
 
@@ -186,6 +189,7 @@ public class LogicalPlanner {
         Set<Expression> aggregationTransformExpressions,
         Set<Expression> transformExpressions,
         Expression queryFilter,
+        List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE
         MPPQueryContext context) {
       LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
 
@@ -272,20 +276,50 @@ public class LogicalPlanner {
           }
         }
       } else {
-        planBuilder =
-            planBuilder
-                .planAggregationSource(
-                    sourceExpressions,
-                    queryStatement.getResultOrder(),
-                    analysis.getGlobalTimeFilter(),
-                    analysis.getGroupByTimeParameter(),
-                    aggregationExpressions,
-                    analysis.getGroupByLevelExpressions(),
-                    analysis.getTypeProvider())
-                .planTransform(
-                    transformExpressions,
-                    queryStatement.isGroupByTime(),
-                    queryStatement.getSelectComponent().getZoneId());
+        AggregationStep curStep =
+            (analysis.getGroupByLevelExpressions() != null
+                    || (analysis.getGroupByTimeParameter() != null
+                        && analysis.getGroupByTimeParameter().hasOverlap()))
+                ? AggregationStep.PARTIAL
+                : AggregationStep.SINGLE;
+
+        boolean needTransform = false;
+        for (Expression expression : transformExpressions) {
+          if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
+            needTransform = true;
+            break;
+          }
+        }
+
+        if (!needTransform && measurementIndexes != null) {
+          planBuilder =
+              planBuilder.planAggregationSourceWithIndexAdjust(
+                  sourceExpressions,
+                  curStep,
+                  queryStatement.getResultOrder(),
+                  analysis.getGlobalTimeFilter(),
+                  analysis.getGroupByTimeParameter(),
+                  aggregationExpressions,
+                  measurementIndexes,
+                  analysis.getGroupByLevelExpressions(),
+                  analysis.getTypeProvider());
+        } else {
+          planBuilder =
+              planBuilder
+                  .planAggregationSource(
+                      sourceExpressions,
+                      curStep,
+                      queryStatement.getResultOrder(),
+                      analysis.getGlobalTimeFilter(),
+                      analysis.getGroupByTimeParameter(),
+                      aggregationExpressions,
+                      analysis.getGroupByLevelExpressions(),
+                      analysis.getTypeProvider())
+                  .planTransform(
+                      transformExpressions,
+                      queryStatement.isGroupByTime(),
+                      queryStatement.getSelectComponent().getZoneId());
+        }
       }
 
       return planBuilder.getRoot();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index f38534b825..9eda86a8d0 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -152,11 +152,6 @@ public class QueryLogicalPlanUtil {
 
     QueryId queryId = new QueryId("test");
     List<PlanNode> sourceNodeList = new ArrayList<>();
-    sourceNodeList.add(
-        new AlignedSeriesScanNode(
-            queryId.genPlanNodeId(),
-            (AlignedPath) schemaMap.get("root.sg.d2.a"),
-            OrderBy.TIMESTAMP_ASC));
     sourceNodeList.add(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
@@ -172,6 +167,11 @@ public class QueryLogicalPlanUtil {
             queryId.genPlanNodeId(),
             (MeasurementPath) schemaMap.get("root.sg.d2.s4"),
             OrderBy.TIMESTAMP_ASC));
+    sourceNodeList.add(
+        new AlignedSeriesScanNode(
+            queryId.genPlanNodeId(),
+            (AlignedPath) schemaMap.get("root.sg.d2.a"),
+            OrderBy.TIMESTAMP_ASC));
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, 
sourceNodeList);
     OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), 
timeJoinNode, 10);
@@ -716,8 +716,8 @@ public class QueryLogicalPlanUtil {
         new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_DESC, 
sourceNodeList2);
 
     Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
-    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
-    deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2));
+    deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 3, 2));
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
             queryId.genPlanNodeId(),

Reply via email to