This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2270436878714108ba9be6c52ed26ee90608451d
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Jul 6 23:11:49 2022 +0800

    refactor SlidingWindowAggregationOperator to batch process
---
 .../slidingwindow/SlidingWindowAggregator.java     |   2 +-
 .../db/mpp/execution/operator/AggregationUtil.java |   1 +
 .../process/SlidingWindowAggregationOperator.java  | 120 +++++++++++++--------
 3 files changed, 79 insertions(+), 44 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
index aed468c8d8..ec848b354d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -66,7 +66,7 @@ public abstract class SlidingWindowAggregator extends 
Aggregator {
       valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
     }
     processPartialResult(new PartialAggregationResult(timeColumn, 
valueColumn));
-    return tsBlock.getPositionCount();
+    return 1;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 64b80d0bfe..f13552c005 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -118,6 +118,7 @@ public class AggregationUtil {
     return tsBlock.subTsBlock(left);
   }
 
+  // check if the batchData does not contain points in current interval
   public static boolean satisfied(TsBlock tsBlock, TimeRange timeRange, 
boolean ascending) {
     TsBlock.TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 9e1a3fcf14..94db70e1a9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -37,10 +37,9 @@ import java.util.Arrays;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.satisfied;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
 
 public class SlidingWindowAggregationOperator implements ProcessOperator {
 
@@ -52,12 +51,18 @@ public class SlidingWindowAggregationOperator implements 
ProcessOperator {
   private final List<SlidingWindowAggregator> aggregators;
 
   private final ITimeRangeIterator timeRangeIterator;
+  private final ITimeRangeIterator subTimeRangeIterator;
+
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
+  // current interval of pre-aggregation window [curStartTime, curEndTime)
+  private TimeRange curSubTimeRange;
 
   private final boolean ascending;
 
-  private final TsBlockBuilder tsBlockBuilder;
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  private boolean canCallNext = true;
 
   public SlidingWindowAggregationOperator(
       OperatorContext operatorContext,
@@ -76,8 +81,9 @@ public class SlidingWindowAggregationOperator implements 
ProcessOperator {
     for (Aggregator aggregator : aggregators) {
       outputDataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, false);
+    this.subTimeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
     this.ascending = ascending;
   }
 
@@ -88,6 +94,22 @@ public class SlidingWindowAggregationOperator implements 
ProcessOperator {
 
   @Override
   public TsBlock next() {
+    resultTsBlockBuilder.reset();
+    while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+        && !resultTsBlockBuilder.isFull()) {
+      if (!calculateNextResult()) {
+        break;
+      }
+    }
+
+    if (resultTsBlockBuilder.getPositionCount() > 0) {
+      return resultTsBlockBuilder.build();
+    } else {
+      return null;
+    }
+  }
+
+  private boolean calculateNextResult() {
     // Move to next timeRange
     if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
       curTimeRange = timeRangeIterator.nextTimeRange();
@@ -96,54 +118,66 @@ public class SlidingWindowAggregationOperator implements 
ProcessOperator {
       }
     }
 
-    // 1. Calculate aggregation result based on current time window
-    boolean canCallNext = true;
-    while (!calcFromTsBlock(cachedTsBlock, curTimeRange)) {
-      cachedTsBlock = null;
-      // child.next can only be invoked once
-      if (child.hasNext() && canCallNext) {
-        cachedTsBlock = child.next();
-        canCallNext = false;
-        // if child still has next but can't be invoked now
-      } else if (child.hasNext()) {
-        return null;
-      } else {
-        break;
+    // Calculate aggregation result based on current time window
+    while (!isEndCalc()) {
+      if (cachedTsBlock == null) {
+        // child.next can only be invoked once
+        if (child.hasNext()) {
+          if (canCallNext) {
+            cachedTsBlock = child.next();
+            canCallNext = false;
+          } else {
+            // if child still has next but can't be invoked now
+            return false;
+          }
+        } else {
+          break;
+        }
       }
+      calcFromTsBlock();
     }
 
-    // 2. Update result using aggregators
+    // Update result using aggregators
+    appendAggregationResult(resultTsBlockBuilder, aggregators, 
timeRangeIterator);
     curTimeRange = null;
-    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, 
timeRangeIterator);
+
+    return true;
   }
 
-  private boolean calcFromTsBlock(TsBlock inputTsBlock, TimeRange timeRange) {
-    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
-      return false;
+  protected boolean isEndCalc() {
+    if (curSubTimeRange == null && !subTimeRangeIterator.hasNextTimeRange()) {
+      return true;
     }
-    // check if the batchData does not contain points in current interval
-    if (satisfied(inputTsBlock, timeRange, ascending)) {
-      // skip points that cannot be calculated
-      if ((ascending && inputTsBlock.getStartTime() < timeRange.getMin())
-          || (!ascending && inputTsBlock.getStartTime() > timeRange.getMax())) 
{
-        inputTsBlock = skipOutOfTimeRangePoints(inputTsBlock, timeRange, 
ascending);
-      }
 
-      int lastReadRowIndex = 0;
-      for (SlidingWindowAggregator aggregator : aggregators) {
-        lastReadRowIndex = Math.max(lastReadRowIndex, 
aggregator.processTsBlock(inputTsBlock));
-      }
-      if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
-        inputTsBlock = null;
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
-      }
+    if (curSubTimeRange == null && subTimeRangeIterator.hasNextTimeRange()) {
+      curSubTimeRange = subTimeRangeIterator.nextTimeRange();
+    }
+    return ascending
+        ? curSubTimeRange.getMin() >= curTimeRange.getMax()
+        : curSubTimeRange.getMax() <= curTimeRange.getMin();
+  }
+
+  private void calcFromTsBlock() {
+    if (cachedTsBlock == null || cachedTsBlock.isEmpty()) {
+      return;
+    }
+
+    // skip points that cannot be calculated
+    if ((ascending && cachedTsBlock.getStartTime() < curSubTimeRange.getMin())
+        || (!ascending && cachedTsBlock.getStartTime() > 
curSubTimeRange.getMax())) {
+      cachedTsBlock = skipOutOfTimeRangePoints(cachedTsBlock, curSubTimeRange, 
ascending);
+    }
+
+    int lastReadRowIndex = 0;
+    for (SlidingWindowAggregator aggregator : aggregators) {
+      lastReadRowIndex = Math.max(lastReadRowIndex, 
aggregator.processTsBlock(cachedTsBlock));
+    }
+    if (lastReadRowIndex >= cachedTsBlock.getPositionCount()) {
+      cachedTsBlock = null;
+    } else {
+      cachedTsBlock = cachedTsBlock.subTsBlock(lastReadRowIndex);
     }
-    // The result is calculated from the cache
-    return inputTsBlock != null
-        && (ascending
-            ? inputTsBlock.getEndTime() > timeRange.getMax()
-            : inputTsBlock.getEndTime() < timeRange.getMin());
+    curSubTimeRange = null;
   }
 
   @Override

Reply via email to