This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tsbs/iot by this push:
     new 456f6110290 Add aggregation template align by device optimization
456f6110290 is described below

commit 456f6110290d364a0d39d1ed46c0edbd00f9f4a5
Author: Beyyes <[email protected]>
AuthorDate: Wed May 15 10:14:06 2024 +0800

    Add aggregation template align by device optimization
---
 .../db/queryengine/plan/analyze/Analysis.java      |  16 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   8 +-
 .../plan/analyze/ExpressionTypeAnalyzer.java       |  25 ++-
 .../plan/analyze/TemplatedAggregationAnalyze.java  | 229 ++++++++++++++++++++
 .../queryengine/plan/analyze/TemplatedAnalyze.java | 148 ++++++++-----
 .../db/queryengine/plan/analyze/TemplatedInfo.java |  99 +++++++--
 .../plan/optimization/AggregationPushDown.java     | 132 +++++++++++-
 .../plan/optimization/PredicatePushDown.java       |  16 +-
 .../plan/planner/LogicalPlanBuilder.java           |   7 +-
 .../plan/planner/LogicalPlanVisitor.java           |   2 +-
 .../plan/planner/OperatorTreeGenerator.java        | 114 +++++++---
 .../plan/planner/SubPlanTypeExtractor.java         |   9 +
 .../plan/planner/TemplatedLogicalPlan.java         | 240 ++++++++++++++++++++-
 .../plan/planner/TemplatedLogicalPlanBuilder.java  |  54 ++++-
 .../plan/planner/plan/PlanFragment.java            |   6 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   2 +-
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +-
 .../planner/plan/node/process/DeviceViewNode.java  |  47 ++++
 .../plan/planner/plan/node/process/FilterNode.java |  59 ++++-
 .../plan/node/process/RawDataAggregationNode.java  |  20 ++
 .../plan/node/process/SingleDeviceViewNode.java    |   2 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  36 ++++
 .../plan/parameter/AggregationDescriptor.java      |   2 +-
 .../plan/parameter/GroupByTimeParameter.java       |   4 +
 .../plan/optimization/TestPlanBuilder.java         |   3 +-
 .../logical/DataQueryLogicalPlannerTest.java       |  18 +-
 .../planner/node/process/FilterNodeSerdeTest.java  |   3 +-
 27 files changed, 1148 insertions(+), 161 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index be5c98525bb..5897f78b0dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -293,7 +293,7 @@ public class Analysis implements IAnalysis {
   private Template deviceTemplate;
   // when deviceTemplate is not empty and all expressions in this query are 
templated measurements,
   // i.e. no aggregation and arithmetic expression
-  private boolean onlyQueryTemplateMeasurements = true;
+  private boolean noWhereAndAggregation = true;
   // if it is wildcard query in templated align by device query
   private boolean templateWildCardQuery;
   // all queried measurementList and schemaList in deviceTemplate.
@@ -437,8 +437,8 @@ public class Analysis implements IAnalysis {
       return null;
     }
 
-    if (isAllDevicesInOneTemplate()
-        && (isOnlyQueryTemplateMeasurements() || expression instanceof 
TimeSeriesOperand)) {
+    if (allDevicesInOneTemplate()
+        && (noWhereAndAggregation() || expression instanceof 
TimeSeriesOperand)) {
       TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
       return 
deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType();
     }
@@ -921,7 +921,7 @@ public class Analysis implements IAnalysis {
   // All Queries Devices Set In One Template
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public boolean isAllDevicesInOneTemplate() {
+  public boolean allDevicesInOneTemplate() {
     return this.deviceTemplate != null;
   }
 
@@ -933,12 +933,12 @@ public class Analysis implements IAnalysis {
     this.deviceTemplate = template;
   }
 
-  public boolean isOnlyQueryTemplateMeasurements() {
-    return onlyQueryTemplateMeasurements;
+  public boolean noWhereAndAggregation() {
+    return noWhereAndAggregation;
   }
 
-  public void setOnlyQueryTemplateMeasurements(boolean 
onlyQueryTemplateMeasurements) {
-    this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements;
+  public void setNoWhereAndAggregation(boolean value) {
+    this.noWhereAndAggregation = value;
   }
 
   public List<String> getMeasurementList() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index a7448a93529..39a377303c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -1851,7 +1851,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           && rightExpression instanceof ConstantOperand)) {
         throw new SemanticException(
             String.format(
-                "Please check the keep condition ([%s]),it need to be a 
constant or a compare expression constructed by 'keep' and a long number.",
+                "Please check the keep condition ([%s]), "
+                    + "it need to be a constant or a compare expression 
constructed by 'keep' and a long number.",
                 keepExpression.getExpressionString()));
       }
       return;
@@ -1859,12 +1860,13 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     if (!(keepExpression instanceof ConstantOperand)) {
       throw new SemanticException(
           String.format(
-              "Please check the keep condition ([%s]),it need to be a constant 
or a compare expression constructed by 'keep' and a long number.",
+              "Please check the keep condition ([%s]), "
+                  + "it need to be a constant or a compare expression 
constructed by 'keep' and a long number.",
               keepExpression.getExpressionString()));
     }
   }
 
-  private void analyzeGroupByTime(Analysis analysis, QueryStatement 
queryStatement) {
+  static void analyzeGroupByTime(Analysis analysis, QueryStatement 
queryStatement) {
     if (!queryStatement.isGroupByTime()) {
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
index 49904d6532c..6868b612274 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.analyze;
 
+import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.NodeRef;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -65,7 +66,10 @@ public class ExpressionTypeAnalyzer {
   public static TSDataType analyzeExpression(Analysis analysis, Expression 
expression) {
     if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
       ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
-      analyzer.analyze(expression, null);
+
+      Map<String, IMeasurementSchema> context =
+          analysis.allDevicesInOneTemplate() ? 
analysis.getDeviceTemplate().getSchemaMap() : null;
+      analyzer.analyze(expression, context);
 
       addExpressionTypes(analysis, analyzer);
     }
@@ -96,7 +100,16 @@ public class ExpressionTypeAnalyzer {
       Expression expression,
       TemplatedInfo templatedInfo) {
     ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
-    analyzer.analyze(expression, templatedInfo.getSchemaMap());
+
+    Map<String, IMeasurementSchema> schemaMap = templatedInfo.getSchemaMap();
+    if (schemaMap == null) {
+      schemaMap = new LinkedHashMap<>();
+      for (int i = 0; i < templatedInfo.getMeasurementList().size(); i++) {
+        schemaMap.put(
+            templatedInfo.getMeasurementList().get(i), 
templatedInfo.getSchemaList().get(i));
+      }
+    }
+    analyzer.analyze(expression, schemaMap);
 
     types.putAll(analyzer.getExpressionTypes());
   }
@@ -346,6 +359,14 @@ public class ExpressionTypeAnalyzer {
         return setExpressionType(
             timeSeriesOperand, 
context.get(timeSeriesOperand.getOutputSymbol()).getType());
       }
+
+      if (context != null
+          && !(timeSeriesOperand.getPath() instanceof MeasurementPath)
+          && context.containsKey(timeSeriesOperand.getPath().getFullPath())) {
+        return setExpressionType(
+            timeSeriesOperand, 
context.get(timeSeriesOperand.getPath().getFullPath()).getType());
+      }
+
       return setExpressionType(timeSeriesOperand, 
timeSeriesOperand.getPath().getSeriesType());
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
new file mode 100644
index 00000000000..cf19a791482
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
+import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeGroupByTime;
+import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
+import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
+import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
+import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition;
+import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere;
+import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewInput;
+import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput;
+import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom;
+import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
+import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
+
+/** Methods in this class are used for aggregation, templated with align by 
device situation. */
+public class TemplatedAggregationAnalyze {
+
+  static boolean canBuildAggregationPlanUseTemplate(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      IPartitionFetcher partitionFetcher,
+      ISchemaTree schemaTree,
+      MPPQueryContext context,
+      Template template) {
+
+    analysis.setNoWhereAndAggregation(false);
+
+    List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+
+    if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
+      // remove the device which won't appear in resultSet after limit/offset
+      deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, 
queryStatement);
+    }
+
+    List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+    boolean valid = analyzeSelect(queryStatement, analysis, outputExpressions, 
template);
+    if (!valid) {
+      return false;
+    }
+
+    analyzeDeviceToWhere(analysis, queryStatement);
+    if (deviceList.isEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      return true;
+    }
+    analysis.setDeviceList(deviceList);
+
+    if (analysis.getWhereExpression() != null
+        && ConstantOperand.FALSE.equals(analysis.getWhereExpression())) {
+      analyzeOutput(analysis, queryStatement, outputExpressions);
+      analysis.setFinishQueryAfterAnalyze(true);
+      return true;
+    }
+
+    analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
+
+    analyzeDeviceToAggregation(analysis);
+    analyzeDeviceToSourceTransform(analysis);
+    analyzeDeviceToSource(analysis);
+
+    analyzeDeviceViewOutput(analysis, queryStatement);
+    analyzeDeviceViewInput(analysis, queryStatement);
+
+    // generate result set header according to output expressions
+    analyzeOutput(analysis, queryStatement, outputExpressions);
+
+    analyzeGroupByTime(analysis, queryStatement);
+    context.generateGlobalTimeFilter(analysis);
+
+    // fetch partition information
+    analyzeDataPartition(analysis, schemaTree, partitionFetcher, 
context.getGlobalTimeFilter());
+    return true;
+  }
+
+  private static boolean analyzeSelect(
+      QueryStatement queryStatement,
+      Analysis analysis,
+      List<Pair<Expression, String>> outputExpressions,
+      Template template) {
+
+    LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>();
+    selectExpressions.add(DEVICE_EXPRESSION);
+    if (queryStatement.isOutputEndTime()) {
+      selectExpressions.add(END_TIME_EXPRESSION);
+    }
+
+    ColumnPaginationController paginationController =
+        new ColumnPaginationController(
+            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
+
+    Set<Expression> aggregationExpressions = new LinkedHashSet<>();
+    for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
+      if (paginationController.hasCurOffset()) {
+        paginationController.consumeOffset();
+      } else if (paginationController.hasCurLimit()) {
+        Expression selectExpression = resultColumn.getExpression();
+        outputExpressions.add(new Pair<>(selectExpression, 
resultColumn.getAlias()));
+        selectExpressions.add(selectExpression);
+        aggregationExpressions.add(selectExpression);
+      } else {
+        break;
+      }
+    }
+
+    analysis.setDeviceTemplate(template);
+    List<String> measurementList = new ArrayList<>();
+    List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+    Set<String> measurementSet = new HashSet<>();
+    for (Expression selectExpression : selectExpressions) {
+      if ("device".equalsIgnoreCase(selectExpression.getOutputSymbol())) {
+        continue;
+      }
+
+      String measurement = 
selectExpression.getExpressions().get(0).getOutputSymbol();
+      if (!template.getSchemaMap().containsKey(measurement)) {
+        analysis.setDeviceTemplate(null);
+        // TODO not support agg(*), agg(s1+1), count_time(*) now
+        return false;
+      }
+
+      // for agg1(s1) + agg2(s1), only record s1 for one time
+      if (!measurementSet.contains(measurement)) {
+        measurementSet.add(measurement);
+        measurementList.add(measurement);
+        measurementSchemaList.add(template.getSchemaMap().get(measurement));
+      }
+
+      analyzeExpressionType(analysis, selectExpression);
+    }
+
+    analysis.setMeasurementList(measurementList);
+    analysis.setMeasurementSchemaList(measurementSchemaList);
+    analysis.setAggregationExpressions(aggregationExpressions);
+    analysis.setOutputExpressions(outputExpressions);
+    analysis.setSelectExpressions(selectExpressions);
+    return true;
+  }
+
+  private static void analyzeHaving(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      ISchemaTree schemaTree,
+      List<PartialPath> deviceSet) {
+    if (!queryStatement.hasHaving()) {
+      return;
+    }
+
+    // TODO not support having count(s1) + sum(s2) expression
+    Set<Expression> aggregationExpressions = 
analysis.getAggregationExpressions();
+
+    Expression havingExpression = 
queryStatement.getHavingCondition().getPredicate();
+
+    // Set<Expression> normalizedAggregationExpressions = new 
LinkedHashSet<>();
+    for (Expression aggregationExpression : 
searchAggregationExpressions(havingExpression)) {
+      Expression normalizedAggregationExpression = 
normalizeExpression(aggregationExpression);
+
+      analyzeExpressionType(analysis, aggregationExpression);
+      analyzeExpressionType(analysis, normalizedAggregationExpression);
+
+      aggregationExpressions.add(aggregationExpression);
+      // normalizedAggregationExpressions.add(normalizedAggregationExpression);
+    }
+
+    TSDataType outputType = analyzeExpressionType(analysis, havingExpression);
+    if (outputType != TSDataType.BOOLEAN) {
+      throw new SemanticException(
+          String.format(
+              "The output type of the expression in HAVING clause should be 
BOOLEAN, actual data type: %s.",
+              outputType));
+    }
+    analysis.setHavingExpression(havingExpression);
+  }
+
+  private static void analyzeDeviceToSourceTransform(Analysis analysis) {
+    // TODO add having into SourceTransform
+    
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+  }
+
+  private static void analyzeDeviceToSource(Analysis analysis) {
+    
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
+    
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
+  }
+
+  private static void analyzeDeviceToAggregation(Analysis analysis) {
+    // TODO need add having clause?
+    
analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions());
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
index 856476eae24..bc14b2c55cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -70,7 +70,9 @@ import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyz
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.canBuildAggregationPlanUseTemplate;
 
 /**
  * This class provides accelerated implementation for multiple devices align 
by device query. This
@@ -96,9 +98,7 @@ public class TemplatedAnalyze {
       IPartitionFetcher partitionFetcher,
       ISchemaTree schemaTree,
       MPPQueryContext context) {
-    if (queryStatement.isAggregationQuery()
-        || queryStatement.isGroupBy()
-        || queryStatement.isGroupByTime()
+    if (queryStatement.getGroupByComponent() != null
         || queryStatement.isSelectInto()
         || queryStatement.hasFill()
         || schemaTree.hasNormalTimeSeries()) {
@@ -106,58 +106,61 @@ public class TemplatedAnalyze {
     }
 
     List<Template> templates = schemaTree.getUsingTemplates();
-    if (templates.size() != 1) {
+    if (templates.size() != 1 || templates.get(0) == null) {
       return false;
     }
 
     Template template = templates.get(0);
 
+    if (queryStatement.isAggregationQuery()) {
+      return canBuildAggregationPlanUseTemplate(
+          analysis, queryStatement, partitionFetcher, schemaTree, context, 
template);
+    }
+
     List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
     ColumnPaginationController paginationController =
         new ColumnPaginationController(
             queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
-    if (template != null) {
-      for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
-        Expression expression = resultColumn.getExpression();
-        if ("*".equals(expression.getOutputSymbol())) {
-          for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
-            if (paginationController.hasCurOffset()) {
-              paginationController.consumeOffset();
-            } else if (paginationController.hasCurLimit()) {
-              String measurementName = entry.getKey();
-              IMeasurementSchema measurementSchema = entry.getValue();
-              TimeSeriesOperand measurementPath =
-                  new TimeSeriesOperand(
-                      new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
-              outputExpressions.add(new Pair<>(measurementPath, null));
-              paginationController.consumeLimit();
-            } else {
-              break;
-            }
-          }
-          if (queryStatement.getSelectComponent().getResultColumns().size() == 
1
-              && queryStatement.getSeriesOffset() == 0
-              && queryStatement.getSeriesLimit() == 0) {
-            analysis.setTemplateWildCardQuery();
+    for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
+      Expression expression = resultColumn.getExpression();
+      if ("*".equals(expression.getOutputSymbol())) {
+        for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+          } else if (paginationController.hasCurLimit()) {
+            String measurementName = entry.getKey();
+            IMeasurementSchema measurementSchema = entry.getValue();
+            TimeSeriesOperand measurementPath =
+                new TimeSeriesOperand(
+                    new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
+            outputExpressions.add(new Pair<>(measurementPath, null));
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
-        } else if (expression instanceof TimeSeriesOperand) {
-          String measurementName = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
-          if (template.getSchemaMap().containsKey(measurementName)) {
-            if (paginationController.hasCurOffset()) {
-              paginationController.consumeOffset();
-            } else if (paginationController.hasCurLimit()) {
-              IMeasurementSchema measurementSchema = 
template.getSchemaMap().get(measurementName);
-              TimeSeriesOperand measurementPath =
-                  new TimeSeriesOperand(
-                      new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
-              outputExpressions.add(new Pair<>(measurementPath, 
resultColumn.getAlias()));
-            } else {
-              break;
-            }
+        }
+        if (queryStatement.getSelectComponent().getResultColumns().size() == 1
+            && queryStatement.getSeriesOffset() == 0
+            && queryStatement.getSeriesLimit() == 0) {
+          analysis.setTemplateWildCardQuery();
+        }
+      } else if (expression instanceof TimeSeriesOperand) {
+        String measurementName = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
+        if (template.getSchemaMap().containsKey(measurementName)) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+          } else if (paginationController.hasCurLimit()) {
+            IMeasurementSchema measurementSchema = 
template.getSchemaMap().get(measurementName);
+            TimeSeriesOperand measurementPath =
+                new TimeSeriesOperand(
+                    new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
+            outputExpressions.add(new Pair<>(measurementPath, 
resultColumn.getAlias()));
+          } else {
+            break;
           }
-        } else {
-          return false;
         }
+      } else {
+        return false;
       }
     }
 
@@ -188,7 +191,7 @@ public class TemplatedAnalyze {
     analyzeDeviceToSource(analysis);
 
     analyzeDeviceViewOutput(analysis, queryStatement);
-    analyzeDeviceViewInput(analysis);
+    analyzeDeviceViewInput(analysis, queryStatement);
 
     analyzeFill(analysis, queryStatement);
 
@@ -228,8 +231,7 @@ public class TemplatedAnalyze {
     analysis.setMeasurementSchemaList(measurementSchemaList);
   }
 
-  private static List<PartialPath> analyzeFrom(
-      QueryStatement queryStatement, ISchemaTree schemaTree) {
+  static List<PartialPath> analyzeFrom(QueryStatement queryStatement, 
ISchemaTree schemaTree) {
     // device path patterns in FROM clause
     List<PartialPath> devicePatternList = 
queryStatement.getFromComponent().getPrefixPaths();
 
@@ -246,12 +248,12 @@ public class TemplatedAnalyze {
         : 
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
   }
 
-  private static void analyzeDeviceToWhere(Analysis analysis, QueryStatement 
queryStatement) {
+  static void analyzeDeviceToWhere(Analysis analysis, QueryStatement 
queryStatement) {
     if (!queryStatement.hasWhere()) {
       return;
     }
 
-    analysis.setOnlyQueryTemplateMeasurements(false);
+    analysis.setNoWhereAndAggregation(false);
     Expression wherePredicate =
         new TemplatedConcatRemoveUnExistentMeasurementVisitor()
             .process(
@@ -325,25 +327,55 @@ public class TemplatedAnalyze {
     
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
   }
 
-  private static void analyzeDeviceViewOutput(Analysis analysis, 
QueryStatement queryStatement) {
+  static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement 
queryStatement) {
     Set<Expression> selectExpressions = analysis.getSelectExpressions();
-    // TODO if no order by, just set deviceViewOutputExpressions as 
selectExpressions
-    Set<Expression> deviceViewOutputExpressions = new 
LinkedHashSet<>(selectExpressions);
-    if (queryStatement.hasOrderByExpression()) {
-      deviceViewOutputExpressions.addAll(analysis.getOrderByExpressions());
+    // if no order by, just set deviceViewOutputExpressions as 
selectExpressions
+    Set<Expression> deviceViewOutputExpressions = new LinkedHashSet<>();
+
+    if (queryStatement.isAggregationQuery()) {
+      deviceViewOutputExpressions.add(DEVICE_EXPRESSION);
+      if (queryStatement.isOutputEndTime()) {
+        deviceViewOutputExpressions.add(END_TIME_EXPRESSION);
+      }
+      for (Expression selectExpression : selectExpressions) {
+        
deviceViewOutputExpressions.addAll(searchAggregationExpressions(selectExpression));
+      }
+      if (queryStatement.hasHaving()) {
+        deviceViewOutputExpressions.addAll(
+            searchAggregationExpressions(analysis.getHavingExpression()));
+      }
+      if (queryStatement.hasOrderByExpression()) {
+        for (Expression orderByExpression : analysis.getOrderByExpressions()) {
+          
deviceViewOutputExpressions.addAll(searchAggregationExpressions(orderByExpression));
+        }
+      }
+    } else {
+      deviceViewOutputExpressions.addAll(selectExpressions);
+      if (queryStatement.hasOrderByExpression()) {
+        deviceViewOutputExpressions.addAll(analysis.getOrderByExpressions());
+      }
     }
+
     analysis.setDeviceViewOutputExpressions(deviceViewOutputExpressions);
     analysis.setDeviceViewSpecialProcess(
         analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions, 
queryStatement, analysis));
   }
 
-  private static void analyzeDeviceViewInput(Analysis analysis) {
+  static void analyzeDeviceViewInput(Analysis analysis, QueryStatement 
queryStatement) {
     List<Integer> indexes = new ArrayList<>();
 
-    // index-0 is `Device`
-    for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
-      indexes.add(i);
+    if (queryStatement.isAggregationQuery()) {
+      // TODO verify the rightness of order
+      for (int i = 1; i <= analysis.getAggregationExpressions().size(); i++) {
+        indexes.add(i);
+      }
+    } else {
+      for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
+        indexes.add(i);
+      }
     }
+
+    // TODO only store once
     Map<String, List<Integer>> deviceViewInputIndexesMap = new HashMap<>();
     for (PartialPath devicePath : analysis.getDeviceList()) {
       deviceViewInputIndexesMap.put(devicePath.getFullPath(), indexes);
@@ -356,7 +388,7 @@ public class TemplatedAnalyze {
     
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
   }
 
-  private static void analyzeDataPartition(
+  static void analyzeDataPartition(
       Analysis analysis,
       ISchemaTree schemaTree,
       IPartitionFetcher partitionFetcher,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
index bd624f25634..2ae63c12a90 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
@@ -58,7 +60,7 @@ public class TemplatedInfo {
   private final boolean queryAllSensors;
 
   // variables used in DeviceViewOperator
-  private final List<String> selectMeasurements;
+  private final List<String> deviceViewOutputNames;
   private final List<Integer> deviceToMeasurementIndexes;
 
   // variables related to LIMIT/OFFSET push down
@@ -71,33 +73,44 @@ public class TemplatedInfo {
 
   // utils variables, not serialize
   private Map<String, IMeasurementSchema> schemaMap;
-  private Map<String, List<InputLocation>> layoutMap;
+  private Map<String, List<InputLocation>> filterLayoutMap;
   private int maxTsBlockLineNum = -1;
 
   // variables related to predicate push down
+  // TODO when to init pushDownPredicate in agg situation?
   private Expression pushDownPredicate;
 
+  // variables related to aggregation
+  public List<AggregationDescriptor> aggregationDescriptorList;
+  public GroupByTimeParameter groupByTimeParameter;
+  public boolean outputEndTime;
+
+  private Expression havingExpression;
+
   public TemplatedInfo(
       List<String> measurementList,
       List<IMeasurementSchema> schemaList,
       List<TSDataType> dataTypes,
       Ordering scanOrder,
       boolean queryAllSensors,
-      List<String> selectMeasurements,
+      List<String> deviceViewOutputNames,
       List<Integer> deviceToMeasurementIndexes,
       long offsetValue,
       long limitValue,
       Expression predicate,
       boolean keepNull,
       Map<String, IMeasurementSchema> schemaMap,
-      Map<String, List<InputLocation>> layoutMap,
-      Expression pushDownPredicate) {
+      Map<String, List<InputLocation>> filterLayoutMap,
+      Expression pushDownPredicate,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime) {
     this.measurementList = measurementList;
     this.schemaList = schemaList;
     this.dataTypes = dataTypes;
     this.scanOrder = scanOrder;
     this.queryAllSensors = queryAllSensors;
-    this.selectMeasurements = selectMeasurements;
+    this.deviceViewOutputNames = deviceViewOutputNames;
     this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
     this.offsetValue = offsetValue;
     this.limitValue = limitValue;
@@ -105,9 +118,13 @@ public class TemplatedInfo {
     if (predicate != null) {
       this.keepNull = keepNull;
       this.schemaMap = schemaMap;
-      this.layoutMap = layoutMap;
+      this.filterLayoutMap = filterLayoutMap;
     }
     this.pushDownPredicate = pushDownPredicate;
+
+    this.aggregationDescriptorList = aggregationDescriptorList;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.outputEndTime = outputEndTime;
   }
 
   public List<String> getMeasurementList() {
@@ -130,8 +147,8 @@ public class TemplatedInfo {
     return this.queryAllSensors;
   }
 
-  public List<String> getSelectMeasurements() {
-    return this.selectMeasurements;
+  public List<String> getDeviceViewOutputNames() {
+    return this.deviceViewOutputNames;
   }
 
   public long getOffsetValue() {
@@ -158,8 +175,8 @@ public class TemplatedInfo {
     return this.schemaMap;
   }
 
-  public Map<String, List<InputLocation>> getLayoutMap() {
-    return this.layoutMap;
+  public Map<String, List<InputLocation>> getFilterLayoutMap() {
+    return this.filterLayoutMap;
   }
 
   public Expression getPushDownPredicate() {
@@ -215,8 +232,8 @@ public class TemplatedInfo {
     ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
     ReadWriteIOUtils.write(queryAllSensors, byteBuffer);
 
-    ReadWriteIOUtils.write(selectMeasurements.size(), byteBuffer);
-    for (String selectMeasurement : selectMeasurements) {
+    ReadWriteIOUtils.write(deviceViewOutputNames.size(), byteBuffer);
+    for (String selectMeasurement : deviceViewOutputNames) {
       ReadWriteIOUtils.write(selectMeasurement, byteBuffer);
     }
 
@@ -242,6 +259,20 @@ public class TemplatedInfo {
     } else {
       ReadWriteIOUtils.write((byte) 0, byteBuffer);
     }
+
+    if (aggregationDescriptorList != null) {
+      ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
+      aggregationDescriptorList.forEach(d -> d.serialize(byteBuffer));
+    } else {
+      ReadWriteIOUtils.write(0, byteBuffer);
+    }
+
+    if (groupByTimeParameter != null) {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    }
   }
 
   public void serialize(DataOutputStream stream) throws IOException {
@@ -258,8 +289,8 @@ public class TemplatedInfo {
     ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
     ReadWriteIOUtils.write(queryAllSensors, stream);
 
-    ReadWriteIOUtils.write(selectMeasurements.size(), stream);
-    for (String selectMeasurement : selectMeasurements) {
+    ReadWriteIOUtils.write(deviceViewOutputNames.size(), stream);
+    for (String selectMeasurement : deviceViewOutputNames) {
       ReadWriteIOUtils.write(selectMeasurement, stream);
     }
 
@@ -285,6 +316,22 @@ public class TemplatedInfo {
     } else {
       ReadWriteIOUtils.write((byte) 0, stream);
     }
+
+    if (aggregationDescriptorList != null) {
+      ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream);
+      for (AggregationDescriptor descriptor : aggregationDescriptorList) {
+        descriptor.serialize(stream);
+      }
+    } else {
+      ReadWriteIOUtils.write(0, stream);
+    }
+
+    if (groupByTimeParameter != null) {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByTimeParameter.serialize(stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    }
   }
 
   public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
@@ -350,6 +397,23 @@ public class TemplatedInfo {
       pushDownPredicate = Expression.deserialize(byteBuffer);
     }
 
+    List<AggregationDescriptor> aggregationDescriptorList = null;
+    listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    if (listSize > 0) {
+      aggregationDescriptorList = new ArrayList<>(listSize);
+      while (listSize-- > 0) {
+        
aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
+      }
+    }
+
+    byte hasGroupByTime = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (hasGroupByTime == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+
+    // TODO add outputEndTime serialization and deserialization
+
     return new TemplatedInfo(
         measurementList,
         measurementSchemaList,
@@ -364,6 +428,9 @@ public class TemplatedInfo {
         keepNull,
         currentSchemaMap,
         layoutMap,
-        pushDownPredicate);
+        pushDownPredicate,
+        aggregationDescriptorList,
+        groupByTimeParameter,
+        false);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index cf4b3461955..2e65e750193 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -64,11 +64,13 @@ import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
@@ -93,6 +95,12 @@ public class AggregationPushDown implements PlanOptimizer {
   private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis 
analysis) {
     boolean isAlignByDevice = queryStatement.isAlignByDevice();
     if (isAlignByDevice) {
+      if (analysis.allDevicesInOneTemplate()) {
+        // TODO agg+template situation, how about the 
SourceTransformExpressions
+        return cannotUseStatistics(
+            analysis.getAggregationExpressions(), 
analysis.getAggregationExpressions());
+      }
+
       // check any of the devices
       String device = analysis.getDeviceList().get(0).toString();
       return cannotUseStatistics(
@@ -173,6 +181,10 @@ public class AggregationPushDown implements PlanOptimizer {
       List<PlanNode> rewrittenChildren = new ArrayList<>();
       for (int i = 0; i < node.getDevices().size(); i++) {
         context.setCurDevice(node.getDevices().get(i));
+        if (context.analysis.allDevicesInOneTemplate()) {
+          context.setCurDevicePath(context.analysis.getDeviceList().get(i));
+        }
+
         rewrittenChildren.add(node.getChildren().get(i).accept(this, context));
       }
       node.setChildren(rewrittenChildren);
@@ -263,14 +275,26 @@ public class AggregationPushDown implements PlanOptimizer 
{
               sourceToCountTimeAggregationsMap);
         }
 
-        List<PlanNode> sourceNodeList =
-            constructSourceNodeFromAggregationDescriptors(
-                sourceToAscendingAggregationsMap,
-                sourceToDescendingAggregationsMap,
-                sourceToCountTimeAggregationsMap,
-                node.getScanOrder(),
-                node.getGroupByTimeParameter(),
-                context);
+        List<PlanNode> sourceNodeList;
+        if (context.analysis.allDevicesInOneTemplate()) {
+          sourceNodeList =
+              constructSourceNodeFromTemplateAggregationDescriptors(
+                  sourceToAscendingAggregationsMap,
+                  sourceToDescendingAggregationsMap,
+                  sourceToCountTimeAggregationsMap,
+                  node.getScanOrder(),
+                  node.getGroupByTimeParameter(),
+                  context);
+        } else {
+          sourceNodeList =
+              constructSourceNodeFromAggregationDescriptors(
+                  sourceToAscendingAggregationsMap,
+                  sourceToDescendingAggregationsMap,
+                  sourceToCountTimeAggregationsMap,
+                  node.getScanOrder(),
+                  node.getGroupByTimeParameter(),
+                  context);
+        }
 
         if (isSingleSource && ((SeriesScanSourceNode) 
child).getPushDownPredicate() != null) {
           Expression pushDownPredicate = ((SeriesScanSourceNode) 
child).getPushDownPredicate();
@@ -364,7 +388,6 @@ public class AggregationPushDown implements PlanOptimizer {
         GroupByTimeParameter groupByTimeParameter,
         RewriterContext context) {
       List<PlanNode> sourceNodeList = new ArrayList<>();
-      boolean needCheckAscending = groupByTimeParameter == null;
       Map<PartialPath, List<AggregationDescriptor>> 
groupedAscendingAggregations = null;
       if (!countTimeAggregations.isEmpty()) {
         groupedAscendingAggregations = countTimeAggregations;
@@ -383,6 +406,7 @@ public class AggregationPushDown implements PlanOptimizer {
                 context));
       }
 
+      boolean needCheckAscending = groupByTimeParameter == null;
       if (needCheckAscending) {
         Map<PartialPath, List<AggregationDescriptor>> 
groupedDescendingAggregations =
             MetaUtils.groupAlignedAggregations(descendingAggregations);
@@ -400,6 +424,85 @@ public class AggregationPushDown implements PlanOptimizer {
       return sourceNodeList;
     }
 
+    /**
+     * Builds the aggregation scan nodes of {@code context.curDevicePath} for a templated query.
+     * Map keys are measurements; map values are their descriptors, e.g. count(s1), avg(s1).
+     */
+    private List<PlanNode> constructSourceNodeFromTemplateAggregationDescriptors(
+        Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+        Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+        Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
+        Ordering scanOrder,
+        GroupByTimeParameter groupByTimeParameter,
+        RewriterContext context) {
+      List<PlanNode> sourceNodeList = new ArrayList<>();
+      PartialPath devicePath = context.curDevicePath;
+      List<String> measurementList = context.analysis.getMeasurementList();
+      List<IMeasurementSchema> measurementSchemaList = context.analysis.getMeasurementSchemaList();
+      boolean needCheckAscending = groupByTimeParameter == null;
+
+      if (context.analysis.getDeviceTemplate().isDirectAligned()) {
+        AlignedPath alignedPath = new AlignedPath(devicePath);
+        alignedPath.setMeasurementList(measurementList);
+        alignedPath.addSchemas(measurementSchemaList);
+
+        List<AggregationDescriptor> aggregationDescriptors =
+            ascendingAggregations.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+        if (!aggregationDescriptors.isEmpty()) {
+          sourceNodeList.add(
+              createAggregationScanNode(
+                  alignedPath, aggregationDescriptors, scanOrder, groupByTimeParameter, context));
+        }
+
+        if (needCheckAscending && !descendingAggregations.isEmpty()) {
+          aggregationDescriptors =
+              descendingAggregations.values().stream()
+                  .flatMap(Collection::stream)
+                  .collect(Collectors.toList());
+          sourceNodeList.add(
+              createAggregationScanNode(
+                  alignedPath, aggregationDescriptors, scanOrder, groupByTimeParameter, context));
+        }
+      } else {
+        // TODO verify non-aligned series: each measurement is paired with every descriptor list
+        for (int i = 0; i < measurementList.size(); i++) {
+          MeasurementPath measurementPath =
+              new MeasurementPath(
+                  devicePath.concatNode(measurementList.get(i)), measurementSchemaList.get(i));
+          // ascending descriptors are always planned, as in the aligned branch above
+          for (List<AggregationDescriptor> aggregationDescriptorList :
+              ascendingAggregations.values()) {
+            sourceNodeList.add(
+                createAggregationScanNode(
+                    measurementPath,
+                    aggregationDescriptorList,
+                    scanOrder,
+                    groupByTimeParameter,
+                    context));
+          }
+
+          if (needCheckAscending) {
+            for (List<AggregationDescriptor> aggregationDescriptorList :
+                descendingAggregations.values()) {
+              sourceNodeList.add(
+                  createAggregationScanNode(
+                      measurementPath,
+                      aggregationDescriptorList,
+                      scanOrder,
+                      groupByTimeParameter,
+                      context));
+            }
+          }
+        }
+      }
+
+      // TODO count(s1+s2) and count_time (countTimeAggregations is unused) are not supported
+
+      return sourceNodeList;
+    }
+
     private SeriesAggregationSourceNode createAggregationScanNode(
         PartialPath selectPath,
         List<AggregationDescriptor> aggregationDescriptorList,
@@ -454,6 +557,7 @@ public class AggregationPushDown implements PlanOptimizer {
     private final boolean isAlignByDevice;
 
     private String curDevice;
+    private PartialPath curDevicePath;
 
     public RewriterContext(Analysis analysis, MPPQueryContext context, boolean 
isAlignByDevice) {
       this.analysis = analysis;
@@ -473,9 +577,17 @@ public class AggregationPushDown implements PlanOptimizer {
       this.curDevice = curDevice;
     }
 
+    public void setCurDevicePath(PartialPath devicePath) {
+      this.curDevicePath = devicePath;
+    }
+
     public Set<Expression> getAggregationExpressions() {
       if (isAlignByDevice) {
-        return analysis.getDeviceToAggregationExpressions().get(curDevice);
+        if (analysis.allDevicesInOneTemplate()) {
+          return analysis.getAggregationExpressions();
+        } else {
+          return analysis.getDeviceToAggregationExpressions().get(curDevice);
+        }
       }
       return analysis.getAggregationExpressions();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
index 2207b94f6b8..65a7023bb31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
@@ -164,7 +164,10 @@ public class PredicatePushDown implements PlanOptimizer {
       if (!cannotPushDownConjuncts.isEmpty()) {
         resultNode =
             planFilter(
-                resultNode, 
PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context);
+                resultNode,
+                PredicateUtils.combineConjuncts(cannotPushDownConjuncts),
+                context,
+                true);
       } else {
         resultNode = planTransform(resultNode, context);
         resultNode = planProject(resultNode, context);
@@ -249,7 +252,8 @@ public class PredicatePushDown implements PlanOptimizer {
       return resultNode;
     }
 
-    private PlanNode planFilter(PlanNode child, Expression predicate, 
RewriterContext context) {
+    private PlanNode planFilter(
+        PlanNode child, Expression predicate, RewriterContext context, boolean 
isFromWhere) {
       FilterNode pushDownFilterNode = context.getPushDownFilterNode();
       return new FilterNode(
           context.genPlanNodeId(),
@@ -257,7 +261,8 @@ public class PredicatePushDown implements PlanOptimizer {
           pushDownFilterNode.getOutputExpressions(),
           predicate,
           pushDownFilterNode.isKeepNull(),
-          pushDownFilterNode.getScanOrder());
+          pushDownFilterNode.getScanOrder(),
+          isFromWhere);
     }
 
     @Override
@@ -330,7 +335,8 @@ public class PredicatePushDown implements PlanOptimizer {
         resultNode = planProject(resultNode, context);
         return resultNode;
       } else {
-        return planFilter(node, 
PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context);
+        return planFilter(
+            node, PredicateUtils.combineConjuncts(cannotPushDownConjuncts), 
context, true);
       }
     }
 
@@ -402,7 +408,7 @@ public class PredicatePushDown implements PlanOptimizer {
     private RewriterContext(Analysis analysis, MPPQueryContext context, 
boolean isAlignByDevice) {
       this.queryId = context.getQueryId();
       this.isAlignByDevice = isAlignByDevice;
-      this.isBuildPlanUseTemplate = analysis.isAllDevicesInOneTemplate();
+      this.isBuildPlanUseTemplate = analysis.allDevicesInOneTemplate();
       this.templatedInfo = context.getTypeProvider().getTemplatedInfo();
       this.filterNodeFromWhereChecker = analysis::fromWhere;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index b1786c85112..b696501a417 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -746,7 +746,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  private PlanNode createSlidingWindowAggregationNode(
+  protected PlanNode createSlidingWindowAggregationNode(
       PlanNode child,
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
@@ -851,7 +851,7 @@ public class LogicalPlanBuilder {
             .collect(Collectors.toList()));
   }
 
-  private List<AggregationDescriptor> constructAggregationDescriptorList(
+  protected List<AggregationDescriptor> constructAggregationDescriptorList(
       Set<Expression> aggregationExpressions, AggregationStep curStep) {
     return aggregationExpressions.stream()
         .map(
@@ -883,7 +883,8 @@ public class LogicalPlanBuilder {
             selectExpressions.toArray(new Expression[0]),
             filterExpression,
             isGroupByTime,
-            scanOrder);
+            scanOrder,
+            fromWhere);
     if (fromWhere) {
       analysis.setFromWhere(filterNode);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 08990c5df47..3bec19321d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -134,7 +134,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
 
   @Override
   public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
-    if (analysis.isAllDevicesInOneTemplate()) {
+    if (analysis.allDevicesInOneTemplate()) {
       return new TemplatedLogicalPlan(analysis, queryStatement, 
context).visitQuery();
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index c271e8ec2da..6b2ecccfd65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -447,7 +447,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             seriesScanOperator,
             templatedInfo.getProjectExpressions(),
             templatedInfo.getDataTypes(),
-            templatedInfo.getLayoutMap(),
+            templatedInfo.getFilterLayoutMap(),
             templatedInfo.isKeepNull(),
             node.getPlanNodeId(),
             templatedInfo.getScanOrder(),
@@ -494,10 +494,18 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     List<String> inputColumnNames;
     List<String> outputColumnNames = node.getOutputColumnNames();
     if (outputColumnNames == null) {
-      outputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getSelectMeasurements();
-      // skip device column
-      outputColumnNames = outputColumnNames.subList(1, 
outputColumnNames.size());
-      inputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getMeasurementList();
+      if 
(context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList != 
null) {
+        // TODO fix it
+        // outputColumnNames is aggregation expression
+        outputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getDeviceViewOutputNames();
+        outputColumnNames = outputColumnNames.subList(1, 
outputColumnNames.size());
+        inputColumnNames = outputColumnNames;
+      } else {
+        outputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getDeviceViewOutputNames();
+        // skip device column
+        outputColumnNames = outputColumnNames.subList(1, 
outputColumnNames.size());
+        inputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getMeasurementList();
+      }
     } else {
       inputColumnNames = node.getChild().getOutputColumnNames();
     }
@@ -591,28 +599,62 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
   @Override
   public Operator visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext 
context) {
-    AlignedPath seriesPath = node.getAlignedPath();
-    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    if (context.isBuildPlanUseTemplate()) {
+      return constructAlignedSeriesAggregationScanOperator(
+          node.getPlanNodeId(),
+          node.getAlignedPath(),
+          context.getTemplatedInfo().aggregationDescriptorList,
+          context.getTemplatedInfo().getPushDownPredicate(),
+          context.getTemplatedInfo().getScanOrder(),
+          context.getTemplatedInfo().groupByTimeParameter,
+          context.getTemplatedInfo().outputEndTime,
+          context);
+    }
+
+    return constructAlignedSeriesAggregationScanOperator(
+        node.getPlanNodeId(),
+        node.getAlignedPath(),
+        node.getAggregationDescriptorList(),
+        node.getPushDownPredicate(),
+        node.getScanOrder(),
+        node.getGroupByTimeParameter(),
+        node.isOutputEndTime(),
+        context);
+  }
+
+  private Operator constructAlignedSeriesAggregationScanOperator(
+      PlanNodeId planNodeId,
+      AlignedPath alignedPath,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      Expression pushDownPredicate,
+      Ordering scanOrder,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime,
+      LocalExecutionPlanContext context) {
+    boolean ascending = scanOrder == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
-    for (AggregationDescriptor descriptor : 
node.getAggregationDescriptorList()) {
+    for (AggregationDescriptor descriptor : aggregationDescriptorList) {
       checkArgument(
           descriptor.getInputExpressions().size() == 1,
           "descriptor's input expression size is not 1");
+
       Expression expression = descriptor.getInputExpressions().get(0);
       if (expression instanceof TimeSeriesOperand) {
+        // TODO for template_agg, no need use getPath.getMeasurement
         String inputSeries =
             ((TimeSeriesOperand) (descriptor.getInputExpressions().get(0)))
                 .getPath()
                 .getMeasurement();
-        int seriesIndex = seriesPath.getMeasurementList().indexOf(inputSeries);
+        int seriesIndex = 
alignedPath.getMeasurementList().indexOf(inputSeries);
         TSDataType seriesDataType =
-            
seriesPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
+            
alignedPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
         aggregators.add(
             new Aggregator(
                 AccumulatorFactory.createAccumulator(
                     descriptor.getAggregationFuncName(),
                     descriptor.getAggregationType(),
                     Collections.singletonList(seriesDataType),
+                    // TODO inputExpression must be devicePath+measurement
                     descriptor.getInputExpressions(),
                     descriptor.getInputAttributes(),
                     ascending,
@@ -627,6 +669,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                     descriptor.getAggregationFuncName(),
                     descriptor.getAggregationType(),
                     Collections.singletonList(TSDataType.INT64),
+                    // TODO inputExpression must be devicePath+measurement
                     descriptor.getInputExpressions(),
                     descriptor.getInputAttributes(),
                     ascending,
@@ -640,23 +683,22 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       }
     }
 
-    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
     ITimeRangeIterator timeRangeIterator =
         initTimeRangeIterator(groupByTimeParameter, ascending, true);
     long maxReturnSize =
         AggregationUtil.calculateMaxAggregationResultSize(
-            node.getAggregationDescriptorList(), timeRangeIterator, 
context.getTypeProvider());
+            aggregationDescriptorList, timeRangeIterator, 
context.getTypeProvider());
 
     SeriesScanOptions.Builder scanOptionsBuilder = 
getSeriesScanOptionsBuilder(context);
-    scanOptionsBuilder.withAllSensors(new 
HashSet<>(seriesPath.getMeasurementList()));
+    scanOptionsBuilder.withAllSensors(new 
HashSet<>(alignedPath.getMeasurementList()));
 
-    Expression pushDownPredicate = node.getPushDownPredicate();
     if (pushDownPredicate != null) {
       
checkArgument(PredicateUtils.predicateCanPushIntoScan(pushDownPredicate));
       scanOptionsBuilder.withPushDownFilter(
           convertPredicateToFilter(
               pushDownPredicate,
-              node.getAlignedPath().getMeasurementList(),
+              alignedPath.getMeasurementList(),
+              // TODO what's the meaning of isBuildPlanUseTemplate
               context.getTypeProvider().getTemplatedInfo() != null,
               context.getTypeProvider()));
     }
@@ -666,14 +708,14 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             .getDriverContext()
             .addOperatorContext(
                 context.getNextOperatorId(),
-                node.getPlanNodeId(),
+                planNodeId,
                 AlignedSeriesAggregationScanOperator.class.getSimpleName());
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
-            node.getPlanNodeId(),
-            seriesPath,
-            node.getScanOrder(),
-            node.isOutputEndTime(),
+            planNodeId,
+            alignedPath,
+            scanOrder,
+            outputEndTime,
             scanOptionsBuilder.build(),
             operatorContext,
             aggregators,
@@ -683,7 +725,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
     ((DataDriverContext) context.getDriverContext())
         .addSourceOperator(seriesAggregationScanOperator);
-    ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
+    ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
     context.getDriverContext().setInputDriver(true);
     return seriesAggregationScanOperator;
   }
@@ -1395,25 +1437,32 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
   @Override
   public Operator visitFilter(FilterNode node, LocalExecutionPlanContext 
context) {
-    if (context.isBuildPlanUseTemplate()) {
+    if (context.isBuildPlanUseTemplate() && node.isFromWhere()) {
       TemplatedInfo templatedInfo = context.getTemplatedInfo();
       return constructFilterOperator(
           node.getPredicate(),
           generateOnlyChildOperator(node, context),
           templatedInfo.getProjectExpressions(),
           templatedInfo.getDataTypes(),
-          templatedInfo.getLayoutMap(),
+          templatedInfo.getFilterLayoutMap(),
           templatedInfo.isKeepNull(),
           node.getPlanNodeId(),
           templatedInfo.getScanOrder(),
           context);
     }
 
+    // 1. not use template
+    // 2. use template but the FilterNode is not generated from the WHERE clause
+    // the inputDataTypes should be generated by the outputColumns of children
     return constructFilterOperator(
         node.getPredicate(),
         generateOnlyChildOperator(node, context),
         node.getOutputExpressions(),
-        getInputColumnTypes(node, context.getTypeProvider()),
+        node.getChildren().stream()
+            .map(PlanNode::getOutputColumnNames)
+            .flatMap(List::stream)
+            .map(context.getTypeProvider()::getType)
+            .collect(Collectors.toList()),
         makeLayout(node),
         node.isKeepNull(),
         node.getPlanNodeId(),
@@ -1778,13 +1827,28 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
   @Override
   public Operator visitRawDataAggregation(
       RawDataAggregationNode node, LocalExecutionPlanContext context) {
+    // TODO optimize serialize and deserialize method in template situation
+    Map<String, List<InputLocation>> layout;
+    if (context.isBuildPlanUseTemplate()) {
+      // in template situation, output columns of ProjectNode are not stored, 
they are the same as
+      // those of its children
+      layout = context.getTemplatedInfo().getFilterLayoutMap();
+    } else {
+      layout = makeLayout(node);
+    }
+    return createRawDataAggregationOperator(node, context, layout);
+  }
+
+  private RawDataAggregationOperator createRawDataAggregationOperator(
+      RawDataAggregationNode node,
+      LocalExecutionPlanContext context,
+      Map<String, List<InputLocation>> layout) {
     checkArgument(
         !node.getAggregationDescriptorList().isEmpty(),
         "Aggregation descriptorList cannot be empty");
     Operator child = node.getChild().accept(this, context);
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
-    Map<String, List<InputLocation>> layout = makeLayout(node);
     List<AggregationDescriptor> aggregationDescriptors = 
node.getAggregationDescriptorList();
     for (AggregationDescriptor descriptor : 
node.getAggregationDescriptorList()) {
       List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index b8efa1e3b3d..34d48ea8a72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -85,6 +85,13 @@ public class SubPlanTypeExtractor {
     @Override
     public Void visitAlignedSeriesAggregationScan(
         AlignedSeriesAggregationScanNode node, Void context) {
+      // if TemplatedInfo is not null, all type infos used by 
AlignedSeriesAggregationScanNode have
+      // been stored
+      // in TemplatedInfo
+      if (typeProvider.getTemplatedInfo() != null) {
+        return null;
+      }
+
       AlignedPath alignedPath = node.getAlignedPath();
       for (int i = 0; i < alignedPath.getColumnNum(); i++) {
         String sourcePath = 
alignedPath.getPathWithMeasurement(i).getFullPath();
@@ -168,6 +175,7 @@ public class SubPlanTypeExtractor {
       if (typeProvider.getTemplatedInfo() != null) {
         return null;
       }
+
       return visitPlan(node, context);
     }
 
@@ -178,6 +186,7 @@ public class SubPlanTypeExtractor {
       if (typeProvider.getTemplatedInfo() != null) {
         return null;
       }
+
       return visitPlan(node, context);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index 75222efb6fc..a00aee711d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -25,13 +25,19 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 
+import org.apache.commons.lang3.Validate;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -39,9 +45,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
+import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchSourceExpressions;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo.makeLayout;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanVisitor.pushDownLimitToScanNode;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode.getDeduplicatedDescriptors;
 
 /**
  * This class provides accelerated implementation for multiple devices align 
by device query. This
@@ -70,6 +80,10 @@ public class TemplatedLogicalPlan {
 
   private Map<String, List<InputLocation>> filterLayoutMap;
 
+  List<AggregationDescriptor> aggregationDescriptorList;
+
+  List<AggregationDescriptor> deduplicatedDescriptors;
+
   public TemplatedLogicalPlan(
       Analysis analysis, QueryStatement queryStatement, MPPQueryContext 
context) {
     this.analysis = analysis;
@@ -86,10 +100,70 @@ public class TemplatedLogicalPlan {
     this.whereExpression = analysis.getWhereExpression();
 
     // for align by device query with template, most used variables are same
-    initCommonVariables();
+    if (queryStatement.isAggregationQuery()) {
+      initAggQueryCommonVariables();
+    } else {
+      initNonAggQueryCommonVariables();
+    }
   }
 
-  private void initCommonVariables() {
+  private void initAggQueryCommonVariables() {
+    if (whereExpression != null) {
+      newMeasurementList = new ArrayList<>(measurementList);
+      newSchemaList = new ArrayList<>(schemaList);
+      Set<String> selectMeasurements = new HashSet<>(measurementList);
+      List<Expression> whereSourceExpressions = 
searchSourceExpressions(whereExpression);
+      for (Expression expression : whereSourceExpressions) {
+        if (expression instanceof TimeSeriesOperand) {
+          String measurement = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
+          if 
(!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) {
+            continue;
+          }
+          if (!selectMeasurements.contains(measurement)) {
+            newMeasurementList.add(measurement);
+            
newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement));
+          }
+        }
+      }
+
+      // TODO fix aggregation filterLayoutMap
+      filterLayoutMap = makeLayout(newMeasurementList);
+
+      analysis
+          .getExpressionTypes()
+          .forEach(
+              (key, value) ->
+                  
context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
+    }
+
+    context
+        .getTypeProvider()
+        .setTemplatedInfo(
+            new TemplatedInfo(
+                newMeasurementList,
+                newSchemaList,
+                newSchemaList.stream()
+                    .map(IMeasurementSchema::getType)
+                    .collect(Collectors.toList()),
+                queryStatement.getResultTimeOrder(),
+                analysis.isLastLevelUseWildcard(),
+                analysis.getDeviceViewOutputExpressions().stream()
+                    .map(Expression::getExpressionString)
+                    .collect(Collectors.toList()),
+                
analysis.getDeviceViewInputIndexesMap().values().iterator().next(),
+                OFFSET_VALUE,
+                limitValue,
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                analysis.getDeviceTemplate().getSchemaMap(),
+                filterLayoutMap,
+                null,
+                null,
+                analysis.getGroupByTimeParameter(),
+                queryStatement.isOutputEndTime()));
+  }
+
+  private void initNonAggQueryCommonVariables() {
     if (whereExpression != null) {
       if (!analysis.isTemplateWildCardQuery()) {
         newMeasurementList = new ArrayList<>(measurementList);
@@ -141,10 +215,17 @@ public class TemplatedLogicalPlan {
                 queryStatement.isGroupByTime(),
                 analysis.getDeviceTemplate().getSchemaMap(),
                 filterLayoutMap,
-                null));
+                null,
+                null,
+                analysis.getGroupByTimeParameter(),
+                queryStatement.isOutputEndTime()));
   }
 
   public PlanNode visitQuery() {
+    if (queryStatement.isAggregationQuery()) {
+      return visitAggregation();
+    }
+
     LogicalPlanBuilder planBuilder =
         new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList);
 
@@ -213,4 +294,157 @@ public class TemplatedLogicalPlan {
 
     return planBuilder.getRoot();
   }
+
+  // ============== Methods below are used for templated aggregation 
======================
+
+  private PlanNode visitAggregation() {
+    boolean outputPartial =
+        queryStatement.isGroupByLevel()
+            || queryStatement.isGroupByTag()
+            || (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap());
+    AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : 
AggregationStep.SINGLE;
+
+    if (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap()) {
+      curStep =
+          (queryStatement.isGroupByLevel() || queryStatement.isGroupByTag())
+              ? AggregationStep.INTERMEDIATE
+              : AggregationStep.FINAL;
+    }
+
+    aggregationDescriptorList =
+        
constructAggregationDescriptorList(analysis.getAggregationExpressions(), 
curStep);
+    updateTypeProvider(analysis.getAggregationExpressions());
+    if (curStep.isOutputPartial()) {
+      aggregationDescriptorList.forEach(
+          aggregationDescriptor ->
+              updateTypeProviderByPartialAggregation(
+                  aggregationDescriptor, context.getTypeProvider()));
+    }
+
+    context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList =
+        aggregationDescriptorList;
+
+    LogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList);
+    Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+    deduplicatedDescriptors = 
getDeduplicatedDescriptors(aggregationDescriptorList);
+    for (PartialPath devicePath : analysis.getDeviceList()) {
+      String deviceName = devicePath.getFullPath();
+      PlanNode rootNode = visitDeviceAggregationBody(devicePath, curStep);
+
+      LogicalPlanBuilder subPlanBuilder =
+          new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList)
+              .withNewRoot(rootNode);
+
+      deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+    }
+
+    // convert to ALIGN BY DEVICE view
+    planBuilder =
+        planBuilder.planDeviceView(
+            deviceToSubPlanMap,
+            analysis.getDeviceViewOutputExpressions(),
+            analysis.getDeviceViewInputIndexesMap(),
+            analysis.getSelectExpressions(),
+            queryStatement,
+            analysis);
+
+    planBuilder =
+        planBuilder.planHavingAndTransform(
+            analysis.getHavingExpression(),
+            analysis.getSelectExpressions(),
+            analysis.getOrderByExpressions(),
+            queryStatement.isGroupByTime(),
+            queryStatement.getResultTimeOrder());
+
+    if (!queryStatement.needPushDownSort()) {
+      planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
+    }
+
+    planBuilder =
+        planBuilder
+            .planFill(analysis.getFillDescriptor(), 
queryStatement.getResultTimeOrder())
+            .planOffset(queryStatement.getRowOffset());
+
+    if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) {
+      planBuilder = planBuilder.planLimit(queryStatement.getRowLimit());
+    }
+
+    return planBuilder.getRoot();
+  }
+
+  private PlanNode visitDeviceAggregationBody(PartialPath devicePath, 
AggregationStep curStep) {
+    TemplatedLogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, 
newSchemaList);
+
+    planBuilder =
+        planBuilder
+            .planRawDataSource(
+                devicePath,
+                queryStatement.getResultTimeOrder(),
+                OFFSET_VALUE,
+                limitValue,
+                analysis.isLastLevelUseWildcard())
+            .planFilter(
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                queryStatement.getResultTimeOrder());
+
+    planBuilder =
+        planBuilder.planRawDataAggregation(
+            analysis.getAggregationExpressions(),
+            null,
+            analysis.getGroupByTimeParameter(),
+            analysis.getGroupByParameter(),
+            queryStatement.isOutputEndTime(),
+            curStep,
+            queryStatement.getResultTimeOrder(),
+            deduplicatedDescriptors);
+
+    if (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap()) {
+      planBuilder =
+          planBuilder.planSlidingWindowAggregation(
+              analysis.getSelectExpressions(),
+              analysis.getGroupByTimeParameter(),
+              curStep,
+              queryStatement.getResultTimeOrder());
+    }
+
+    // no group by level and group by tag
+    return planBuilder.getRoot();
+  }
+
+  private List<AggregationDescriptor> constructAggregationDescriptorList(
+      Set<Expression> aggregationExpressions, AggregationStep curStep) {
+    return aggregationExpressions.stream()
+        .map(
+            expression -> {
+              Validate.isTrue(expression instanceof FunctionExpression);
+              return new AggregationDescriptor(
+                  ((FunctionExpression) expression).getFunctionName(),
+                  curStep,
+                  expression.getExpressions(),
+                  ((FunctionExpression) expression).getFunctionAttributes());
+            })
+        .collect(Collectors.toList());
+  }
+
+  void updateTypeProvider(Collection<Expression> expressions) {
+    if (expressions == null) {
+      return;
+    }
+    expressions.forEach(
+        expression -> {
+          if (!expression.getExpressionString().equals(DEVICE)
+              && !expression.getExpressionString().equals(ENDTIME)) {
+            context
+                .getTypeProvider()
+                .setType(expression.getExpressionString(), 
getPreAnalyzedType(expression));
+          }
+        });
+  }
+
+  private TSDataType getPreAnalyzedType(Expression expression) {
+    return analysis.getType(expression);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
index b1107ae8fab..3ee79317753 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
@@ -27,14 +27,20 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * This class provides accelerated implementation for multiple devices align 
by device query. This
@@ -42,6 +48,7 @@ import java.util.List;
  * unnecessary judgements.
  */
 public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder {
+
   private final MPPQueryContext context;
 
   private final Analysis analysis;
@@ -121,7 +128,8 @@ public class TemplatedLogicalPlanBuilder extends 
LogicalPlanBuilder {
             null,
             filterExpression,
             isGroupByTime,
-            scanOrder);
+            scanOrder,
+            true);
     analysis.setFromWhere(filterNode);
 
     this.root = filterNode;
@@ -129,6 +137,50 @@ public class TemplatedLogicalPlanBuilder extends 
LogicalPlanBuilder {
     return this;
   }
 
+  // ===================== Methods below are used for aggregation 
=============================
+
+  public TemplatedLogicalPlanBuilder planRawDataAggregation(
+      Set<Expression> aggregationExpressions,
+      Expression groupByExpression,
+      GroupByTimeParameter groupByTimeParameter,
+      GroupByParameter groupByParameter,
+      boolean outputEndTime,
+      AggregationStep curStep,
+      Ordering scanOrder,
+      List<AggregationDescriptor> deduplicatedAggregationDescriptorList) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    this.root =
+        new RawDataAggregationNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            deduplicatedAggregationDescriptorList,
+            groupByTimeParameter,
+            groupByParameter,
+            groupByExpression,
+            outputEndTime,
+            scanOrder,
+            true);
+    return this;
+  }
+
+  public TemplatedLogicalPlanBuilder planSlidingWindowAggregation(
+      Set<Expression> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep,
+      Ordering scanOrder) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    this.root =
+        createSlidingWindowAggregationNode(
+            this.getRoot(), aggregationExpressions, groupByTimeParameter, 
curStep, scanOrder);
+    return this;
+  }
+
   @Override
   public TemplatedLogicalPlanBuilder withNewRoot(PlanNode newRoot) {
     this.root = newRoot;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 4276cba4290..3b32f2932cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.SubPlanTypeExtractor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.VirtualSourceNode;
 
@@ -152,7 +153,7 @@ public class PlanFragment {
     } else {
       ReadWriteIOUtils.write((byte) 1, stream);
 
-      // templated device, the serialized attribute basically same,
+      // templated align by device query, the serialized attributes are the same,
       // so there is no need to serialize all the SeriesScanNode repeated
       if (typeProvider.getTemplatedInfo() != null) {
         typeProvider.serialize(stream);
@@ -183,7 +184,8 @@ public class PlanFragment {
     PlanNode root;
     if (typeProvider != null && typeProvider.getTemplatedInfo() != null) {
       root = PlanNodeType.deserializeWithTemplate(byteBuffer, typeProvider);
-      if (root instanceof AlignedSeriesScanNode) {
+      if (root instanceof AlignedSeriesScanNode
+          || root instanceof AlignedSeriesAggregationScanNode) {
         return root;
       }
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 09fe6c92960..730edb747b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -452,7 +452,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     List<String> outputColumns = node.getOutputColumnNames();
     if (outputColumns == null) {
       checkArgument(context.getTemplatedInfo() != null);
-      outputColumns = context.getTemplatedInfo().getSelectMeasurements();
+      outputColumns = context.getTemplatedInfo().getDeviceViewOutputNames();
       // skip device column
       outputColumns = outputColumns.subList(1, outputColumns.size());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 0fd420ce1bb..2b5e665a755 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -470,14 +470,18 @@ public enum PlanNodeType {
   public static PlanNode deserializeWithTemplate(ByteBuffer buffer, 
TypeProvider typeProvider) {
     short nodeType = buffer.getShort();
     switch (nodeType) {
+      case 1:
+        return DeviceViewNode.deserializeUseTemplate(buffer, typeProvider);
       case 3:
         return FilterNode.deserializeUseTemplate(buffer, typeProvider);
+      case 32:
+        return ProjectNode.deserializeUseTemplate(buffer, typeProvider);
       case 33:
         return AlignedSeriesScanNode.deserializeUseTemplate(buffer, 
typeProvider);
+      case 34:
+        return AlignedSeriesAggregationScanNode.deserializeUseTemplate(buffer, 
typeProvider);
       case 65:
         return SingleDeviceViewNode.deserializeUseTemplate(buffer, 
typeProvider);
-      case 32:
-        return ProjectNode.deserializeUseTemplate(buffer, typeProvider);
       default:
         return deserialize(buffer, nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index b1f18c555a6..dcb29879365 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -175,18 +176,21 @@ public class DeviceViewNode extends MultiChildProcessNode 
{
 
   public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
     OrderByParameter mergeOrderParameter = 
OrderByParameter.deserialize(byteBuffer);
+
     int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<String> outputColumnNames = new ArrayList<>();
     while (columnSize > 0) {
       outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
       columnSize--;
     }
+
     int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<String> devices = new ArrayList<>();
     while (devicesSize > 0) {
       devices.add(ReadWriteIOUtils.readString(byteBuffer));
       devicesSize--;
     }
+
     int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
     Map<String, List<Integer>> deviceToMeasurementIndexesMap = new 
HashMap<>(mapSize);
     while (mapSize > 0) {
@@ -205,6 +209,49 @@ public class DeviceViewNode extends MultiChildProcessNode {
         planNodeId, mergeOrderParameter, outputColumnNames, devices, 
deviceToMeasurementIndexesMap);
   }
 
+  @Override
+  public void serializeUseTemplate(DataOutputStream stream, TypeProvider 
typeProvider)
+      throws IOException {
+    PlanNodeType.DEVICE_VIEW.serialize(stream);
+    id.serialize(stream);
+    mergeOrderParameter.serializeAttributes(stream);
+    ReadWriteIOUtils.write(devices.size(), stream);
+    for (String deviceName : devices) {
+      ReadWriteIOUtils.write(deviceName, stream);
+    }
+
+    ReadWriteIOUtils.write(getChildren().size(), stream);
+    for (PlanNode planNode : getChildren()) {
+      planNode.serializeUseTemplate(stream, typeProvider);
+    }
+  }
+
+  public static DeviceViewNode deserializeUseTemplate(
+      ByteBuffer byteBuffer, TypeProvider typeProvider) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    OrderByParameter mergeOrderParameter = 
OrderByParameter.deserialize(byteBuffer);
+
+    int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> devices = new ArrayList<>(devicesSize);
+    while (devicesSize > 0) {
+      devices.add(ReadWriteIOUtils.readString(byteBuffer));
+      devicesSize--;
+    }
+
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new 
HashMap<>(devices.size());
+    for (String deviceName : devices) {
+      deviceToMeasurementIndexesMap.put(
+          deviceName, 
typeProvider.getTemplatedInfo().getDeviceToMeasurementIndexes());
+    }
+
+    return new DeviceViewNode(
+        planNodeId,
+        mergeOrderParameter,
+        typeProvider.getTemplatedInfo().getDeviceViewOutputNames(),
+        devices,
+        deviceToMeasurementIndexesMap);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
index f57466c98a0..f121eec96f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
@@ -37,15 +37,21 @@ public class FilterNode extends TransformNode {
 
   private final Expression predicate;
 
+  // whether this node is generated by the where clause;
+  // false means it is generated by the having clause
+  private final boolean fromWhere;
+
   public FilterNode(
       PlanNodeId id,
       PlanNode childPlanNode,
       Expression[] outputExpressions,
       Expression predicate,
       boolean keepNull,
-      Ordering scanOrder) {
+      Ordering scanOrder,
+      boolean fromWhere) {
     super(id, childPlanNode, outputExpressions, keepNull, scanOrder);
     this.predicate = predicate;
+    this.fromWhere = fromWhere;
   }
 
   /** This construction method is only used in inner of class `FilterNode`. */
@@ -54,9 +60,11 @@ public class FilterNode extends TransformNode {
       Expression[] outputExpressions,
       Expression predicate,
       boolean keepNull,
-      Ordering scanOrder) {
+      Ordering scanOrder,
+      boolean fromWhere) {
     super(id, outputExpressions, keepNull, scanOrder);
     this.predicate = predicate;
+    this.fromWhere = fromWhere;
   }
 
   @Override
@@ -71,7 +79,8 @@ public class FilterNode extends TransformNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNode(getPlanNodeId(), outputExpressions, predicate, 
keepNull, scanOrder);
+    return new FilterNode(
+        getPlanNodeId(), outputExpressions, predicate, keepNull, scanOrder, 
fromWhere);
   }
 
   @Override
@@ -108,7 +117,7 @@ public class FilterNode extends TransformNode {
     boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
     Ordering scanOrder = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new FilterNode(planNodeId, outputExpressions, predicate, keepNull, 
scanOrder);
+    return new FilterNode(planNodeId, outputExpressions, predicate, keepNull, 
scanOrder, false);
   }
 
   @Override
@@ -116,6 +125,17 @@ public class FilterNode extends TransformNode {
       throws IOException {
     PlanNodeType.FILTER.serialize(stream);
     id.serialize(stream);
+    ReadWriteIOUtils.write(fromWhere, stream);
+    // a FilterNode generated by having clause must be fully serialized
+    if (!fromWhere) {
+      ReadWriteIOUtils.write(outputExpressions.length, stream);
+      for (Expression expression : outputExpressions) {
+        Expression.serialize(expression, stream);
+      }
+      Expression.serialize(predicate, stream);
+      ReadWriteIOUtils.write(keepNull, stream);
+      ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+    }
     ReadWriteIOUtils.write(getChildren().size(), stream);
     for (PlanNode planNode : getChildren()) {
       planNode.serializeUseTemplate(stream, typeProvider);
@@ -125,13 +145,26 @@ public class FilterNode extends TransformNode {
   public static FilterNode deserializeUseTemplate(
       ByteBuffer byteBuffer, TypeProvider typeProvider) {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-
-    return new FilterNode(
-        planNodeId,
-        null,
-        typeProvider.getTemplatedInfo().getPredicate(),
-        typeProvider.getTemplatedInfo().isKeepNull(),
-        typeProvider.getTemplatedInfo().getScanOrder());
+    boolean fromWhere = ReadWriteIOUtils.readBool(byteBuffer);
+    if (!fromWhere) {
+      int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer);
+      Expression[] outputExpressions = new Expression[outputExpressionsLength];
+      for (int i = 0; i < outputExpressionsLength; ++i) {
+        outputExpressions[i] = Expression.deserialize(byteBuffer);
+      }
+      Expression predicate = Expression.deserialize(byteBuffer);
+      boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
+      Ordering scanOrder = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+      return new FilterNode(planNodeId, outputExpressions, predicate, 
keepNull, scanOrder, false);
+    } else {
+      return new FilterNode(
+          planNodeId,
+          null,
+          typeProvider.getTemplatedInfo().getPredicate(),
+          typeProvider.getTemplatedInfo().isKeepNull(),
+          typeProvider.getTemplatedInfo().getScanOrder(),
+          true);
+    }
   }
 
   public Expression getPredicate() {
@@ -142,6 +175,10 @@ public class FilterNode extends TransformNode {
     this.outputExpressions = outputExpressions;
   }
 
+  public boolean isFromWhere() {
+    return fromWhere;
+  }
+
   @Override
   public String toString() {
     return "FilterNode-" + this.getPlanNodeId();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
index 345d2977088..dafcbdb9409 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -124,6 +124,26 @@ public class RawDataAggregationNode extends SingleChildProcessNode {
     this.outputEndTime = outputEndTime;
   }
 
+  // to avoid the repeated invoking of getDeduplicatedDescriptors method
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> deduplicatedAggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder,
+      boolean useDeduplicatedDescriptors) {
+    super(id, child);
+    this.aggregationDescriptorList = deduplicatedAggregationDescriptorList;
+    this.scanOrder = scanOrder;
+    this.groupByParameter = groupByParameter;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+  }
+
   public List<AggregationDescriptor> getAggregationDescriptorList() {
     return aggregationDescriptorList;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
index c1b3898ed28..263223c5d5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
@@ -190,7 +190,7 @@ public class SingleDeviceViewNode extends SingleChildProcessNode {
     return new SingleDeviceViewNode(
         planNodeId,
         cacheOutputColumnNames,
-        typeProvider.getTemplatedInfo().getSelectMeasurements(),
+        typeProvider.getTemplatedInfo().getDeviceViewOutputNames(),
         device,
         typeProvider.getTemplatedInfo().getDeviceToMeasurementIndexes());
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 2b69cdb0bc4..7bdbea3a577 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -259,6 +260,41 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod
         null);
   }
 
+  @Override
+  public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider)
+      throws IOException {
+    PlanNodeType.ALIGNED_SERIES_AGGREGATE_SCAN.serialize(stream);
+    id.serialize(stream);
+    ReadWriteIOUtils.write(alignedPath.getNodes().length, stream);
+    for (String node : alignedPath.getNodes()) {
+      ReadWriteIOUtils.write(node, stream);
+    }
+  }
+
+  public static AlignedSeriesAggregationScanNode deserializeUseTemplate(
+      ByteBuffer byteBuffer, TypeProvider typeProvider) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+    int nodeSize = ReadWriteIOUtils.readInt(byteBuffer);
+    String[] nodes = new String[nodeSize];
+    for (int i = 0; i < nodeSize; i++) {
+      nodes[i] = ReadWriteIOUtils.readString(byteBuffer);
+    }
+    AlignedPath alignedPath = new AlignedPath(new PartialPath(nodes));
+    alignedPath.setMeasurementList(typeProvider.getTemplatedInfo().getMeasurementList());
+    alignedPath.addSchemas(typeProvider.getTemplatedInfo().getSchemaList());
+
+    return new AlignedSeriesAggregationScanNode(
+        planNodeId,
+        alignedPath,
+        typeProvider.getTemplatedInfo().aggregationDescriptorList,
+        typeProvider.getTemplatedInfo().getScanOrder(),
+        typeProvider.getTemplatedInfo().outputEndTime,
+        typeProvider.getTemplatedInfo().getPushDownPredicate(),
+        typeProvider.getTemplatedInfo().groupByTimeParameter,
+        null);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index d266de50aaa..c7758007d36 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -265,7 +265,7 @@ public class AggregationDescriptor {
 
   protected String getInputString(List<Expression> expressions) {
     StringBuilder builder = new StringBuilder();
-    if (!(expressions.size() == 0)) {
+    if (!(expressions.isEmpty())) {
       builder.append(expressions.get(0).getExpressionString());
       for (int i = 1; i < expressions.size(); ++i) {
         builder.append(", ").append(expressions.get(i).getExpressionString());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
index 9de54f85c0a..d4b979f91a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
@@ -162,4 +162,8 @@ public class GroupByTimeParameter {
   public int hashCode() {
     return Objects.hash(startTime, endTime, interval, slidingStep, leftCRightO);
   }
+
+  public GroupByTimeParameter clone() {
+    return new GroupByTimeParameter(startTime, endTime, interval, slidingStep, leftCRightO);
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
index 0ca53b7b84c..372b803d954 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
@@ -365,7 +365,8 @@ public class TestPlanBuilder {
             expressions.toArray(new Expression[0]),
             predicate,
             isGroupByTime,
-            Ordering.ASC);
+            Ordering.ASC,
+            true);
     return this;
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
index adb25ea1a00..2a14c0cf3b2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
@@ -194,7 +194,8 @@ public class DataQueryLogicalPlannerTest {
             new Expression[] {new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))},
             predicate,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), filterNode, 100);
     LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100);
@@ -249,7 +250,8 @@ public class DataQueryLogicalPlannerTest {
             },
             predicate1,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     List<PlanNode> sourceNodeList2 = new ArrayList<>();
     sourceNodeList2.add(
@@ -287,7 +289,8 @@ public class DataQueryLogicalPlannerTest {
             },
             predicate2,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
     deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
@@ -742,7 +745,8 @@ public class DataQueryLogicalPlannerTest {
             },
             predicate,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     RawDataAggregationNode aggregationNode =
         new RawDataAggregationNode(
@@ -867,7 +871,8 @@ public class DataQueryLogicalPlannerTest {
             },
             predicate1,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     RawDataAggregationNode aggregationNode1 =
         new RawDataAggregationNode(
@@ -922,7 +927,8 @@ public class DataQueryLogicalPlannerTest {
             },
             predicate2,
             false,
-            Ordering.DESC);
+            Ordering.DESC,
+            true);
 
     RawDataAggregationNode aggregationNode2 =
         new RawDataAggregationNode(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
index ad5e28f27b3..998df88719b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
@@ -52,7 +52,8 @@ public class FilterNodeSerdeTest {
                 new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")),
                 new ConstantOperand(TSDataType.INT64, "100")),
             false,
-            Ordering.ASC);
+            Ordering.ASC,
+            true);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     filterNode.serialize(byteBuffer);

Reply via email to