This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/aggrOpRefactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c12329a0a807a852e8c4238ae160f1ad3c021ca Author: Minghui Liu <[email protected]> AuthorDate: Mon Jul 4 15:45:29 2022 +0800 move calculate code to next() & extends SeriesAggregationScanOperator --- .../AlignedSeriesAggregationScanOperator.java | 278 +++++---------------- .../source/SeriesAggregationScanOperator.java | 186 +++++++------- .../file/metadata/statistics/Statistics.java | 5 + 3 files changed, 159 insertions(+), 310 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java index f90d8a26f3..15cbab8795 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java @@ -19,55 +19,28 @@ package org.apache.iotdb.db.mpp.execution.operator.source; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.mpp.aggregation.Aggregator; -import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; +import java.util.Collections; import java.util.List; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators; /** This operator is responsible to do the aggregation calculation especially for aligned series. */ -public class AlignedSeriesAggregationScanOperator implements DataSourceOperator { +public class AlignedSeriesAggregationScanOperator extends SeriesAggregationScanOperator + implements DataSourceOperator { - private final OperatorContext operatorContext; - private final PlanNodeId sourceId; - private final AlignedSeriesScanUtil alignedSeriesScanUtil; private final int subSensorSize; - private final boolean ascending; - // We still think aggregator in AlignedSeriesAggregateScanOperator is a inputRaw step. - // But in facing of statistics, it will invoke another method processStatistics() - private List<Aggregator> aggregators; - - private ITimeRangeIterator timeRangeIterator; - // current interval of aggregation window [curStartTime, curEndTime) - private TimeRange curTimeRange; - private boolean isGroupByQuery; - - private TsBlock preCachedData; - - private TsBlockBuilder tsBlockBuilder; - private TsBlock resultTsBlock; - private boolean hasCachedTsBlock = false; - private boolean finished = false; public AlignedSeriesAggregationScanOperator( PlanNodeId sourceId, @@ -77,87 +50,48 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator Filter timeFilter, boolean ascending, GroupByTimeParameter groupByTimeParameter) { - this.sourceId = sourceId; - this.operatorContext = context; - this.ascending = ascending; - this.alignedSeriesScanUtil = - new AlignedSeriesScanUtil( - seriesPath, - new HashSet<>(seriesPath.getMeasurementList()), - context.getInstanceContext(), - timeFilter, - null, - ascending); + super( + sourceId, + seriesPath, + Collections.emptySet(), + context, + aggregators, + timeFilter, + ascending, + groupByTimeParameter); this.subSensorSize = seriesPath.getMeasurementList().size(); - this.aggregators = aggregators; - List<TSDataType> dataTypes = new ArrayList<>(); - for (Aggregator aggregator : aggregators) { - dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); - } - tsBlockBuilder = new TsBlockBuilder(dataTypes); - this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); - this.isGroupByQuery = groupByTimeParameter != null; - } - - @Override - public OperatorContext getOperatorContext() { - return operatorContext; } @Override - public TsBlock next() { - if (hasCachedTsBlock || hasNext()) { - hasCachedTsBlock = false; - return resultTsBlock; - } - return null; - } - - @Override - public boolean hasNext() { - if (hasCachedTsBlock) { - return true; - } + protected void calculateNextResult() { try { - if (!timeRangeIterator.hasNextTimeRange()) { - return false; - } - curTimeRange = timeRangeIterator.nextTimeRange(); - - // 1. Clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.reset(); - aggregator.updateTimeRange(curTimeRange); - } - - // 2. Calculate aggregation result based on current time window if (calcFromCacheData(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read page data firstly if (readAndCalcFromPage(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read chunk data secondly if (readAndCalcFromChunk(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read from file first - while (alignedSeriesScanUtil.hasNextFile()) { + while (seriesScanUtil.hasNextFile()) { if (canUseCurrentFileStatistics()) { - Statistics fileTimeStatistics = alignedSeriesScanUtil.currentFileTimeStatistics(); + Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { updateResultTsBlock(); - return true; + return; } else { - alignedSeriesScanUtil.skipCurrentFile(); + seriesScanUtil.skipCurrentFile(); continue; } } @@ -166,10 +100,10 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = alignedSeriesScanUtil.currentFileStatistics(i); + statisticsList[i] = seriesScanUtil.currentFileStatistics(i); } - calcFromStatistics(statisticsList); - alignedSeriesScanUtil.skipCurrentFile(); + calcFromStatisticsArray(statisticsList); + seriesScanUtil.skipCurrentFile(); if (isEndCalc(aggregators) && !isGroupByQuery) { break; } else { @@ -181,125 +115,47 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator // read chunk if (readAndCalcFromChunk(curTimeRange)) { updateResultTsBlock(); - return true; + return; } } updateResultTsBlock(); - return true; } catch (IOException e) { throw new RuntimeException("Error while scanning the file", e); } } - @Override - public boolean isFinished() { - return finished || (finished = !hasNext()); - } - - @Override - public void initQueryDataSource(QueryDataSource dataSource) { - alignedSeriesScanUtil.initQueryDataSource(dataSource); - } - - @Override - public PlanNodeId getSourceId() { - return sourceId; - } - - private void updateResultTsBlock() { - resultTsBlock = - updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); - hasCachedTsBlock = true; - } - - /** @return if already get the result */ - private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException { - calcFromBatch(preCachedData, curTimeRange); - // The result is calculated from the cache - return (preCachedData != null - && (ascending - ? preCachedData.getEndTime() > curTimeRange.getMax() - : preCachedData.getEndTime() < curTimeRange.getMin())) - || isEndCalc(aggregators); - } - - @SuppressWarnings("squid:S3776") - private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { - // check if the batchData does not contain points in current interval - if (tsBlock != null && satisfied(tsBlock, curTimeRange, ascending)) { - // skip points that cannot be calculated - if ((ascending && tsBlock.getStartTime() < curTimeRange.getMin()) - || (!ascending && tsBlock.getStartTime() > curTimeRange.getMax())) { - tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending); - } - - int lastReadRowIndex = 0; - for (Aggregator aggregator : aggregators) { - // current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; - } - - lastReadRowIndex = Math.max(lastReadRowIndex, aggregator.processTsBlock(tsBlock)); - } - if (lastReadRowIndex >= tsBlock.getPositionCount()) { - tsBlock = null; - } else { - tsBlock = tsBlock.subTsBlock(lastReadRowIndex); - } - - // can calc for next interval - if (tsBlock != null && tsBlock.getTsBlockSingleColumnIterator().hasNext()) { - preCachedData = tsBlock; + private void calcFromStatisticsArray(Statistics[] statistics) { + for (Aggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; } + aggregator.processStatistics(statistics); } } - private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { - TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); - if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { - return false; - } - - if (ascending - && (tsBlockIterator.getEndTime() < timeRange.getMin() - || tsBlockIterator.currentTime() > timeRange.getMax())) { - return false; - } - if (!ascending - && (tsBlockIterator.getEndTime() > timeRange.getMax() - || tsBlockIterator.currentTime() < timeRange.getMin())) { - preCachedData = tsBlock; - return false; - } - return true; - } - - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException { - while (alignedSeriesScanUtil.hasNextPage()) { + while (seriesScanUtil.hasNextPage()) { if (canUseCurrentPageStatistics()) { - Statistics pageTimeStatistics = alignedSeriesScanUtil.currentPageTimeStatistics(); + Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); // There is no more eligible points in current time range if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { return true; } else { - alignedSeriesScanUtil.skipCurrentPage(); + seriesScanUtil.skipCurrentPage(); continue; } } // can use pageHeader - if (canUseCurrentPageStatistics() - && curTimeRange.contains( - pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { + if (curTimeRange.contains( + pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = alignedSeriesScanUtil.currentPageStatistics(i); + statisticsList[i] = seriesScanUtil.currentPageStatistics(i); } - calcFromStatistics(statisticsList); - alignedSeriesScanUtil.skipCurrentPage(); + calcFromStatisticsArray(statisticsList); + seriesScanUtil.skipCurrentPage(); if (isEndCalc(aggregators) && !isGroupByQuery) { return true; } else { @@ -309,7 +165,7 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator } // calc from page data - TsBlock tsBlock = alignedSeriesScanUtil.nextPage(); + TsBlock tsBlock = seriesScanUtil.nextPage(); TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { continue; @@ -337,14 +193,14 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator } private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws IOException { - while (alignedSeriesScanUtil.hasNextChunk()) { + while (seriesScanUtil.hasNextChunk()) { if (canUseCurrentChunkStatistics()) { - Statistics chunkTimeStatistics = alignedSeriesScanUtil.currentChunkTimeStatistics(); + Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { return true; } else { - alignedSeriesScanUtil.skipCurrentChunk(); + seriesScanUtil.skipCurrentChunk(); continue; } } @@ -354,10 +210,10 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator // calc from chunkMetaData Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = alignedSeriesScanUtil.currentChunkStatistics(i); + statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); } - calcFromStatistics(statisticsList); - alignedSeriesScanUtil.skipCurrentChunk(); + calcFromStatisticsArray(statisticsList); + seriesScanUtil.skipCurrentChunk(); if (isEndCalc(aggregators) && !isGroupByQuery) { return true; } else { @@ -374,43 +230,27 @@ public class AlignedSeriesAggregationScanOperator implements DataSourceOperator return false; } - private void calcFromStatistics(Statistics[] statistics) { - for (int i = 0; i < aggregators.size(); i++) { - Aggregator aggregator = aggregators.get(i); - if (aggregator.hasFinalResult()) { - continue; - } - aggregator.processStatistics(statistics); - } - } - - public boolean canUseCurrentFileStatistics() throws IOException { - Statistics fileStatistics = alignedSeriesScanUtil.currentFileTimeStatistics(); - return !alignedSeriesScanUtil.isFileOverlapped() - && containedByTimeFilter(fileStatistics) - && !alignedSeriesScanUtil.currentFileModified(); + private boolean canUseCurrentFileStatistics() throws IOException { + Statistics fileStatistics = seriesScanUtil.currentFileTimeStatistics(); + return !seriesScanUtil.isFileOverlapped() + && fileStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) + && !seriesScanUtil.currentFileModified(); } - public boolean canUseCurrentChunkStatistics() throws IOException { - Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkTimeStatistics(); - return !alignedSeriesScanUtil.isChunkOverlapped() - && containedByTimeFilter(chunkStatistics) - && !alignedSeriesScanUtil.currentChunkModified(); + private boolean canUseCurrentChunkStatistics() throws IOException { + Statistics chunkStatistics = seriesScanUtil.currentChunkTimeStatistics(); + return !seriesScanUtil.isChunkOverlapped() + && chunkStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) + && !seriesScanUtil.currentChunkModified(); } - public boolean canUseCurrentPageStatistics() throws IOException { - Statistics currentPageStatistics = alignedSeriesScanUtil.currentPageTimeStatistics(); + private boolean canUseCurrentPageStatistics() throws IOException { + Statistics currentPageStatistics = seriesScanUtil.currentPageTimeStatistics(); if (currentPageStatistics == null) { return false; } - return !alignedSeriesScanUtil.isPageOverlapped() - && containedByTimeFilter(currentPageStatistics) - && !alignedSeriesScanUtil.currentPageModified(); - } - - private boolean containedByTimeFilter(Statistics statistics) { - Filter timeFilter = alignedSeriesScanUtil.getTimeFilter(); - return timeFilter == null - || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime()); + return !seriesScanUtil.isPageOverlapped() + && currentPageStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) + && !seriesScanUtil.currentPageModified(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java index 42a9776015..bd46896174 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.metadata.path.AlignedPath; +import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; @@ -36,6 +38,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -54,25 +57,24 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateR */ public class SeriesAggregationScanOperator implements DataSourceOperator { - private final OperatorContext operatorContext; - private final PlanNodeId sourceId; - private final SeriesScanUtil seriesScanUtil; - private final boolean ascending; + protected final OperatorContext operatorContext; + protected final PlanNodeId sourceId; + protected final SeriesScanUtil seriesScanUtil; + protected final boolean ascending; // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step. // But in facing of statistics, it will invoke another method processStatistics() - private List<Aggregator> aggregators; + protected final List<Aggregator> aggregators; - private ITimeRangeIterator timeRangeIterator; + protected final ITimeRangeIterator timeRangeIterator; // current interval of aggregation window [curStartTime, curEndTime) - private TimeRange curTimeRange; - private boolean isGroupByQuery; + protected TimeRange curTimeRange; + protected final boolean isGroupByQuery; - private TsBlock preCachedData; + protected TsBlock preCachedData; - private TsBlockBuilder tsBlockBuilder; - private TsBlock resultTsBlock; - private boolean hasCachedTsBlock = false; - private boolean finished = false; + protected final TsBlockBuilder tsBlockBuilder; + protected TsBlock resultTsBlock; + protected boolean finished = false; public SeriesAggregationScanOperator( PlanNodeId sourceId, @@ -86,15 +88,30 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { this.sourceId = sourceId; this.operatorContext = context; this.ascending = ascending; - this.seriesScanUtil = - new SeriesScanUtil( - seriesPath, - allSensors, - seriesPath.getSeriesType(), - context.getInstanceContext(), - timeFilter, - null, - ascending); + + if (seriesPath instanceof MeasurementPath) { + this.seriesScanUtil = + new SeriesScanUtil( + seriesPath, + allSensors, + seriesPath.getSeriesType(), + context.getInstanceContext(), + timeFilter, + null, + ascending); + } else if (seriesPath instanceof AlignedPath) { + this.seriesScanUtil = + new AlignedSeriesScanUtil( + seriesPath, + new HashSet<>(((AlignedPath) seriesPath).getMeasurementList()), + context.getInstanceContext(), + timeFilter, + null, + ascending); + } else { + throw new UnsupportedOperationException(""); + } + this.aggregators = aggregators; List<TSDataType> dataTypes = new ArrayList<>(); for (Aggregator aggregator : aggregators) { @@ -110,48 +127,61 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { return operatorContext; } + @Override + public boolean hasNext() { + return timeRangeIterator.hasNextTimeRange(); + } + @Override public TsBlock next() { - if (hasCachedTsBlock || hasNext()) { - hasCachedTsBlock = false; - return resultTsBlock; + if (!timeRangeIterator.hasNextTimeRange()) { + return null; } - return null; + curTimeRange = timeRangeIterator.nextTimeRange(); + + // 1. Clear previous aggregation result + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.updateTimeRange(curTimeRange); + } + + // 2. Calculate aggregation result based on current time window + calculateNextResult(); + return resultTsBlock; } @Override - public boolean hasNext() { - if (hasCachedTsBlock) { - return true; - } - try { - if (!timeRangeIterator.hasNextTimeRange()) { - return false; - } - curTimeRange = timeRangeIterator.nextTimeRange(); + public boolean isFinished() { + return finished || (finished = !hasNext()); + } - // 1. Clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.reset(); - aggregator.updateTimeRange(curTimeRange); - } + @Override + public PlanNodeId getSourceId() { + return sourceId; + } - // 2. Calculate aggregation result based on current time window + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource(dataSource); + } + + protected void calculateNextResult() { + try { if (calcFromCacheData(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read page data firstly if (readAndCalcFromPage(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read chunk data secondly if (readAndCalcFromChunk(curTimeRange)) { updateResultTsBlock(); - return true; + return; } // read from file first @@ -161,7 +191,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { if (fileStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { updateResultTsBlock(); - return true; + return; } else { seriesScanUtil.skipCurrentFile(); continue; @@ -182,40 +212,23 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { // read chunk if (readAndCalcFromChunk(curTimeRange)) { updateResultTsBlock(); - return true; + return; } } updateResultTsBlock(); - return true; } catch (IOException e) { throw new RuntimeException("Error while scanning the file", e); } } - private void updateResultTsBlock() { + protected void updateResultTsBlock() { resultTsBlock = updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); - hasCachedTsBlock = true; - } - - @Override - public boolean isFinished() { - return finished || (finished = !hasNext()); - } - - @Override - public PlanNodeId getSourceId() { - return sourceId; - } - - @Override - public void initQueryDataSource(QueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource(dataSource); } /** @return if already get the result */ - private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException { + protected boolean calcFromCacheData(TimeRange curTimeRange) throws IOException { calcFromBatch(preCachedData, curTimeRange); // The result is calculated from the cache return (preCachedData != null @@ -225,8 +238,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { || isEndCalc(aggregators); } - @SuppressWarnings("squid:S3776") - private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { + protected void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { // check if the batchData does not contain points in current interval if (tsBlock != null && satisfied(tsBlock, curTimeRange, ascending)) { // skip points that cannot be calculated @@ -257,7 +269,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { } } - private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { + protected boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { return false; @@ -277,7 +289,15 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { return true; } - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + private void calcFromStatistics(Statistics statistics) { + for (Aggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processStatistics(statistics); + } + } + private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException { while (seriesScanUtil.hasNextPage()) { if (canUseCurrentPageStatistics()) { @@ -363,43 +383,27 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { return false; } - private void calcFromStatistics(Statistics statistics) { - for (int i = 0; i < aggregators.size(); i++) { - Aggregator aggregator = aggregators.get(i); - if (aggregator.hasFinalResult()) { - continue; - } - aggregator.processStatistics(statistics); - } - } - - public boolean canUseCurrentFileStatistics() throws IOException { + private boolean canUseCurrentFileStatistics() throws IOException { Statistics fileStatistics = seriesScanUtil.currentFileStatistics(); return !seriesScanUtil.isFileOverlapped() - && containedByTimeFilter(fileStatistics) + && fileStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) && !seriesScanUtil.currentFileModified(); } - public boolean canUseCurrentChunkStatistics() throws IOException { + private boolean canUseCurrentChunkStatistics() throws IOException { Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics(); return !seriesScanUtil.isChunkOverlapped() - && containedByTimeFilter(chunkStatistics) + && chunkStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) && !seriesScanUtil.currentChunkModified(); } - public boolean canUseCurrentPageStatistics() throws IOException { + private boolean canUseCurrentPageStatistics() throws IOException { Statistics currentPageStatistics = seriesScanUtil.currentPageStatistics(); if (currentPageStatistics == null) { return false; } return !seriesScanUtil.isPageOverlapped() - && containedByTimeFilter(currentPageStatistics) + && currentPageStatistics.containedByTimeFilter(seriesScanUtil.getTimeFilter()) && !seriesScanUtil.currentPageModified(); } - - private boolean containedByTimeFilter(Statistics statistics) { - Filter timeFilter = seriesScanUtil.getTimeFilter(); - return timeFilter == null - || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime()); - } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 012fbe729d..197d9848f8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics; import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -373,6 +374,10 @@ public abstract class Statistics<T extends Serializable> { public abstract long calculateRamSize(); + public boolean containedByTimeFilter(Filter timeFilter) { + return timeFilter == null || timeFilter.containStartEndTime(getStartTime(), getEndTime()); + } + @Override public String toString() { return "startTime: " + startTime + " endTime: " + endTime + " count: " + count;
