This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25b649bd0d0000db00cc29970018da6d8b20b636
Author: Alima777 <[email protected]>
AuthorDate: Thu Apr 28 17:09:23 2022 +0800

    modify seriesAggregateScanOperator using aggregator
---
 .../Accumulator.java}                              |  45 +--
 .../db/mpp/operator/aggregation/Aggregator.java    |  98 +++++++
 .../AggregatorFactory.java}                        |  45 +--
 .../mpp/operator/aggregation/AvgAccumulator.java   |  86 ++++++
 .../mpp/operator/aggregation/CountAccumulator.java |  84 ++++++
 .../db/mpp/operator/process/AggregateOperator.java |  17 +-
 .../source/SeriesAggregateScanOperator.java        | 111 +++----
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   2 +-
 .../sql/planner/plan/parameter/InputLocation.java  |   4 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |   5 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  38 ++-
 .../read/common/block/column/BinaryColumn.java     |   9 +
 .../read/common/block/column/BooleanColumn.java    |   9 +
 .../tsfile/read/common/block/column/Column.java    |   6 +
 .../read/common/block/column/DoubleColumn.java     |   9 +
 .../read/common/block/column/FloatColumn.java      |   8 +
 .../tsfile/read/common/block/column/IntColumn.java |   8 +
 .../read/common/block/column/LongColumn.java       |   8 +
 .../block/column/RunLengthEncodedColumn.java       |   8 +
 .../read/common/block/column/TimeColumn.java       |   8 +
 .../iotdb/tsfile/common/block/TsBlockTest.java     |  49 ++++
 .../iotdb/tsfile/read/common/ColumnTest.java       | 322 +++++++++++++++++++++
 22 files changed, 837 insertions(+), 142 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
