This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/agg_plan_device_cross_region by this push:
     new 1dddfda57e7 temp
1dddfda57e7 is described below

commit 1dddfda57e730e05606a4ceed081cc1aa08bf871
Author: Beyyes <[email protected]>
AuthorDate: Wed Feb 28 21:13:53 2024 +0800

    temp
---
 .../execution/aggregation/Accumulator.java         |   5 +
 .../execution/aggregation/AvgAccumulator.java      |   5 +
 .../execution/aggregation/CountAccumulator.java    |   5 +
 .../execution/aggregation/CountIfAccumulator.java  |   5 +
 .../aggregation/CountTimeAccumulator.java          |   5 +
 .../execution/aggregation/ExtremeAccumulator.java  |   5 +
 .../aggregation/FirstValueAccumulator.java         |   5 +
 .../aggregation/LastValueAccumulator.java          |   5 +
 .../execution/aggregation/MaxValueAccumulator.java |   5 +
 .../execution/aggregation/MinTimeAccumulator.java  |   5 +
 .../execution/aggregation/MinValueAccumulator.java |   5 +
 .../aggregation/TimeDurationAccumulator.java       |   5 +
 .../execution/aggregation/UDAFAccumulator.java     |   5 +
 .../execution/aggregation/VarianceAccumulator.java |   5 +
 .../process/AggregationMergeSortOperator.java      | 173 ++++++++++++++++++---
 .../plan/planner/OperatorTreeGenerator.java        |  68 +++++---
 .../plan/planner/distribution/SourceRewriter.java  | 126 ++++++++-------
 .../node/process/AggregationMergeSortNode.java     |  25 ++-
 .../plan/parameter/AggregationDescriptor.java      |   2 +-
 19 files changed, 361 insertions(+), 103 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
