This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e9192cdf2041fe8db4af599dcf25973906abf24
Author: liuminghui233 <[email protected]>
AuthorDate: Mon Aug 15 00:16:00 2022 +0800

    finish memory calculate for AggregationOperator
---
 .../db/mpp/execution/operator/AggregationUtil.java |  2 +-
 .../operator/process/AggregationOperator.java      | 36 ++++++++++++----------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 30 +++++++++++++++---
 .../operator/AggregationOperatorTest.java          |  5 ++-
 4 files changed, 48 insertions(+), 25 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 1896fc3af4..564f21245e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -177,7 +177,7 @@ public class AggregationUtil {
   }
 
   public static long calculateMaxAggregationResultSize(
-      List<AggregationDescriptor> aggregationDescriptors,
+      List<? extends AggregationDescriptor> aggregationDescriptors,
       ITimeRangeIterator timeRangeIterator,
       boolean isGroupByQuery,
       TypeProvider typeProvider) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 92da980d1e..7efe992cdd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate 
aggregate result, it
@@ -63,16 +60,20 @@ public class AggregationOperator implements ProcessOperator 
{
   // using for building result tsBlock
   private final TsBlockBuilder resultTsBlockBuilder;
 
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
   public AggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       List<Operator> children,
-      boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.aggregators = aggregators;
+    this.timeRangeIterator = timeRangeIterator;
 
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -81,14 +82,16 @@ public class AggregationOperator implements ProcessOperator 
{
       canCallNext[i] = false;
     }
 
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, 
outputPartialTimeWindow);
-
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = 
children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        
children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -98,18 +101,17 @@ public class AggregationOperator implements 
ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory = calculateMaxReturnSize();
-    long childrenMaxPeekMemory = 0;
-    for (Operator child : children) {
-      maxPeekMemory += child.calculateMaxReturnSize();
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, 
child.calculateMaxPeekMemory());
-    }
-    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+    return maxReturnSize + maxRetainedSize + childrenRetainedSize;
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + childrenRetainedSize;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 50d65c2bbb..0978a006f5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1045,7 +1045,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (GroupByLevelDescriptor descriptor : 
node.getGroupByLevelDescriptors()) {
+    List<GroupByLevelDescriptor> aggregationDescriptors = 
node.getGroupByLevelDescriptors();
+    for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
       TSDataType seriesDataType =
           context
@@ -1067,9 +1068,19 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 AggregationOperator.class.getSimpleName());
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return new AggregationOperator(
-        operatorContext, aggregators, children, ascending, 
node.getGroupByTimeParameter(), false);
+        operatorContext, aggregators, timeRangeIterator, children, 
maxReturnSize);
   }
 
   @Override
@@ -1183,6 +1194,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
               inputLocationList));
     }
     boolean inputRaw = 
node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
     if (inputRaw) {
       checkArgument(children.size() == 1, "rawDataAggregateOperator can only 
accept one input");
       OperatorContext operatorContext =
@@ -1194,7 +1207,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
 
-      GroupByTimeParameter groupByTimeParameter = 
node.getGroupByTimeParameter();
       ITimeRangeIterator timeRangeIterator =
           initTimeRangeIterator(groupByTimeParameter, ascending, true);
       long maxReturnSize =
@@ -1219,9 +1231,19 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   AggregationOperator.class.getSimpleName());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
       return new AggregationOperator(
-          operatorContext, aggregators, children, ascending, 
node.getGroupByTimeParameter(), true);
+          operatorContext, aggregators, timeRangeIterator, children, 
maxReturnSize);
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index b3e80f1b82..85e65849a6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -374,9 +374,8 @@ public class AggregationOperatorTest {
     return new AggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(2),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         children,
-        true,
-        groupByTimeParameter,
-        true);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }

Reply via email to