This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8c12329a0a807a852e8c4238ae160f1ad3c021ca
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Jul 4 15:45:29 2022 +0800

    move calculate code to next() & extends SeriesAggregationScanOperator
---
 .../AlignedSeriesAggregationScanOperator.java      | 278 +++++----------------
 .../source/SeriesAggregationScanOperator.java      | 186 +++++++-------
 .../file/metadata/statistics/Statistics.java       |   5 +
 3 files changed, 159 insertions(+), 310 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index f90d8a26f3..15cbab8795 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -19,55 +19,28 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import 
org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
 
 /** This operator is responsible to do the aggregation calculation especially 
for aligned series. */
-public class AlignedSeriesAggregationScanOperator implements 
DataSourceOperator {
+public class AlignedSeriesAggregationScanOperator extends 
SeriesAggregationScanOperator
+    implements DataSourceOperator {
 
-  private final OperatorContext operatorContext;
-  private final PlanNodeId sourceId;
-  private final AlignedSeriesScanUtil alignedSeriesScanUtil;
   private final int subSensorSize;
-  private final boolean ascending;
-  // We still think aggregator in AlignedSeriesAggregateScanOperator is a 
inputRaw step.
-  // But in facing of statistics, it will invoke another method 
processStatistics()
-  private List<Aggregator> aggregators;
-
-  private ITimeRangeIterator timeRangeIterator;
-  // current interval of aggregation window [curStartTime, curEndTime)
-  private TimeRange curTimeRange;
-  private boolean isGroupByQuery;
-
-  private TsBlock preCachedData;
-
-  private TsBlockBuilder tsBlockBuilder;
-  private TsBlock resultTsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
 
   public AlignedSeriesAggregationScanOperator(
       PlanNodeId sourceId,
@@ -77,87 +50,48 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.ascending = ascending;
-    this.alignedSeriesScanUtil =
-        new AlignedSeriesScanUtil(
-            seriesPath,
-            new HashSet<>(seriesPath.getMeasurementList()),
-            context.getInstanceContext(),
-            timeFilter,
-            null,
-            ascending);
+    super(
+        sourceId,
+        seriesPath,
+        Collections.emptySet(),
+        context,
+        aggregators,
+        timeFilter,
+        ascending,
+        groupByTimeParameter);
     this.subSensorSize = seriesPath.getMeasurementList().size();
-    this.aggregators = aggregators;
-    List<TSDataType> dataTypes = new ArrayList<>();
-    for (Aggregator aggregator : aggregators) {
-      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
-    }
-    tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
-    this.isGroupByQuery = groupByTimeParameter != null;
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
   }
 
   @Override
-  public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      return resultTsBlock;
-    }
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    if (hasCachedTsBlock) {
-      return true;
-    }
+  protected void calculateNextResult() {
     try {
-      if (!timeRangeIterator.hasNextTimeRange()) {
-        return false;
-      }
-      curTimeRange = timeRangeIterator.nextTimeRange();
-
-      // 1. Clear previous aggregation result
-      for (Aggregator aggregator : aggregators) {
-        aggregator.reset();
-        aggregator.updateTimeRange(curTimeRange);
-      }
-
-      // 2. Calculate aggregation result based on current time window
       if (calcFromCacheData(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read page data firstly
       if (readAndCalcFromPage(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read chunk data secondly
       if (readAndCalcFromChunk(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read from file first
-      while (alignedSeriesScanUtil.hasNextFile()) {
+      while (seriesScanUtil.hasNextFile()) {
         if (canUseCurrentFileStatistics()) {
-          Statistics fileTimeStatistics = 
alignedSeriesScanUtil.currentFileTimeStatistics();
+          Statistics fileTimeStatistics = 
seriesScanUtil.currentFileTimeStatistics();
           if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
             if (ascending) {
               updateResultTsBlock();
-              return true;
+              return;
             } else {
-              alignedSeriesScanUtil.skipCurrentFile();
+              seriesScanUtil.skipCurrentFile();
               continue;
             }
           }
@@ -166,10 +100,10 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
               fileTimeStatistics.getStartTime(), 
fileTimeStatistics.getEndTime())) {
             Statistics[] statisticsList = new Statistics[subSensorSize];
             for (int i = 0; i < subSensorSize; i++) {
-              statisticsList[i] = 
alignedSeriesScanUtil.currentFileStatistics(i);
+              statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
             }
-            calcFromStatistics(statisticsList);
-            alignedSeriesScanUtil.skipCurrentFile();
+            calcFromStatisticsArray(statisticsList);
+            seriesScanUtil.skipCurrentFile();
             if (isEndCalc(aggregators) && !isGroupByQuery) {
               break;
             } else {
@@ -181,125 +115,47 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
         // read chunk
         if (readAndCalcFromChunk(curTimeRange)) {
           updateResultTsBlock();
-          return true;
+          return;
         }
       }
 
       updateResultTsBlock();
-      return true;
     } catch (IOException e) {
       throw new RuntimeException("Error while scanning the file", e);
     }
   }
 
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    alignedSeriesScanUtil.initQueryDataSource(dataSource);
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  private void updateResultTsBlock() {
-    resultTsBlock =
-        updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, 
timeRangeIterator);
-    hasCachedTsBlock = true;
-  }
-
-  /** @return if already get the result */
-  private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException 
{
-    calcFromBatch(preCachedData, curTimeRange);
-    // The result is calculated from the cache
-    return (preCachedData != null
-            && (ascending
-                ? preCachedData.getEndTime() > curTimeRange.getMax()
-                : preCachedData.getEndTime() < curTimeRange.getMin()))
-        || isEndCalc(aggregators);
-  }
-
-  @SuppressWarnings("squid:S3776")
-  private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
-    // check if the batchData does not contain points in current interval
-    if (tsBlock != null && satisfied(tsBlock, curTimeRange, ascending)) {
-      // skip points that cannot be calculated
-      if ((ascending && tsBlock.getStartTime() < curTimeRange.getMin())
-          || (!ascending && tsBlock.getStartTime() > curTimeRange.getMax())) {
-        tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending);
-      }
-
-      int lastReadRowIndex = 0;
-      for (Aggregator aggregator : aggregators) {
-        // current agg method has been calculated
-        if (aggregator.hasFinalResult()) {
-          continue;
-        }
-
-        lastReadRowIndex = Math.max(lastReadRowIndex, 
aggregator.processTsBlock(tsBlock));
-      }
-      if (lastReadRowIndex >= tsBlock.getPositionCount()) {
-        tsBlock = null;
-      } else {
-        tsBlock = tsBlock.subTsBlock(lastReadRowIndex);
-      }
-
-      // can calc for next interval
-      if (tsBlock != null && 
tsBlock.getTsBlockSingleColumnIterator().hasNext()) {
-        preCachedData = tsBlock;
+  private void calcFromStatisticsArray(Statistics[] statistics) {
+    for (Aggregator aggregator : aggregators) {
+      if (aggregator.hasFinalResult()) {
+        continue;
       }
+      aggregator.processStatistics(statistics);
     }
   }
 
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean 
ascending) {
-    TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
-    if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
-      return false;
-    }
-
-    if (ascending
-        && (tsBlockIterator.getEndTime() < timeRange.getMin()
-            || tsBlockIterator.currentTime() > timeRange.getMax())) {
-      return false;
-    }
-    if (!ascending
-        && (tsBlockIterator.getEndTime() > timeRange.getMax()
-            || tsBlockIterator.currentTime() < timeRange.getMin())) {
-      preCachedData = tsBlock;
-      return false;
-    }
-    return true;
-  }
-
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws 
IOException {
-    while (alignedSeriesScanUtil.hasNextPage()) {
+    while (seriesScanUtil.hasNextPage()) {
       if (canUseCurrentPageStatistics()) {
-        Statistics pageTimeStatistics = 
alignedSeriesScanUtil.currentPageTimeStatistics();
+        Statistics pageTimeStatistics = 
seriesScanUtil.currentPageTimeStatistics();
         // There is no more eligible points in current time range
         if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
           if (ascending) {
             return true;
           } else {
-            alignedSeriesScanUtil.skipCurrentPage();
+            seriesScanUtil.skipCurrentPage();
             continue;
           }
         }
         // can use pageHeader
-        if (canUseCurrentPageStatistics()
-            && curTimeRange.contains(
-                pageTimeStatistics.getStartTime(), 
pageTimeStatistics.getEndTime())) {
+        if (curTimeRange.contains(
+            pageTimeStatistics.getStartTime(), 
pageTimeStatistics.getEndTime())) {
           Statistics[] statisticsList = new Statistics[subSensorSize];
           for (int i = 0; i < subSensorSize; i++) {
-            statisticsList[i] = alignedSeriesScanUtil.currentPageStatistics(i);
+            statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
           }
-          calcFromStatistics(statisticsList);
-          alignedSeriesScanUtil.skipCurrentPage();
+          calcFromStatisticsArray(statisticsList);
+          seriesScanUtil.skipCurrentPage();
           if (isEndCalc(aggregators) && !isGroupByQuery) {
             return true;
           } else {
@@ -309,7 +165,7 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
       }
 
       // calc from page data
-      TsBlock tsBlock = alignedSeriesScanUtil.nextPage();
+      TsBlock tsBlock = seriesScanUtil.nextPage();
       TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
       if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
         continue;
@@ -337,14 +193,14 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
   }
 
   private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws 
IOException {
-    while (alignedSeriesScanUtil.hasNextChunk()) {
+    while (seriesScanUtil.hasNextChunk()) {
       if (canUseCurrentChunkStatistics()) {
-        Statistics chunkTimeStatistics = 
alignedSeriesScanUtil.currentChunkTimeStatistics();
+        Statistics chunkTimeStatistics = 
seriesScanUtil.currentChunkTimeStatistics();
         if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
           if (ascending) {
             return true;
           } else {
-            alignedSeriesScanUtil.skipCurrentChunk();
+            seriesScanUtil.skipCurrentChunk();
             continue;
           }
         }
@@ -354,10 +210,10 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
           // calc from chunkMetaData
           Statistics[] statisticsList = new Statistics[subSensorSize];
           for (int i = 0; i < subSensorSize; i++) {
-            statisticsList[i] = 
alignedSeriesScanUtil.currentChunkStatistics(i);
+            statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
           }
-          calcFromStatistics(statisticsList);
-          alignedSeriesScanUtil.skipCurrentChunk();
+          calcFromStatisticsArray(statisticsList);
+          seriesScanUtil.skipCurrentChunk();
           if (isEndCalc(aggregators) && !isGroupByQuery) {
             return true;
           } else {
@@ -374,43 +230,27 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
     return false;
   }
 
-  private void calcFromStatistics(Statistics[] statistics) {
-    for (int i = 0; i < aggregators.size(); i++) {
-      Aggregator aggregator = aggregators.get(i);
-      if (aggregator.hasFinalResult()) {
-        continue;
-      }
-      aggregator.processStatistics(statistics);
-    }
-  }
-
-  public boolean canUseCurrentFileStatistics() throws IOException {
-    Statistics fileStatistics = 
alignedSeriesScanUtil.currentFileTimeStatistics();
-    return !alignedSeriesScanUtil.isFileOverlapped()
-        && containedByTimeFilter(fileStatistics)
-        && !alignedSeriesScanUtil.currentFileModified();
+  private boolean canUseCurrentFileStatistics() throws IOException {
+    Statistics fileStatistics = seriesScanUtil.currentFileTimeStatistics();
+    return !seriesScanUtil.isFileOverlapped()
+        && fileStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
+        && !seriesScanUtil.currentFileModified();
   }
 
-  public boolean canUseCurrentChunkStatistics() throws IOException {
-    Statistics chunkStatistics = 
alignedSeriesScanUtil.currentChunkTimeStatistics();
-    return !alignedSeriesScanUtil.isChunkOverlapped()
-        && containedByTimeFilter(chunkStatistics)
-        && !alignedSeriesScanUtil.currentChunkModified();
+  private boolean canUseCurrentChunkStatistics() throws IOException {
+    Statistics chunkStatistics = seriesScanUtil.currentChunkTimeStatistics();
+    return !seriesScanUtil.isChunkOverlapped()
+        && 
chunkStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
+        && !seriesScanUtil.currentChunkModified();
   }
 
-  public boolean canUseCurrentPageStatistics() throws IOException {
-    Statistics currentPageStatistics = 
alignedSeriesScanUtil.currentPageTimeStatistics();
+  private boolean canUseCurrentPageStatistics() throws IOException {
+    Statistics currentPageStatistics = 
seriesScanUtil.currentPageTimeStatistics();
     if (currentPageStatistics == null) {
       return false;
     }
-    return !alignedSeriesScanUtil.isPageOverlapped()
-        && containedByTimeFilter(currentPageStatistics)
-        && !alignedSeriesScanUtil.currentPageModified();
-  }
-
-  private boolean containedByTimeFilter(Statistics statistics) {
-    Filter timeFilter = alignedSeriesScanUtil.getTimeFilter();
-    return timeFilter == null
-        || timeFilter.containStartEndTime(statistics.getStartTime(), 
statistics.getEndTime());
+    return !seriesScanUtil.isPageOverlapped()
+        && 
currentPageStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
+        && !seriesScanUtil.currentPageModified();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 42a9776015..bd46896174 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -36,6 +38,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -54,25 +57,24 @@ import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateR
  */
 public class SeriesAggregationScanOperator implements DataSourceOperator {
 
-  private final OperatorContext operatorContext;
-  private final PlanNodeId sourceId;
-  private final SeriesScanUtil seriesScanUtil;
-  private final boolean ascending;
+  protected final OperatorContext operatorContext;
+  protected final PlanNodeId sourceId;
+  protected final SeriesScanUtil seriesScanUtil;
+  protected final boolean ascending;
   // We still think aggregator in SeriesAggregateScanOperator is a inputRaw 
step.
   // But in facing of statistics, it will invoke another method 
processStatistics()
-  private List<Aggregator> aggregators;
+  protected final List<Aggregator> aggregators;
 
-  private ITimeRangeIterator timeRangeIterator;
+  protected final ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
-  private TimeRange curTimeRange;
-  private boolean isGroupByQuery;
+  protected TimeRange curTimeRange;
+  protected final boolean isGroupByQuery;
 
-  private TsBlock preCachedData;
+  protected TsBlock preCachedData;
 
-  private TsBlockBuilder tsBlockBuilder;
-  private TsBlock resultTsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
+  protected final TsBlockBuilder tsBlockBuilder;
+  protected TsBlock resultTsBlock;
+  protected boolean finished = false;
 
   public SeriesAggregationScanOperator(
       PlanNodeId sourceId,
@@ -86,15 +88,30 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.ascending = ascending;
-    this.seriesScanUtil =
-        new SeriesScanUtil(
-            seriesPath,
-            allSensors,
-            seriesPath.getSeriesType(),
-            context.getInstanceContext(),
-            timeFilter,
-            null,
-            ascending);
+
+    if (seriesPath instanceof MeasurementPath) {
+      this.seriesScanUtil =
+          new SeriesScanUtil(
+              seriesPath,
+              allSensors,
+              seriesPath.getSeriesType(),
+              context.getInstanceContext(),
+              timeFilter,
+              null,
+              ascending);
+    } else if (seriesPath instanceof AlignedPath) {
+      this.seriesScanUtil =
+          new AlignedSeriesScanUtil(
+              seriesPath,
+              new HashSet<>(((AlignedPath) seriesPath).getMeasurementList()),
+              context.getInstanceContext(),
+              timeFilter,
+              null,
+              ascending);
+    } else {
+      throw new UnsupportedOperationException("");
+    }
+
     this.aggregators = aggregators;
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
@@ -110,48 +127,61 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
     return operatorContext;
   }
 
+  @Override
+  public boolean hasNext() {
+    return timeRangeIterator.hasNextTimeRange();
+  }
+
   @Override
   public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      return resultTsBlock;
+    if (!timeRangeIterator.hasNextTimeRange()) {
+      return null;
     }
-    return null;
+    curTimeRange = timeRangeIterator.nextTimeRange();
+
+    // 1. Clear previous aggregation result
+    for (Aggregator aggregator : aggregators) {
+      aggregator.reset();
+      aggregator.updateTimeRange(curTimeRange);
+    }
+
+    // 2. Calculate aggregation result based on current time window
+    calculateNextResult();
+    return resultTsBlock;
   }
 
   @Override
-  public boolean hasNext() {
-    if (hasCachedTsBlock) {
-      return true;
-    }
-    try {
-      if (!timeRangeIterator.hasNextTimeRange()) {
-        return false;
-      }
-      curTimeRange = timeRangeIterator.nextTimeRange();
+  public boolean isFinished() {
+    return finished || (finished = !hasNext());
+  }
 
-      // 1. Clear previous aggregation result
-      for (Aggregator aggregator : aggregators) {
-        aggregator.reset();
-        aggregator.updateTimeRange(curTimeRange);
-      }
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
 
-      // 2. Calculate aggregation result based on current time window
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
+
+  protected void calculateNextResult() {
+    try {
       if (calcFromCacheData(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read page data firstly
       if (readAndCalcFromPage(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read chunk data secondly
       if (readAndCalcFromChunk(curTimeRange)) {
         updateResultTsBlock();
-        return true;
+        return;
       }
 
       // read from file first
@@ -161,7 +191,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
           if (fileStatistics.getStartTime() > curTimeRange.getMax()) {
             if (ascending) {
               updateResultTsBlock();
-              return true;
+              return;
             } else {
               seriesScanUtil.skipCurrentFile();
               continue;
@@ -182,40 +212,23 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
         // read chunk
         if (readAndCalcFromChunk(curTimeRange)) {
           updateResultTsBlock();
-          return true;
+          return;
         }
       }
 
       updateResultTsBlock();
-      return true;
     } catch (IOException e) {
       throw new RuntimeException("Error while scanning the file", e);
     }
   }
 
-  private void updateResultTsBlock() {
+  protected void updateResultTsBlock() {
     resultTsBlock =
         updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, 
timeRangeIterator);
-    hasCachedTsBlock = true;
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
   }
 
   /** @return if already get the result */
-  private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException 
{
+  protected boolean calcFromCacheData(TimeRange curTimeRange) throws 
IOException {
     calcFromBatch(preCachedData, curTimeRange);
     // The result is calculated from the cache
     return (preCachedData != null
@@ -225,8 +238,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
         || isEndCalc(aggregators);
   }
 
-  @SuppressWarnings("squid:S3776")
-  private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
+  protected void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
     if (tsBlock != null && satisfied(tsBlock, curTimeRange, ascending)) {
       // skip points that cannot be calculated
@@ -257,7 +269,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
     }
   }
 
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean 
ascending) {
+  protected boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean 
ascending) {
     TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
@@ -277,7 +289,15 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
     return true;
   }
 
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private void calcFromStatistics(Statistics statistics) {
+    for (Aggregator aggregator : aggregators) {
+      if (aggregator.hasFinalResult()) {
+        continue;
+      }
+      aggregator.processStatistics(statistics);
+    }
+  }
+
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws 
IOException {
     while (seriesScanUtil.hasNextPage()) {
       if (canUseCurrentPageStatistics()) {
@@ -363,43 +383,27 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
     return false;
   }
 
-  private void calcFromStatistics(Statistics statistics) {
-    for (int i = 0; i < aggregators.size(); i++) {
-      Aggregator aggregator = aggregators.get(i);
-      if (aggregator.hasFinalResult()) {
-        continue;
-      }
-      aggregator.processStatistics(statistics);
-    }
-  }
-
-  public boolean canUseCurrentFileStatistics() throws IOException {
+  private boolean canUseCurrentFileStatistics() throws IOException {
     Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
     return !seriesScanUtil.isFileOverlapped()
-        && containedByTimeFilter(fileStatistics)
+        && fileStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
         && !seriesScanUtil.currentFileModified();
   }
 
-  public boolean canUseCurrentChunkStatistics() throws IOException {
+  private boolean canUseCurrentChunkStatistics() throws IOException {
     Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
     return !seriesScanUtil.isChunkOverlapped()
-        && containedByTimeFilter(chunkStatistics)
+        && 
chunkStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
         && !seriesScanUtil.currentChunkModified();
   }
 
-  public boolean canUseCurrentPageStatistics() throws IOException {
+  private boolean canUseCurrentPageStatistics() throws IOException {
     Statistics currentPageStatistics = seriesScanUtil.currentPageStatistics();
     if (currentPageStatistics == null) {
       return false;
     }
     return !seriesScanUtil.isPageOverlapped()
-        && containedByTimeFilter(currentPageStatistics)
+        && 
currentPageStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter())
         && !seriesScanUtil.currentPageModified();
   }
-
-  private boolean containedByTimeFilter(Statistics statistics) {
-    Filter timeFilter = seriesScanUtil.getTimeFilter();
-    return timeFilter == null
-        || timeFilter.containStartEndTime(statistics.getStartTime(), 
statistics.getEndTime());
-  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 012fbe729d..197d9848f8 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics;
 import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
 import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -373,6 +374,10 @@ public abstract class Statistics<T extends Serializable> {
 
   public abstract long calculateRamSize();
 
+  public boolean containedByTimeFilter(Filter timeFilter) {
+    return timeFilter == null || 
timeFilter.containStartEndTime(getStartTime(), getEndTime());
+  }
+
   @Override
   public String toString() {
     return "startTime: " + startTime + " endTime: " + endTime + " count: " + 
count;

Reply via email to