This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/beyyes/agg_plan_device_cross_region by this push:
     new b480e5964ff temp
b480e5964ff is described below

commit b480e5964ffd689da6dc0803de63cf6a0dc0b3cb
Author: Beyyes <[email protected]>
AuthorDate: Wed Feb 28 01:00:22 2024 +0800

    temp
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../process/AggregationMergeSortOperator.java      | 138 +++++++++++++++++
 .../plan/planner/OperatorTreeGenerator.java        |  35 +++++
 .../plan/planner/distribution/SourceRewriter.java  | 170 +++++++++++++++++++--
 .../node/process/AggregationMergeSortNode.java     |  22 ++-
 .../planner/plan/node/process/DeviceViewNode.java  |   6 +-
 .../plan/parameter/AggregationDescriptor.java      |  11 +-
 .../datanode1conf/iotdb-common.properties          |   3 +
 .../datanode2conf/iotdb-common.properties          |   3 +
 .../datanode3conf/iotdb-common.properties          |   4 +-
 11 files changed, 368 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 608910104f1..d7f91373dc6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -72,7 +72,7 @@ public class ConfigNodeConfig {
   private int dataReplicationFactor = 1;
 
   /** Number of SeriesPartitionSlots per Database. */
-  private int seriesSlotNum = 1000;
+  private int seriesSlotNum = 5;
 
   /** SeriesPartitionSlot executor class. */
   private String seriesPartitionExecutorClass =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 854401d7b5e..b18a7d42dcb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -585,7 +585,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 60000000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
new file mode 100644
index 00000000000..c280ab2eb36
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+
+  // private final ITimeRangeIterator timeRangeIterator;
+
+  // Current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
+  private final List<TSDataType> dataTypes;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private final boolean[] noMoreTsBlocks;
+
+  private boolean finished;
+
+  private boolean currentFinished;
+
+  private String currentDevice;
+
+  private long currentTime;
+
+  public AggregationMergeSortOperator(
+      OperatorContext operatorContext, List<Operator> children, 
List<TSDataType> dataTypes) {
+    super(operatorContext, children);
+    this.dataTypes = dataTypes;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    long startTime = System.nanoTime();
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+    // 1. fill consumed up TsBlock
+    if (!prepareInput()) {
+      return null;
+    }
+
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
+    for (TsBlock tsBlock : inputTsBlocks) {
+      timeBuilder.writeLong(tsBlock.getTimeColumn().getLong(0));
+      
valueColumnBuilders[0].writeBinary(tsBlock.getValueColumns()[0].getBinary(0));
+    }
+
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    // TODO the child of DeviceViewNode already calc TimeRange?
+    // return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
+
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+          return true;
+        } else {
+          children.get(i).close();
+          children.set(i, null);
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 1b5f9508638..61174b5e776 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
 import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
@@ -166,6 +167,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -857,6 +859,39 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         MergeSortComparator.getComparator(sortItemList, sortItemIndexList, 
sortItemDataTypeList));
   }
 
