This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e7c5f6a49f2285e711509d6247884da45db8d28
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Aug 10 11:41:04 2022 +0800

    memory control for AggregationOperator
---
 .../execution/operator/process/AggregationOperator.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..92da980d1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -95,6 +96,22 @@ public class AggregationOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