similarity index 52%
copy from 
server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
copy to 
server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index 0eda188787..f6f268aad1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -16,42 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.operator.aggregation;
 
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
-import com.google.common.util.concurrent.ListenableFuture;
+public interface Accumulator {
 
-public class AggregateOperator implements ProcessOperator {
+  // Column should be like: | Time | Value |
+  void addInput(Column[] column, TimeRange timeRange);
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
+  void addIntermediate(Column[] partialResult);
 
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
+  void addStatistics(Statistics statistics);
 
-  @Override
-  public TsBlock next() {
-    return null;
-  }
+  void setFinal(Column finalResult);
 
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
+  void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
 
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
+  void outputFinal(ColumnBuilder tsBlockBuilder);
 
-  @Override
-  public boolean isFinished() {
-    return false;
-  }
+  void reset();
+
+  boolean hasFinalResult();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
new file mode 100644
index 0000000000..94fe9080cb
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public class Aggregator {
+
+  private final Accumulator accumulator;
+  private final List<InputLocation> inputLocationList;
+  private final AggregationStep step;
+  private final TSDataType intermediateType;
+  private final TSDataType finalType;
+
+  private TimeRange timeRange;
+
+  public Aggregator(
+      Accumulator accumulator,
+      AggregationStep step,
+      List<InputLocation> inputLocationList,
+      TSDataType intermediateType,
+      TSDataType finalType) {
+    this.accumulator = accumulator;
+    this.step = step;
+    this.inputLocationList = inputLocationList;
+    this.intermediateType = intermediateType;
+    this.finalType = finalType;
+  }
+
+  public void processTsBlock(TsBlock tsBlock) {
+    if (step.isInputRaw()) {
+      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    } else {
+      accumulator.addIntermediate(tsBlock.getColumns(new int[] {0}));
+    }
+  }
+
+  public void outputResult(ColumnBuilder[] columnBuilder) {
+    if (step.isOutputPartial()) {
+      accumulator.outputIntermediate(columnBuilder);
+    } else {
+      accumulator.outputFinal(columnBuilder[0]);
+    }
+  }
+
+  public void processStatistics(Statistics statistics) {
+    accumulator.addStatistics(statistics);
+  }
+
+  public TSDataType getOutputType() {
+    if (step.isOutputPartial()) {
+      return intermediateType;
+    } else {
+      return finalType;
+    }
+  }
+
+  public void reset() {
+    accumulator.reset();
+  }
+
+  public boolean hasFinalResult() {
+    return accumulator.hasFinalResult();
+  }
+
+  public void setTimeRange(TimeRange timeRange) {
+    this.timeRange = timeRange;
+  }
+
+  public TimeRange getTimeRange() {
+    return timeRange;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
similarity index 52%
copy from 
server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
copy to 
server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
index 0eda188787..ce493b3256 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
@@ -16,42 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+package org.apache.iotdb.db.mpp.operator.aggregation;
 
-import com.google.common.util.concurrent.ListenableFuture;
+public class AggregatorFactory {
 
-public class AggregateOperator implements ProcessOperator {
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
-
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return false;
+  public static Aggregator createAggregator() {
+    Accumulator accumulator;
+    if (step.isInputRaw()) {
+      accumulator = accumulatorFactory.createAccumulator(lambdaProviders);
+    } else {
+      accumulator = 
accumulatorFactory.createIntermediateAccumulator(lambdaProviders);
+    }
+    return new Aggregator(
+        accumulator, step, intermediateType, finalType, inputChannels, 
maskChannel);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
new file mode 100644
index 0000000000..dc993a65c5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class AvgAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private long countValue;
+  private double sumValue;
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {}
+
+  @Override
+  public void addIntermediate(Column[] partialResult) {}
+
+  @Override
+  public void addStatistics(Statistics statistics) {}
+
+  @Override
+  public void setFinal(Column finalResult) {}
+
+  @Override
+  public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {}
+
+  @Override
+  public void outputFinal(ColumnBuilder tsBlockBuilder) {}
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+    this.sumValue = 0.0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  private void updateAvg(TSDataType type, Object sumVal) throws 
UnSupportedDataTypeException {
+    double val;
+    switch (type) {
+      case INT32:
+        val = (int) sumVal;
+        break;
+      case INT64:
+        val = (long) sumVal;
+        break;
+      case FLOAT:
+        val = (float) sumVal;
+        break;
+      case DOUBLE:
+        val = (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", 
type));
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
new file mode 100644
index 0000000000..db266b55e4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class CountAccumulator implements Accumulator {
+
+  private long countValue = 0;
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      countValue++;
+    }
+  }
+
+  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    for (int i = 0; i < partialResult.length; i++) {
+      countValue += partialResult[i].getLong(0);
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+  }
+
+  // finalResult should be single column, like: | finalCountValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    countValue = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in countAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(countValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(countValue);
+  }
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 0eda188787..e0cd04a5a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -18,16 +18,31 @@
  */
 package org.apache.iotdb.db.mpp.operator.process;
 
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.List;
+
 public class AggregateOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+  private final List<Aggregator> aggregators;
+  private final List<Operator> children;
+
+  public AggregateOperator(
+      OperatorContext operatorContext, List<Aggregator> aggregators, 
List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.aggregators = aggregators;
+    this.children = children;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index b94fadccc1..8966b1f57b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -19,15 +19,11 @@
 package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
@@ -43,7 +39,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,15 +57,15 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
   private final PlanNodeId sourceId;
   private final SeriesScanUtil seriesScanUtil;
   private final boolean ascending;
-  private List<AggregateResult> aggregateResultList;
+  // We still think aggregator in SeriesAggregateScanOperator is a inputRaw 
step.
+  // But in facing of statistics, it will invoke another method 
processStatistics()
+  private List<Aggregator> aggregators;
 
   private ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
-  private TsBlockSingleColumnIterator preCachedData;
-  // used for resetting the preCachedData to the last read index
-  private int lastReadIndex;
+  private TsBlock preCachedData;
 
   private TsBlockBuilder tsBlockBuilder;
   private TsBlock resultTsBlock;
@@ -82,7 +77,7 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
       PartialPath seriesPath,
       Set<String> allSensors,
       OperatorContext context,
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
@@ -98,21 +93,10 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
             timeFilter,
             null,
             ascending);
-    aggregateResultList = new ArrayList<>(aggregateFuncList.size());
-    for (AggregationType aggregationType : aggregateFuncList) {
-      aggregateResultList.add(
-          AggregateResultFactory.getAggrResultByType(
-              aggregationType,
-              seriesPath.getSeriesType(),
-              seriesScanUtil.getOrderUtils().getAscending()));
-    }
+    this.aggregators = aggregators;
     tsBlockBuilder =
         new TsBlockBuilder(
-            aggregateFuncList.stream()
-                .map(
-                    functionType ->
-                        SchemaUtils.getSeriesTypeByPath(seriesPath, 
functionType.name()))
-                .collect(Collectors.toList()));
+            
aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList()));
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
   }
 
@@ -169,8 +153,9 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
       curTimeRange = timeRangeIterator.nextTimeRange();
 
       // 1. Clear previous aggregation result
-      for (AggregateResult result : aggregateResultList) {
-        result.reset();
+      for (Aggregator aggregator : aggregators) {
+        aggregator.reset();
+        aggregator.setTimeRange(curTimeRange);
       }
 
       // 2. Calculate aggregation result based on current time window
@@ -226,14 +211,15 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
   }
 
   private void updateResultTsBlockUsingAggregateResult() {
-    // TODO AVG
     tsBlockBuilder.reset();
     TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(curTimeRange.getMin());
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = 0; i < aggregateResultList.size(); i++) {
-      columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+    for (int i = 0; i < aggregators.size(); i++) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[1];
+      columnBuilder[0] = columnBuilders[i];
+      aggregators.get(i).outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
     resultTsBlock = tsBlockBuilder.build();
@@ -273,41 +259,33 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
   }
 
   @SuppressWarnings("squid:S3776")
-  private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, 
TimeRange curTimeRange)
-      throws IOException {
+  private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (!satisfied(blockIterator, curTimeRange)) {
+    if (!satisfied(tsBlock, curTimeRange)) {
       return;
     }
 
-    for (AggregateResult result : aggregateResultList) {
+    // skip points that cannot be calculated
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+
+    for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
-      if (result.hasFinalResult()) {
+      if (aggregator.hasFinalResult()) {
         continue;
       }
-      // lazy reset batch data for calculation
-      blockIterator.setRowIndex(lastReadIndex);
-      // skip points that cannot be calculated
-      skipOutOfTimeRangePoints(blockIterator, curTimeRange);
-
-      if (blockIterator.hasNext()) {
-        result.updateResultFromPageData(
-            blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
-      }
-    }
 
-    // reset the last position to current Index
-    lastReadIndex = blockIterator.getRowIndex();
+      aggregator.processTsBlock(tsBlock);
+    }
 
     // can calc for next interval
-    if (blockIterator.hasNext()) {
-      preCachedData = blockIterator;
+    if (tsBlock.getTsBlockSingleColumnIterator().hasNext()) {
+      preCachedData = tsBlock;
     }
   }
 
   // skip points that cannot be calculated
-  private void skipOutOfTimeRangePoints(
-      TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange 
curTimeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
     if (ascending) {
       while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < 
curTimeRange.getMin()) {
         tsBlockIterator.next();
@@ -317,9 +295,11 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
         tsBlockIterator.next();
       }
     }
+    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
   }
 
-  private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, 
TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
     }
@@ -332,15 +312,15 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
     if (!ascending
         && (tsBlockIterator.getStartTime() >= timeRange.getMax()
             || tsBlockIterator.currentTime() < timeRange.getMin())) {
-      preCachedData = tsBlockIterator;
+      preCachedData = tsBlock;
       return false;
     }
     return true;
   }
 
   private boolean isEndCalc() {
-    for (AggregateResult result : aggregateResultList) {
-      if (!result.hasFinalResult()) {
+    for (Aggregator aggregator : aggregators) {
+      if (!aggregator.hasFinalResult()) {
         return false;
       }
     }
@@ -375,23 +355,23 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
       }
 
       // calc from page data
-      TsBlockSingleColumnIterator tsBlockIterator =
-          seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+      TsBlock tsBlock = seriesScanUtil.nextPage();
+      TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
       if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
         continue;
       }
 
       // reset the last position to current Index
-      lastReadIndex = tsBlockIterator.getRowIndex();
+      // lastReadIndex = tsBlockIterator.getRowIndex();
 
       // stop calc and cached current batchData
       if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) 
{
-        preCachedData = tsBlockIterator;
+        preCachedData = tsBlock;
         return true;
       }
 
       // calc from batch data
-      calcFromBatch(tsBlockIterator, curTimeRange);
+      calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
       if (isEndCalc()
@@ -432,15 +412,12 @@ public class SeriesAggregateScanOperator implements 
DataSourceOperator {
   }
 
   private void calcFromStatistics(Statistics statistics) {
-    try {
-      for (AggregateResult result : aggregateResultList) {
-        if (result.hasFinalResult()) {
-          continue;
-        }
-        result.updateResultFromStatistics(statistics);
+    for (int i = 0; i < aggregators.size(); i++) {
+      Aggregator aggregator = aggregators.get(i);
+      if (aggregator.hasFinalResult()) {
+        continue;
       }
-    } catch (QueryProcessException e) {
-      throw new RuntimeException("Error while updating result using 
statistics", e);
+      aggregator.processStatistics(statistics);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index cfa593146a..bee6e9f5fe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -232,7 +232,7 @@ public class LocalExecutionPlanner {
               seriesPath,
               node.getAllSensors(),
               operatorContext,
-              node.getAggregateFuncList(),
+              null,
               node.getTimeFilter(),
               ascending,
               node.getGroupByTimeParameter());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
index fda0f4a957..97d7bb591a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class InputLocation {
-  // which input tsblock
+  // which input tsBlock
   private final int tsBlockIndex;
-  // which value column of that tsblock
+  // which value column of that tsBlock
   private final int valueColumnIndex;
 
   public InputLocation(int tsBlockIndex, int valueColumnIndex) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index 09d56b7211..bfaced0670 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -346,7 +347,7 @@ public class SeriesAggregateScanOperatorTest {
   }
 
   public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter)
@@ -373,7 +374,7 @@ public class SeriesAggregateScanOperatorTest {
             measurementPath,
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
-            aggregateFuncList,
+            aggregators,
             timeFilter,
             ascending,
             groupByTimeParameter);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index f6d4b68c4d..008db2dde3 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -92,12 +92,6 @@ public class TsBlock {
     }
   }
 
-  public boolean hasNext() {
-    return false;
-  }
-
-  public void next() {}
-
   public int getPositionCount() {
     return positionCount;
   }
@@ -170,6 +164,23 @@ public class TsBlock {
     return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
   }
 
+  /**
+   * This method will create a temporary view of origin tsBlock, which will 
reuse the arrays of
+   * columns but with different offset. It can be used where you want to skip 
some points when
+   * getting iterator.
+   */
+  public TsBlock subTsBlock(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("FromIndex of subTsBlock cannot over 
positionCount.");
+    }
+    TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex);
+    Column[] subValueColumns = new Column[valueColumns.length];
+    for (int i = 0; i < subValueColumns.length; i++) {
+      subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
+    }
+    return new TsBlock(subTimeColumn, subValueColumns);
+  }
+
   public long getTimeByIndex(int index) {
     return timeColumn.getLong(index);
   }
@@ -186,6 +197,21 @@ public class TsBlock {
     return valueColumns[columnIndex];
   }
 
+  public Column[] getTimeAndValueColumn(int columnIndex) {
+    Column[] columns = new Column[2];
+    columns[0] = getTimeColumn();
+    columns[1] = getColumn(columnIndex);
+    return columns;
+  }
+
+  public Column[] getColumns(int[] columnIndexes) {
+    Column[] columns = new Column[columnIndexes.length];
+    for (int i = 0; i < columnIndexes.length; i++) {
+      columns[i] = valueColumns[columnIndexes[i]];
+    }
+    return columns;
+  }
+
   public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
     return new TsBlockSingleColumnIterator(0);
   }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 4d7a888394..f9a5497992 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -123,6 +123,15 @@ public class BinaryColumn implements Column {
     return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, 
values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BinaryColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, 
values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 218ce1baf8..1166ccfa4c 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -122,6 +122,15 @@ public class BooleanColumn implements Column {
     return new BooleanColumn(positionOffset + arrayOffset, length, 
valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BooleanColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, 
values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index ef9fb7d637..446b6a83fe 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -102,4 +102,10 @@ public interface Column {
    * also be released. If the region column is released, this block may also 
be released.
    */
   Column getRegion(int positionOffset, int length);
+
+  /**
+   * This method will create a temporary view of origin column, which will 
reuse the array of column
+   * but with different array offset.
+   */
+  Column subColumn(int fromIndex);
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 13faf135fb..1da44fe212 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -122,6 +122,15 @@ public class DoubleColumn implements Column {
     return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, 
values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new DoubleColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, 
values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 08762164fb..49d73c3156 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -121,6 +121,14 @@ public class FloatColumn implements Column {
     return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, 
values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, 
valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 7e8d67f1b3..cfad52184d 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -121,6 +121,14 @@ public class IntColumn implements Column {
     return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, 
values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, 
valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index a786918af8..9b89a09233 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -121,6 +121,14 @@ public class LongColumn implements Column {
     return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, 
values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, 
valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 283c374a99..c55d2e5686 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -147,6 +147,14 @@ public class RunLengthEncodedColumn implements Column {
     return new RunLengthEncodedColumn(value, length);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new RunLengthEncodedColumn(value, positionCount - fromIndex);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= positionCount) {
       throw new IllegalArgumentException("position is not valid");
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index d8b44fd384..87164cf6d7 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,14 @@ public class TimeColumn implements Column {
     return new TimeColumn(positionOffset + arrayOffset, length, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, 
values);
+  }
+
   public long getStartTime() {
     return values[arrayOffset];
   }
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
index 75d28596e9..669d3a41ad 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.block;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import 
org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
@@ -30,6 +31,7 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -322,4 +324,51 @@ public class TsBlockTest {
       }
     }
   }
+
+  @Test
+  public void testSubTsBlock() {
+    TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    for (int i = 0; i < 10; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      builder.getColumnBuilder(0).writeInt(i);
+      builder.declarePosition();
+    }
+    TsBlock tsBlock = builder.build();
+    TsBlockSingleColumnIterator iterator = 
tsBlock.getTsBlockSingleColumnIterator();
+    int index = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subTsBlock from TsBlock, offset = 3
+    int offset = 3;
+    TsBlock subTsBlock = tsBlock.subTsBlock(offset);
+    iterator = subTsBlock.getTsBlockSingleColumnIterator();
+    index = offset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subSubTsBlock from subTsBlock, offset = 2
+    int nextOffset = 2;
+    TsBlock subSubTsBlock = subTsBlock.subTsBlock(nextOffset);
+    iterator = subSubTsBlock.getTsBlockSingleColumnIterator();
+    index = offset + nextOffset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    try {
+      subSubTsBlock.subTsBlock(3);
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("FromIndex of subTsBlock cannot over 
positionCount."));
+    }
+  }
 }
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
new file mode 100644
index 0000000000..f21df099f7
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnTest {
+
+  @Test
+  public void timeColumnSubColumnTest() {
+    TimeColumnBuilder columnBuilder = new TimeColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    TimeColumn timeColumn = (TimeColumn) columnBuilder.build();
+    timeColumn = (TimeColumn) timeColumn.subColumn(5);
+    Assert.assertEquals(5, timeColumn.getPositionCount());
+    Assert.assertEquals(5, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(4));
+    try {
+      timeColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    timeColumn = (TimeColumn) timeColumn.subColumn(3);
+    Assert.assertEquals(2, timeColumn.getPositionCount());
+    Assert.assertEquals(8, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(1));
+    try {
+      timeColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      timeColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void binaryColumnSubColumnTest() {
+    BinaryColumnBuilder columnBuilder = new BinaryColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBinary(Binary.valueOf(String.valueOf(i)));
+    }
+    BinaryColumn binaryColumn = (BinaryColumn) columnBuilder.build();
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(5);
+    Assert.assertEquals(5, binaryColumn.getPositionCount());
+    Assert.assertEquals("5", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(4).toString());
+    try {
+      binaryColumn.getBinary(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(3);
+    Assert.assertEquals(2, binaryColumn.getPositionCount());
+    Assert.assertEquals("8", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(1).toString());
+    try {
+      binaryColumn.getBinary(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      binaryColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void booleanColumnSubColumnTest() {
+    BooleanColumnBuilder columnBuilder = new BooleanColumnBuilder(null, 10);
+    // 0: true, 1: false
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBoolean(i % 2 == 0);
+    }
+    BooleanColumn booleanColumn = (BooleanColumn) columnBuilder.build();
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(5);
+    Assert.assertEquals(5, booleanColumn.getPositionCount());
+    Assert.assertFalse(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(4));
+    try {
+      booleanColumn.getBoolean(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(3);
+    Assert.assertEquals(2, booleanColumn.getPositionCount());
+    Assert.assertTrue(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(1));
+    try {
+      booleanColumn.getBoolean(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      booleanColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void doubleColumnSubColumnTest() {
+    DoubleColumnBuilder columnBuilder = new DoubleColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeDouble(i);
+    }
+    DoubleColumn doubleColumn = (DoubleColumn) columnBuilder.build();
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(5);
+    Assert.assertEquals(5, doubleColumn.getPositionCount());
+    Assert.assertEquals(5.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(4), 0.001);
+    try {
+      doubleColumn.getDouble(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(3);
+    Assert.assertEquals(2, doubleColumn.getPositionCount());
+    Assert.assertEquals(8.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(1), 0.001);
+    try {
+      doubleColumn.getDouble(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      doubleColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void floatColumnSubColumnTest() {
+    FloatColumnBuilder columnBuilder = new FloatColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeFloat(i);
+    }
+    FloatColumn floatColumn = (FloatColumn) columnBuilder.build();
+    floatColumn = (FloatColumn) floatColumn.subColumn(5);
+    Assert.assertEquals(5, floatColumn.getPositionCount());
+    Assert.assertEquals(5.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(4), 0.001);
+    try {
+      floatColumn.getFloat(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    floatColumn = (FloatColumn) floatColumn.subColumn(3);
+    Assert.assertEquals(2, floatColumn.getPositionCount());
+    Assert.assertEquals(8.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(1), 0.001);
+    try {
+      floatColumn.getFloat(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      floatColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void intColumnSubColumnTest() {
+    IntColumnBuilder columnBuilder = new IntColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeInt(i);
+    }
+    IntColumn intColumn = (IntColumn) columnBuilder.build();
+    intColumn = (IntColumn) intColumn.subColumn(5);
+    Assert.assertEquals(5, intColumn.getPositionCount());
+    Assert.assertEquals(5, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(4));
+    try {
+      intColumn.getInt(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    intColumn = (IntColumn) intColumn.subColumn(3);
+    Assert.assertEquals(2, intColumn.getPositionCount());
+    Assert.assertEquals(8, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(1));
+    try {
+      intColumn.getInt(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      intColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void longColumnSubColumnTest() {
+    LongColumnBuilder columnBuilder = new LongColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    LongColumn longColumn = (LongColumn) columnBuilder.build();
+    longColumn = (LongColumn) longColumn.subColumn(5);
+    Assert.assertEquals(5, longColumn.getPositionCount());
+    Assert.assertEquals(5, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(4));
+    try {
+      longColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    longColumn = (LongColumn) longColumn.subColumn(3);
+    Assert.assertEquals(2, longColumn.getPositionCount());
+    Assert.assertEquals(8, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(1));
+    try {
+      longColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      longColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void runLengthEncodedColumnSubColumnTest() {
+    LongColumnBuilder longColumnBuilder = new LongColumnBuilder(null, 1);
+    longColumnBuilder.writeLong(1);
+    RunLengthEncodedColumn column = new 
RunLengthEncodedColumn(longColumnBuilder.build(), 10);
+    column = (RunLengthEncodedColumn) column.subColumn(5);
+    Assert.assertEquals(5, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(4));
+    try {
+      column.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    column = (RunLengthEncodedColumn) column.subColumn(3);
+    Assert.assertEquals(2, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(1));
+    try {
+      column.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      column.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+}

Reply via email to