+  @Override
+  public Operator visitAggregationMergeSort(
+      AggregationMergeSortNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                MergeSortOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
+
+    //    for (Expression expression : selectExpressions) {
+    //      if (expression instanceof FunctionExpression) {
+    //        FunctionExpression functionExpression = (FunctionExpression) 
expression;
+    //        String functionName = functionExpression.getFunctionName();
+    //        expression.getExpressionType();
+    //        Accumulator accumulator = AccumulatorFactory.createAccumulator(
+    //                functionName,
+    //                aggregationType,
+    //
+    // 
Collections.singletonList(context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
+    //                null,
+    //                null,
+    //                true,
+    //                true);
+    //      }
+    //    }
+
+    return new AggregationMergeSortOperator(operatorContext, children, 
dataTypes);
+  }
+
   @Override
   public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 53814483e4c..57c807c8a41 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -72,8 +73,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParame
 import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -84,8 +87,13 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
 
 public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext> {
 
@@ -123,7 +131,6 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
   public List<PlanNode> visitSingleDeviceView(
       SingleDeviceViewNode node, DistributionPlanContext context) {
 
-    // Same process logic as visitDeviceView
     if (analysis.isDeviceViewSpecialProcess()) {
       List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
       if (rewroteChildren.size() != 1) {
@@ -190,9 +197,10 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
       if (regionReplicaSets.size() > 1) {
         // specialProcess and existDeviceCrossRegion, use the old aggregation 
logic
         analysis.setExistDeviceCrossRegion();
-        if (analysis.isDeviceViewSpecialProcess()) {
-          return processSpecialDeviceView(node, context);
-        }
+        // TODO group by session, variation, count, count_if no not use old 
logic
+        // if (analysis.isDeviceViewSpecialProcess()) {
+        //  return processSpecialDeviceView(node, context);
+        // }
       }
       deviceViewSplits.add(new DeviceViewSplit(outputDevice, child, 
regionReplicaSets));
       relatedDataRegions.addAll(regionReplicaSets);
@@ -217,13 +225,78 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
       return deviceViewNodeList;
     }
 
-    MergeSortNode mergeSortNode =
-        new MergeSortNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getOutputColumnNames());
-    deviceViewNodeList.forEach(mergeSortNode::addChild);
-    return Collections.singletonList(mergeSortNode);
+    if (analysis.isExistDeviceCrossRegion() && 
analysis.isDeviceViewSpecialProcess()) {
+      // return processSpecialDeviceView(node, context);
+
+      // TODO 1. generate old and new measurement idx relationship 2. generate 
new outputColumns for
+      // each subDeviceView
+//      Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+//      List<String> newPartialOutputColumns = new ArrayList<>();
+//
+      Set<Expression> selectExpressions = analysis.getSelectExpressions();
+//
+//      int i = 0, idxSum = 0;
+//      for (Expression expression : selectExpressions) {
+//        if (i == 0) {
+//          // device
+//          newPartialOutputColumns.add(expression.getOutputSymbol());
+//          i++;
+//          idxSum++;
+//          continue;
+//        }
+//        FunctionExpression aggExpression = (FunctionExpression) expression;
+//        List<String> actualPartialAggregationNames =
+//            
getActualPartialAggregationNames(aggExpression.getFunctionName());
+//        for (String actualAggName : actualPartialAggregationNames) {
+//          newPartialOutputColumns.add(
+//              new FunctionExpression(
+//                      actualAggName,
+//                      aggExpression.getFunctionAttributes(),
+//                      aggExpression.getExpressions())
+//                  .getOutputSymbol());
+//        }
+//        // TODO need update typeProvider?
+//        if (actualPartialAggregationNames.size() > 1) {
+//          newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
+//        } else {
+//          newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
+//        }
+//        i++;
+//      }
+
+//      for (String device : node.getDevices()) {
+//        List<Integer> oldMeasurementList =
+//                node.getDeviceToMeasurementIndexesMap().get(device);
+//        List<Integer> newMeasurementList = new ArrayList<>();
+//        for (int idx : oldMeasurementList) {
+//          newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
+//        }
+//        node.getDeviceToMeasurementIndexesMap().put(device, 
newMeasurementList);
+//      }
+
+      for (PlanNode planNode : deviceViewNodeList) {
+        DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
+        // deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+        transferAggregatorsRecursively2(planNode, context);
+      }
+
+      AggregationMergeSortNode mergeSortNode =
+          new AggregationMergeSortNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrderParameter(),
+              node.getOutputColumnNames(),
+              selectExpressions);
+      deviceViewNodeList.forEach(mergeSortNode::addChild);
+      return Collections.singletonList(mergeSortNode);
+    } else {
+      MergeSortNode mergeSortNode =
+          new MergeSortNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrderParameter(),
+              node.getOutputColumnNames());
+      deviceViewNodeList.forEach(mergeSortNode::addChild);
+      return Collections.singletonList(mergeSortNode);
+    }
   }
 
   private void constructDeviceViewNodeListWithCrossRegion(
@@ -286,9 +359,74 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
     }
   }
 
