This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2270436878714108ba9be6c52ed26ee90608451d Author: Minghui Liu <[email protected]> AuthorDate: Wed Jul 6 23:11:49 2022 +0800 refactor SlidingWindowAggregationOperator to batch process --- .../slidingwindow/SlidingWindowAggregator.java | 2 +- .../db/mpp/execution/operator/AggregationUtil.java | 1 + .../process/SlidingWindowAggregationOperator.java | 120 +++++++++++++-------- 3 files changed, 79 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java index aed468c8d8..ec848b354d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java @@ -66,7 +66,7 @@ public abstract class SlidingWindowAggregator extends Aggregator { valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex()); } processPartialResult(new PartialAggregationResult(timeColumn, valueColumn)); - return tsBlock.getPositionCount(); + return 1; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java index 64b80d0bfe..f13552c005 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java @@ -118,6 +118,7 @@ public class AggregationUtil { return tsBlock.subTsBlock(left); } + // check if the batchData does not contain points in current interval public static boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { TsBlock.TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || 
!tsBlockIterator.hasNext()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java index 9e1a3fcf14..94db70e1a9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java @@ -37,10 +37,9 @@ import java.util.Arrays; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.satisfied; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators; public class SlidingWindowAggregationOperator implements ProcessOperator { @@ -52,12 +51,18 @@ public class SlidingWindowAggregationOperator implements ProcessOperator { private final List<SlidingWindowAggregator> aggregators; private final ITimeRangeIterator timeRangeIterator; + private final ITimeRangeIterator subTimeRangeIterator; + // current interval of aggregation window [curStartTime, curEndTime) private TimeRange curTimeRange; + // current interval of pre-aggregation window [curStartTime, curEndTime) + private TimeRange curSubTimeRange; private final boolean ascending; - private final TsBlockBuilder tsBlockBuilder; + private final TsBlockBuilder resultTsBlockBuilder; + + private boolean canCallNext = true; public SlidingWindowAggregationOperator( OperatorContext operatorContext, @@ -76,8 +81,9 @@ public class 
SlidingWindowAggregationOperator implements ProcessOperator { for (Aggregator aggregator : aggregators) { outputDataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } - this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, false); + this.subTimeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); this.ascending = ascending; } @@ -88,6 +94,22 @@ public class SlidingWindowAggregationOperator implements ProcessOperator { @Override public TsBlock next() { + resultTsBlockBuilder.reset(); + while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + && !resultTsBlockBuilder.isFull()) { + if (!calculateNextResult()) { + break; + } + } + + if (resultTsBlockBuilder.getPositionCount() > 0) { + return resultTsBlockBuilder.build(); + } else { + return null; + } + } + + private boolean calculateNextResult() { // Move to next timeRange if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { curTimeRange = timeRangeIterator.nextTimeRange(); @@ -96,54 +118,66 @@ public class SlidingWindowAggregationOperator implements ProcessOperator { } } - // 1. 
Calculate aggregation result based on current time window - boolean canCallNext = true; - while (!calcFromTsBlock(cachedTsBlock, curTimeRange)) { - cachedTsBlock = null; - // child.next can only be invoked once - if (child.hasNext() && canCallNext) { - cachedTsBlock = child.next(); - canCallNext = false; - // if child still has next but can't be invoked now - } else if (child.hasNext()) { - return null; - } else { - break; + // Calculate aggregation result based on current time window + while (!isEndCalc()) { + if (cachedTsBlock == null) { + // child.next can only be invoked once + if (child.hasNext()) { + if (canCallNext) { + cachedTsBlock = child.next(); + canCallNext = false; + } else { + // if child still has next but can't be invoked now + return false; + } + } else { + break; + } } + calcFromTsBlock(); } - // 2. Update result using aggregators + // Update result using aggregators + appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); curTimeRange = null; - return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); + + return true; } - private boolean calcFromTsBlock(TsBlock inputTsBlock, TimeRange timeRange) { - if (inputTsBlock == null || inputTsBlock.isEmpty()) { - return false; + protected boolean isEndCalc() { + if (curSubTimeRange == null && !subTimeRangeIterator.hasNextTimeRange()) { + return true; } - // check if the batchData does not contain points in current interval - if (satisfied(inputTsBlock, timeRange, ascending)) { - // skip points that cannot be calculated - if ((ascending && inputTsBlock.getStartTime() < timeRange.getMin()) - || (!ascending && inputTsBlock.getStartTime() > timeRange.getMax())) { - inputTsBlock = skipOutOfTimeRangePoints(inputTsBlock, timeRange, ascending); - } - int lastReadRowIndex = 0; - for (SlidingWindowAggregator aggregator : aggregators) { - lastReadRowIndex = Math.max(lastReadRowIndex, aggregator.processTsBlock(inputTsBlock)); - } - if (lastReadRowIndex >= 
inputTsBlock.getPositionCount()) { - inputTsBlock = null; - } else { - inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); - } + if (curSubTimeRange == null && subTimeRangeIterator.hasNextTimeRange()) { + curSubTimeRange = subTimeRangeIterator.nextTimeRange(); + } + return ascending + ? curSubTimeRange.getMin() >= curTimeRange.getMax() + : curSubTimeRange.getMax() <= curTimeRange.getMin(); + } + + private void calcFromTsBlock() { + if (cachedTsBlock == null || cachedTsBlock.isEmpty()) { + return; + } + + // skip points that cannot be calculated + if ((ascending && cachedTsBlock.getStartTime() < curSubTimeRange.getMin()) + || (!ascending && cachedTsBlock.getStartTime() > curSubTimeRange.getMax())) { + cachedTsBlock = skipOutOfTimeRangePoints(cachedTsBlock, curSubTimeRange, ascending); + } + + int lastReadRowIndex = 0; + for (SlidingWindowAggregator aggregator : aggregators) { + lastReadRowIndex = Math.max(lastReadRowIndex, aggregator.processTsBlock(cachedTsBlock)); + } + if (lastReadRowIndex >= cachedTsBlock.getPositionCount()) { + cachedTsBlock = null; + } else { + cachedTsBlock = cachedTsBlock.subTsBlock(lastReadRowIndex); } - // The result is calculated from the cache - return inputTsBlock != null - && (ascending - ? inputTsBlock.getEndTime() > timeRange.getMax() - : inputTsBlock.getEndTime() < timeRange.getMin()); + curSubTimeRange = null; } @Override