index 9386903f493..7d765aa5857 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
@@ -85,4 +85,9 @@ public interface Accumulator {
   TSDataType[] getIntermediateType();
 
   TSDataType getFinalType();
+
+  default int getPartialResultSize() {
+    throw new UnsupportedOperationException(
+        "This type of accumulator does not support getPartialResultSize!");
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
index 318dbb7ada2..66ff521ce69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
@@ -163,6 +163,11 @@ public class AvgAccumulator implements Accumulator {
     return TSDataType.DOUBLE;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   private void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
index d7e1d621033..3273629490f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountAccumulator.java
@@ -121,4 +121,9 @@ public class CountAccumulator implements Accumulator {
   public TSDataType getFinalType() {
     return TSDataType.INT64;
   }
+
+  @Override
+  public int getPartialResultSize() {
+    return 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
index 8638ef6e3c6..0f20730618e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountIfAccumulator.java
@@ -143,4 +143,9 @@ public class CountIfAccumulator implements Accumulator {
   public TSDataType getFinalType() {
     return TSDataType.INT64;
   }
+
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
index a8a7d888762..e7ed4e57ac2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CountTimeAccumulator.java
@@ -108,4 +108,9 @@ public class CountTimeAccumulator implements Accumulator {
   public TSDataType getFinalType() {
     return TSDataType.INT64;
   }
+
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
index 2ae041cf1c8..fc20b23ac32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
@@ -225,6 +225,11 @@ public class ExtremeAccumulator implements Accumulator {
     return extremeResult.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   private void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
index d4262d453b2..8514f67091d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
@@ -252,6 +252,11 @@ public class FirstValueAccumulator implements Accumulator {
     return firstValue.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
index 9891b1a528f..6d38324559a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
@@ -252,6 +252,11 @@ public class LastValueAccumulator implements Accumulator {
     return lastValue.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
index 15c07ba154c..7b9f4b9e3c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxValueAccumulator.java
@@ -223,6 +223,11 @@ public class MaxValueAccumulator implements Accumulator {
     return maxResult.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   private void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
index a3d0ec9a3a4..8a99be7cba2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinTimeAccumulator.java
@@ -119,6 +119,11 @@ public class MinTimeAccumulator implements Accumulator {
     return TSDataType.INT64;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   protected void updateMinTime(long curTime) {
     hasCandidateResult = true;
     minTime = Math.min(minTime, curTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
index 14c18e1f45c..e768cffab39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinValueAccumulator.java
@@ -223,6 +223,11 @@ public class MinValueAccumulator implements Accumulator {
     return minResult.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   private void addIntInput(Column[] column, BitMap bitMap) {
     int count = column[0].getPositionCount();
     for (int i = 0; i < count; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
index 095cece46ea..b0a8dd9f33d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
@@ -113,6 +113,11 @@ public class TimeDurationAccumulator implements 
Accumulator {
     return TSDataType.INT64;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void updateMaxTime(long curTime) {
     initResult = true;
     maxTime = Math.max(maxTime, curTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
index e9e875205f8..fd4065d43ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java
@@ -195,6 +195,11 @@ public class UDAFAccumulator implements Accumulator {
     return 
UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType());
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   private void onError(String methodName, Exception e) {
     LOGGER.warn(
         "Error occurred during executing UDAF, please check whether the 
implementation of UDF is correct according to the udf-api description.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
index 1dfc265f6b1..f3604bcb5cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/VarianceAccumulator.java
@@ -211,6 +211,11 @@ public class VarianceAccumulator implements Accumulator {
     return TSDataType.DOUBLE;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 1;
+  }
+
   private void addIntInput(Column[] columns, BitMap bitmap) {
     int size = columns[0].getPositionCount();
     for (int i = 0; i < size; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index c280ab2eb36..b83e491e522 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.process;
 
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -28,15 +31,22 @@ import 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static 
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory.createAccumulator;
 
-  // private final ITimeRangeIterator timeRangeIterator;
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
 
-  // Current interval of aggregation window [curStartTime, curEndTime)
-  private TimeRange curTimeRange;
+  private List<Accumulator> accumulators;
 
   private final List<TSDataType> dataTypes;
   private final TsBlockBuilder tsBlockBuilder;
@@ -45,18 +55,37 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
 
   private boolean finished;
 
+  private Map<String, List<Aggregator>> aggMap;
+
+  private final TimeComparator timeComparator;
+
+  private final Comparator<Binary> deviceComparator;
+
   private boolean currentFinished;
 
-  private String currentDevice;
+  private Binary currentDevice;
 
   private long currentTime;
 
+  private int[] readIndex;
+
+  List<Integer> newAggregationIdx;
+
   public AggregationMergeSortOperator(
-      OperatorContext operatorContext, List<Operator> children, 
List<TSDataType> dataTypes) {
+      OperatorContext operatorContext,
+      List<Operator> children,
+      List<TSDataType> dataTypes,
+      List<Accumulator> accumulators,
+      TimeComparator timeComparator,
+      Comparator<Binary> deviceComparator) {
     super(operatorContext, children);
     this.dataTypes = dataTypes;
     this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+    this.accumulators = accumulators;
+    this.timeComparator = timeComparator;
+    this.deviceComparator = deviceComparator;
+    readIndex = new int[inputTsBlocks.length];
   }
 
   @Override
@@ -64,7 +93,6 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
     long startTime = System.nanoTime();
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
-    // 1. fill consumed up TsBlock
     if (!prepareInput()) {
       return null;
     }
@@ -72,9 +100,71 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
     tsBlockBuilder.reset();
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-    for (TsBlock tsBlock : inputTsBlocks) {
-      timeBuilder.writeLong(tsBlock.getTimeColumn().getLong(0));
-      
valueColumnBuilders[0].writeBinary(tsBlock.getValueColumns()[0].getBinary(0));
+
+    while (true) {
+      currentDevice = null;
+
+      for (int idx = 0; idx < inputTsBlocks.length; idx++) {
+        TsBlock tsBlock = inputTsBlocks[idx];
+        if (!noMoreTsBlocks[idx] && tsBlock == null) {
+          return null;
+        }
+
+        if (readIndex[idx] >= tsBlock.getPositionCount()) {
+          inputTsBlocks[idx] = null;
+        }
+
+        Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
+
+        if (currentDevice == null) {
+          currentDevice = device;
+        } else {
+          if (deviceComparator.compare(device, currentDevice) < 0) {
+            currentDevice = device;
+          }
+        }
+      }
+
+      if (currentDevice == null) {
+        break;
+      }
+
+      for (int idx = 0; idx < inputTsBlocks.length; idx++) {
+        TsBlock tsBlock = inputTsBlocks[idx];
+        if (tsBlock == null) {
+          continue;
+        }
+
+        if (readIndex[idx] >= tsBlock.getPositionCount()) {
+          inputTsBlocks[idx] = null;
+        }
+
+        Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
+        if (device.equals(currentDevice)) {
+          currentTime = tsBlock.getTimeColumn().getLong(readIndex[idx]);
+          int cnt = 0;
+          for (int i = 0; i < accumulators.size(); i++) {
+            Accumulator accumulator = accumulators.get(i);
+            if (newAggregationIdx.get(i) == 2) {
+              accumulator.addIntermediate(tsBlock.getColumns(new int[2]{cnt++, 
cnt+}));
+            } else {
+              accumulator.addIntermediate(tsBlock.getColumns(new int[]));
+            }
+          }
+          readIndex[idx] ++;
+        }
+      }
+
+      timeBuilder.writeLong(currentTime);
+      for (int i = 1; i < dataTypes.size(); i++) {
+        accumulators.get(i).outputFinal(valueColumnBuilders[i]);
+      }
+
+      currentDevice = null;
+
+      if (System.nanoTime() - startTime > maxRuntime || 
tsBlockBuilder.isFull()) {
+        break;
+      }
     }
 
     return tsBlockBuilder.build();
@@ -82,38 +172,58 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    // TODO the child of DeviceViewNode already calc TimeRange?
-    // return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
-
     if (finished) {
       return false;
     }
+
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isEmpty(i)) {
+      if (isInputNotEmpty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
         if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
           return true;
         } else {
-          children.get(i).close();
-          children.set(i, null);
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
+          handleFinishedChild(i);
         }
       }
     }
+
     return false;
   }
 
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    boolean hasReadyChild = false;
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (noMoreTsBlocks[i] || isInputNotEmpty(i) || children.get(i) == null) {
+        continue;
+      }
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        // only when not blocked, canCallNext[i] equals true
+        canCallNext[i] = true;
+      } else {
+        listenableFutures.add(blocked);
+      }
+    }
+
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures);
+  }
+
   @Override
   public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
-    finished = true;
 
+    finished = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+      if (!noMoreTsBlocks[i] || isInputNotEmpty(i)) {
         finished = false;
         break;
       }
@@ -121,6 +231,25 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
     return finished;
   }
 
+  @Override
+  public void close() throws Exception {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      final Operator operator = children.get(i);
+      if (operator != null) {
+        operator.close();
+      }
+    }
+  }
+
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    // invoking this method when children.get(currentChildIndex).hasNext 
return false
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return 0;
@@ -135,4 +264,8 @@ public class AggregationMergeSortOperator extends 
AbstractConsumeAllOperator {
   public long calculateRetainedSizeAfterCallingNext() {
     return 0;
   }
+
+  private boolean isInputNotEmpty(int index) {
+    return inputTsBlocks[index] != null && !inputTsBlocks[index].isEmpty();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 61174b5e776..b052c007bc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
@@ -147,6 +148,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
+import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -277,6 +279,7 @@ import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.getOutputColumnSizePerLine;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor.getAggregationTypeByFuncName;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
 import static 
org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECISION;
 
@@ -872,24 +875,53 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
     List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
 
-    //    for (Expression expression : selectExpressions) {
-    //      if (expression instanceof FunctionExpression) {
-    //        FunctionExpression functionExpression = (FunctionExpression) 
expression;
-    //        String functionName = functionExpression.getFunctionName();
-    //        expression.getExpressionType();
-    //        Accumulator accumulator = AccumulatorFactory.createAccumulator(
-    //                functionName,
-    //                aggregationType,
-    //
-    // 
Collections.singletonList(context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
-    //                null,
-    //                null,
-    //                true,
-    //                true);
-    //      }
-    //    }
-
-    return new AggregationMergeSortOperator(operatorContext, children, 
dataTypes);
+    List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+    List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
+    genSortInformation(
+        node.getOutputColumnNames(),
+        dataTypes,
+        sortItemList,
+        sortItemIndexList,
+        sortItemDataTypeList);
+
+    boolean timeAscending = true;
+    TimeComparator timeComparator = ASC_TIME_COMPARATOR;
+    Comparator<Binary> deviceComparator = ASC_BINARY_COMPARATOR;
+    for (SortItem sortItem : sortItemList) {
+      if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getSortKey())) 
{
+        if (sortItem.getOrdering() == Ordering.DESC) {
+          timeAscending = false;
+          timeComparator = DESC_TIME_COMPARATOR;
+        }
+      } else if ("Device".equalsIgnoreCase(sortItem.getSortKey())) {
+        if (sortItem.getOrdering() == Ordering.DESC) {
+          deviceComparator = DESC_BINARY_COMPARATOR;
+        }
+      }
+    }
+
+    List<Accumulator> accumulators = new ArrayList<>();
+    for (Expression expression : node.getSelectExpressions()) {
+      if (expression instanceof FunctionExpression) {
+        FunctionExpression functionExpression = (FunctionExpression) 
expression;
+        String aggregationName = functionExpression.getFunctionName();
+        Accumulator accumulator =
+            AccumulatorFactory.createAccumulator(
+                aggregationName,
+                getAggregationTypeByFuncName(aggregationName),
+                Collections.singletonList(
+                    
context.getTypeProvider().getType(functionExpression.getOutputSymbol())),
+                null,
+                null,
+                timeAscending,
+                false);
+        accumulators.add(accumulator);
+      }
+    }
+
+    return new AggregationMergeSortOperator(
+        operatorContext, children, dataTypes, accumulators, timeComparator, 
deviceComparator);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 57c807c8a41..022345dd250 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -74,6 +73,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
@@ -228,55 +229,66 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
     if (analysis.isExistDeviceCrossRegion() && 
analysis.isDeviceViewSpecialProcess()) {
       // return processSpecialDeviceView(node, context);
 
-      // TODO 1. generate old and new measurement idx relationship 2. generate 
new outputColumns for
+      // Step 1: map each old measurement index to its new partial-aggregation indexes.
+      // Step 2: generate new outputColumns for
       // each subDeviceView
-//      Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
-//      List<String> newPartialOutputColumns = new ArrayList<>();
-//
+      Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+      List<String> newPartialOutputColumns = new ArrayList<>();
+
       Set<Expression> selectExpressions = analysis.getSelectExpressions();
-//
-//      int i = 0, idxSum = 0;
-//      for (Expression expression : selectExpressions) {
-//        if (i == 0) {
-//          // device
-//          newPartialOutputColumns.add(expression.getOutputSymbol());
-//          i++;
-//          idxSum++;
-//          continue;
-//        }
-//        FunctionExpression aggExpression = (FunctionExpression) expression;
-//        List<String> actualPartialAggregationNames =
-//            
getActualPartialAggregationNames(aggExpression.getFunctionName());
-//        for (String actualAggName : actualPartialAggregationNames) {
-//          newPartialOutputColumns.add(
-//              new FunctionExpression(
-//                      actualAggName,
-//                      aggExpression.getFunctionAttributes(),
-//                      aggExpression.getExpressions())
-//                  .getOutputSymbol());
-//        }
-//        // TODO need update typeProvider?
-//        if (actualPartialAggregationNames.size() > 1) {
-//          newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
-//        } else {
-//          newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
-//        }
-//        i++;
-//      }
-
-//      for (String device : node.getDevices()) {
-//        List<Integer> oldMeasurementList =
-//                node.getDeviceToMeasurementIndexesMap().get(device);
-//        List<Integer> newMeasurementList = new ArrayList<>();
-//        for (int idx : oldMeasurementList) {
-//          newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
-//        }
-//        node.getDeviceToMeasurementIndexesMap().put(device, 
newMeasurementList);
-//      }
+      int[] newAggregationIdx = new int[selectExpressions.size()];
+
+      int i = 0, idxSum = 0;
+      for (Expression expression : selectExpressions) {
+        if (i == 0) {
+          // device
+          newPartialOutputColumns.add(expression.getOutputSymbol());
+          i++;
+          idxSum++;
+          continue;
+        }
+        FunctionExpression aggExpression = (FunctionExpression) expression;
+        // used for AVG, FIRST_VALUE, LAST_VALUE, TIME_DURATION agg function
+        List<String> actualPartialAggregationNames =
+            getActualPartialAggregationNames(aggExpression.getFunctionName());
+        for (String actualAggName : actualPartialAggregationNames) {
+          FunctionExpression partialFunctionExpression =
+              new FunctionExpression(
+                  actualAggName,
+                  aggExpression.getFunctionAttributes(),
+                  aggExpression.getExpressions());
+          if (actualPartialAggregationNames.size() > 1) {
+            TSDataType dataType = analyzeExpression(analysis, 
partialFunctionExpression);
+            context
+                .queryContext
+                .getTypeProvider()
+                .setType(partialFunctionExpression.getOutputSymbol(), 
dataType);
+          }
+          
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+        }
+
+        newAggregationIdx[i] = actualPartialAggregationNames.size();
+        // TODO: typeProvider is set above only for multi-name partials; confirm single-name case
+        if (actualPartialAggregationNames.size() > 1) {
+          newMeasurementIdxMap.put(i, Arrays.asList(idxSum++, idxSum++));
+        } else {
+          newMeasurementIdxMap.put(i, Collections.singletonList(idxSum++));
+        }
+        i++;
+      }
+
+      for (String device : node.getDevices()) {
+        List<Integer> oldMeasurementList = 
node.getDeviceToMeasurementIndexesMap().get(device);
+        List<Integer> newMeasurementList = new ArrayList<>();
+        for (int idx : oldMeasurementList) {
+          newMeasurementList.addAll(newMeasurementIdxMap.get(idx));
+        }
+        node.getDeviceToMeasurementIndexesMap().put(device, 
newMeasurementList);
+      }
 
       for (PlanNode planNode : deviceViewNodeList) {
         DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
-        // deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+        deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
         transferAggregatorsRecursively2(planNode, context);
       }
 
@@ -285,7 +297,8 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
               context.queryContext.getQueryId().genPlanNodeId(),
               node.getMergeOrderParameter(),
               node.getOutputColumnNames(),
-              selectExpressions);
+              selectExpressions,
+              newAggregationIdx);
       deviceViewNodeList.forEach(mergeSortNode::addChild);
       return Collections.singletonList(mergeSortNode);
     } else {
@@ -410,16 +423,15 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           List<String> aggregationNames = 
descriptor.getActualAggregationNames(true);
           for (String aggregationName : aggregationNames) {
             newDescriptorList.add(
-                    new AggregationDescriptor(
-                            aggregationName,
-                            AggregationStep.PARTIAL,
-                            descriptor.getInputExpressions(),
-                            descriptor.getInputAttributes()));
+                new AggregationDescriptor(
+                    aggregationName,
+                    AggregationStep.PARTIAL,
+                    descriptor.getInputExpressions(),
+                    descriptor.getInputAttributes()));
           }
         }
         scanSourceNode.setAggregationDescriptorList(newDescriptorList);
       }
-
     }
   }
 
@@ -759,9 +771,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                         descriptor.getInputExpressions(),
                         descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
-        d ->
-            updateTypeProviderByPartialAggregation(
-                d, context.queryContext.getTypeProvider()));
+        d -> updateTypeProviderByPartialAggregation(d, 
context.queryContext.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
@@ -1490,8 +1500,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
         descriptorList.add(descriptor);
-        updateTypeProviderByPartialAggregation(
-            descriptor, context.queryContext.getTypeProvider());
+        updateTypeProviderByPartialAggregation(descriptor, 
context.queryContext.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
     }
@@ -1588,8 +1597,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           .forEach(
               d -> {
                 d.setStep(AggregationStep.PARTIAL);
-                updateTypeProviderByPartialAggregation(
-                    d, context.queryContext.getTypeProvider());
+                updateTypeProviderByPartialAggregation(d, 
context.queryContext.getTypeProvider());
               });
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index c446715efae..dd7dc6cd404 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -43,15 +43,19 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
 
   private final Set<Expression> selectExpressions;
 
+  private final int[] newAggregationIdx;
+
   public AggregationMergeSortNode(
       PlanNodeId id,
       OrderByParameter mergeOrderParameter,
       List<String> outputColumns,
-      Set<Expression> selectExpressions) {
+      Set<Expression> selectExpressions,
+      int[] newAggregationIdx) {
     super(id);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
     this.selectExpressions = selectExpressions;
+    this.newAggregationIdx = newAggregationIdx;
   }
 
   public AggregationMergeSortNode(
@@ -59,21 +63,31 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
       List<PlanNode> children,
       OrderByParameter mergeOrderParameter,
       List<String> outputColumns,
-      Set<Expression> selectExpressions) {
+      Set<Expression> selectExpressions,
+      int[] newAggregationIdx) {
     super(id, children);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
     this.selectExpressions = selectExpressions;
+    this.newAggregationIdx = newAggregationIdx;
   }
 
   public OrderByParameter getMergeOrderParameter() {
     return mergeOrderParameter;
   }
 
+  public Set<Expression> getSelectExpressions() {
+    return this.selectExpressions;
+  }
+
   @Override
   public PlanNode clone() {
     return new AggregationMergeSortNode(
-        getPlanNodeId(), getMergeOrderParameter(), outputColumns, 
selectExpressions);
+        getPlanNodeId(),
+        getMergeOrderParameter(),
+        outputColumns,
+        selectExpressions,
+        newAggregationIdx);
   }
 
   @Override
@@ -83,7 +97,8 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
         new ArrayList<>(children.subList(startIndex, endIndex)),
         getMergeOrderParameter(),
         outputColumns,
-        selectExpressions);
+        selectExpressions,
+        null);
   }
 
   @Override
@@ -125,7 +140,7 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
       columnSize--;
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns, null);
+    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns, null, null);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index ff6abfe2017..c695c2280b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -81,7 +81,7 @@ public class AggregationDescriptor {
     this.inputAttributes = inputAttributes;
   }
 
-  private TAggregationType getAggregationTypeByFuncName(String funcName) {
+  public static TAggregationType getAggregationTypeByFuncName(String funcName) 
{
     if (isBuiltinAggregationName(funcName.toLowerCase())) {
       return TAggregationType.valueOf(funcName.toUpperCase());
     } else {


Reply via email to