+  public List<String> getActualPartialAggregationNames(String aggregationType) 
{
+    List<String> outputAggregationNames = new ArrayList<>();
+    switch (aggregationType) {
+      case AVG:
+        outputAggregationNames.add(SqlConstant.COUNT);
+        outputAggregationNames.add(SqlConstant.SUM);
+        break;
+      case FIRST_VALUE:
+        outputAggregationNames.add(FIRST_VALUE);
+        outputAggregationNames.add(SqlConstant.MIN_TIME);
+        break;
+      case LAST_VALUE:
+        outputAggregationNames.add(SqlConstant.LAST_VALUE);
+        outputAggregationNames.add(SqlConstant.MAX_TIME);
+        break;
+      case TIME_DURATION:
+        outputAggregationNames.add(SqlConstant.MAX_TIME);
+        outputAggregationNames.add(SqlConstant.MIN_TIME);
+        break;
+      default:
+        // TODO how about UDAF?
+        outputAggregationNames.add(aggregationType);
+    }
+    return outputAggregationNames;
+  }
+
+  private void transferAggregatorsRecursively2(PlanNode planNode, 
DistributionPlanContext context) {
+    if (planNode instanceof SeriesAggregationSourceNode) {
+      SeriesAggregationSourceNode scanSourceNode = 
(SeriesAggregationSourceNode) planNode;
+      for (AggregationDescriptor descriptor : 
scanSourceNode.getAggregationDescriptorList()) {
+        descriptor.setStep(AggregationStep.PARTIAL);
+        updateTypeProviderByPartialAggregation(descriptor, 
context.queryContext.getTypeProvider());
+      }
+    }
+
+    for (PlanNode child : planNode.getChildren()) {
+      transferAggregatorsRecursively2(child, context);
+    }
+  }
+
+  private void transferAggregatorsRecursively(PlanNode planNode) {
+    for (PlanNode child : planNode.getChildren()) {
+      transferAggregatorsRecursively(child);
+
+      if (child instanceof SeriesAggregationSourceNode) {
+        SeriesAggregationSourceNode scanSourceNode = 
(SeriesAggregationSourceNode) child;
+        List<AggregationDescriptor> newDescriptorList = new ArrayList<>();
+        for (AggregationDescriptor descriptor : 
scanSourceNode.getAggregationDescriptorList()) {
+          List<String> aggregationNames = 
descriptor.getActualAggregationNames(true);
+          for (String aggregationName : aggregationNames) {
+            newDescriptorList.add(
+                    new AggregationDescriptor(
+                            aggregationName,
+                            AggregationStep.PARTIAL,
+                            descriptor.getInputExpressions(),
+                            descriptor.getInputAttributes()));
+          }
+        }
+        scanSourceNode.setAggregationDescriptorList(newDescriptorList);
+      }
+
+    }
+  }
+
   @Override
   public List<PlanNode> visitAggregationMergeSort(
       AggregationMergeSortNode node, DistributionPlanContext context) {
+    // TODO remove this method?
     return null;
   }
 
@@ -622,7 +760,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                         descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
         d ->
-            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+            updateTypeProviderByPartialAggregation(
                 d, context.queryContext.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
@@ -1308,7 +1446,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         }
         if (keep) {
           descriptorList.add(originalDescriptor);
-          LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+          updateTypeProviderByPartialAggregation(
               originalDescriptor, context.queryContext.getTypeProvider());
         }
       }
@@ -1352,7 +1490,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
         descriptorList.add(descriptor);
-        LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+        updateTypeProviderByPartialAggregation(
             descriptor, context.queryContext.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
@@ -1450,7 +1588,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           .forEach(
               d -> {
                 d.setStep(AggregationStep.PARTIAL);
-                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                updateTypeProviderByPartialAggregation(
                     d, context.queryContext.getTypeProvider());
               });
     }
