This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/aggrOpRefactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4a3b4e7cf1f24245c8af924a003a2fc1dbf3b16f Author: Minghui Liu <liuminghui...@foxmail.com> AuthorDate: Tue Jul 5 09:44:26 2022 +0800 refactor RawDataAggregationOperator to batch process --- .../process/RawDataAggregationOperator.java | 31 +++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java index a64a924426..55e47bb159 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java @@ -35,11 +35,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.satisfied; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators; /** * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value @@ -57,14 +57,14 @@ public class RawDataAggregationOperator implements ProcessOperator { private final List<Aggregator> aggregators; private final Operator child; private final boolean ascending; - private ITimeRangeIterator timeRangeIterator; + private final ITimeRangeIterator timeRangeIterator; // current interval of aggregation window [curStartTime, curEndTime) private TimeRange curTimeRange; private TsBlock preCachedData; // Using for building result tsBlock - private final TsBlockBuilder tsBlockBuilder; + private final TsBlockBuilder resultTsBlockBuilder; public RawDataAggregationOperator( OperatorContext operatorContext, @@ -81,7 +81,7 @@ public class RawDataAggregationOperator implements ProcessOperator { for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } - tsBlockBuilder = new TsBlockBuilder(dataTypes); + resultTsBlockBuilder = new TsBlockBuilder(dataTypes); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); } @@ -97,9 +97,26 @@ public class RawDataAggregationOperator implements ProcessOperator { @Override public TsBlock next() { + resultTsBlockBuilder.reset(); + while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + && !resultTsBlockBuilder.isFull()) { + if (!calculateNextResult()) { + break; + } + } + + if (resultTsBlockBuilder.getPositionCount() > 0) { + return resultTsBlockBuilder.build(); + } else { + return null; + } + } + + private boolean calculateNextResult() { // Move to next timeRange if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { curTimeRange = timeRangeIterator.nextTimeRange(); + // 0. Clear previous aggregation result for (Aggregator aggregator : aggregators) { aggregator.reset(); aggregator.updateTimeRange(curTimeRange); @@ -116,7 +133,7 @@ public class RawDataAggregationOperator implements ProcessOperator { canCallNext = false; // if child still has next but can't be invoked now } else if (child.hasNext()) { - return null; + return false; } else { break; } @@ -124,7 +141,9 @@ public class RawDataAggregationOperator implements ProcessOperator { // 2. Update result using aggregators curTimeRange = null; - return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); + appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); + + return true; } @Override