This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e9a1b58d1b32fee0924e5a8e8f0a519918f0e612
Author: Alima777 <[email protected]>
AuthorDate: Fri Apr 29 09:55:01 2022 +0800

    add processTsBlocks
---
 .../db/mpp/operator/aggregation/Aggregator.java    | 28 ++++++++++++++++++++--
 .../operator/aggregation/AggregatorFactory.java    | 14 +----------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 94fe9080cb..8a05f460c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 import java.util.List;
@@ -32,7 +33,8 @@ import java.util.List;
 public class Aggregator {
 
   private final Accumulator accumulator;
-  private final List<InputLocation> inputLocationList;
+  // In some intermediate result input, inputLocation[] should include two columns
+  private final List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
   private final TSDataType intermediateType;
   private final TSDataType finalType;
@@ -42,7 +44,7 @@ public class Aggregator {
   public Aggregator(
       Accumulator accumulator,
       AggregationStep step,
-      List<InputLocation> inputLocationList,
+      List<InputLocation[]> inputLocationList,
       TSDataType intermediateType,
       TSDataType finalType) {
     this.accumulator = accumulator;
@@ -52,6 +54,7 @@ public class Aggregator {
     this.finalType = finalType;
   }
 
+  // Used for SeriesAggregateScanOperator
   public void processTsBlock(TsBlock tsBlock) {
     if (step.isInputRaw()) {
       accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
@@ -60,6 +63,27 @@ public class Aggregator {
     }
   }
 
+  // Used for aggregateOperator
+  public void processTsBlocks(TsBlock[] tsBlock) {
+    for (InputLocation[] inputLocations : inputLocationList) {
+      if (step.isInputRaw()) {
+        TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()];
+        Column[] timeValueColumn = new Column[2];
+        timeValueColumn[0] = rawTsBlock.getTimeColumn();
+        timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeValueColumn, timeRange);
+      } else {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
+        }
+        accumulator.addIntermediate(columns);
+      }
+    }
+  }
+
   public void outputResult(ColumnBuilder[] columnBuilder) {
     if (step.isOutputPartial()) {
       accumulator.outputIntermediate(columnBuilder);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
index ce493b3256..fb49e8c8c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
@@ -19,16 +19,4 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
-public class AggregatorFactory {
-
-  public static Aggregator createAggregator() {
-    Accumulator accumulator;
-    if (step.isInputRaw()) {
-      accumulator = accumulatorFactory.createAccumulator(lambdaProviders);
-    } else {
-      accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders);
-    }
-    return new Aggregator(
-        accumulator, step, intermediateType, finalType, inputChannels, maskChannel);
-  }
-}
+public class AggregatorFactory {}

Reply via email to