@@ -1489,7 +1627,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                 } else {
                   eachSeriesOneRegion[0] = false;
                   descriptor.setStep(AggregationStep.PARTIAL);
-                  LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                  updateTypeProviderByPartialAggregation(
                       descriptor, context.queryContext.getTypeProvider());
                 }
               });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 41c4cfb2b55..c446715efae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -32,6 +33,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class AggregationMergeSortNode extends MultiChildProcessNode {
 
@@ -39,21 +41,29 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
 
   private final List<String> outputColumns;
 
+  private final Set<Expression> selectExpressions;
+
   public AggregationMergeSortNode(
-      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> 
outputColumns) {
+      PlanNodeId id,
+      OrderByParameter mergeOrderParameter,
+      List<String> outputColumns,
+      Set<Expression> selectExpressions) {
     super(id);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
+    this.selectExpressions = selectExpressions;
   }
 
   public AggregationMergeSortNode(
       PlanNodeId id,
       List<PlanNode> children,
       OrderByParameter mergeOrderParameter,
-      List<String> outputColumns) {
+      List<String> outputColumns,
+      Set<Expression> selectExpressions) {
     super(id, children);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
+    this.selectExpressions = selectExpressions;
   }
 
   public OrderByParameter getMergeOrderParameter() {
@@ -62,7 +72,8 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new AggregationMergeSortNode(getPlanNodeId(), 
getMergeOrderParameter(), outputColumns);
+    return new AggregationMergeSortNode(
+        getPlanNodeId(), getMergeOrderParameter(), outputColumns, 
selectExpressions);
   }
 
   @Override
@@ -71,7 +82,8 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
         new ArrayList<>(children.subList(startIndex, endIndex)),
         getMergeOrderParameter(),
-        outputColumns);
+        outputColumns,
+        selectExpressions);
   }
 
   @Override
@@ -113,7 +125,7 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
       columnSize--;
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns);
+    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns, null);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index 4d4b9b67049..bc7204a0d40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -52,7 +52,7 @@ public class DeviceViewNode extends MultiChildProcessNode {
   private final List<String> devices = new ArrayList<>();
 
   // Device column and measurement columns in result output
-  private final List<String> outputColumnNames;
+  private List<String> outputColumnNames;
 
   // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
   // not 0 because device is the first column
@@ -114,6 +114,10 @@ public class DeviceViewNode extends MultiChildProcessNode {
     return outputColumnNames;
   }
 
+  public void setOutputColumnNames(List<String> outputColumnNames) {
+    this.outputColumnNames = outputColumnNames;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitDeviceView(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 69eb807711e..ff6abfe2017 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -36,8 +36,13 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.isBuiltinAggregationName;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.STDDEV;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
 
 public class AggregationDescriptor {
 
@@ -143,7 +148,7 @@ public class AggregationDescriptor {
   }
 
   /** Keep the lower case of function name for partial result, and origin 
value for others. */
-  protected List<String> getActualAggregationNames(boolean isPartial) {
+  public List<String> getActualAggregationNames(boolean isPartial) {
     List<String> outputAggregationNames = new ArrayList<>();
     if (isPartial) {
       switch (aggregationType) {
@@ -152,7 +157,7 @@ public class AggregationDescriptor {
           outputAggregationNames.add(SqlConstant.SUM);
           break;
         case FIRST_VALUE:
-          outputAggregationNames.add(SqlConstant.FIRST_VALUE);
+          outputAggregationNames.add(FIRST_VALUE);
           outputAggregationNames.add(SqlConstant.MIN_TIME);
           break;
         case LAST_VALUE:
@@ -164,7 +169,7 @@ public class AggregationDescriptor {
           outputAggregationNames.add(SqlConstant.MIN_TIME);
           break;
         case STDDEV:
-          outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV));
+          outputAggregationNames.add(addPartialSuffix(STDDEV));
           break;
         case STDDEV_POP:
           outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP));
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
index b89ef13778b..4ef45b29710 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
@@ -17,6 +17,9 @@
 # under the License.
 #
 
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
 timestamp_precision=ms
 udf_lib_dir=target/datanode1/ext/udf
 trigger_lib_dir=target/datanode1/ext/trigger
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
index 9cf060d61fd..1d3ce663a5b 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
@@ -16,6 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
 timestamp_precision=ms
 udf_lib_dir=target/datanode2/ext/udf
 trigger_lib_dir=target/datanode2/ext/trigger
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
index 83dbc1b051e..cac71c4ccfd 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
@@ -16,7 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
+query_timeout_threshold=60000000
+series_slot_num=5
+data_replication_factor=1
 timestamp_precision=ms
 udf_lib_dir=target/datanode3/ext/udf
 trigger_lib_dir=target/datanode3/ext/trigger

Reply via email to