This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1691b42055fe6429821d10662aa215826e66c39 Author: liuminghui233 <[email protected]> AuthorDate: Sun Aug 14 23:58:32 2022 +0800 finish memory calculate for SingleInputAggregationOperator --- .../process/RawDataAggregationOperator.java | 23 +++--------- .../process/SingleInputAggregationOperator.java | 31 ++++++++++++---- .../process/SlidingWindowAggregationOperator.java | 17 +++------ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 41 ++++++++++++++++++++-- .../operator/RawDataAggregationOperatorTest.java | 5 ++- .../SlidingWindowAggregationOperatorTest.java | 4 ++- 6 files changed, 78 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java index c88345c901..d95968850d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java @@ -20,17 +20,15 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.aggregation.Aggregator; +import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.utils.Pair; import java.util.List; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData; -import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; /** * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value @@ -43,13 +41,15 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF * <p>Return aggregation result in many time intervals once. */ public class RawDataAggregationOperator extends SingleInputAggregationOperator { + public RawDataAggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, + ITimeRangeIterator timeRangeIterator, Operator child, boolean ascending, - GroupByTimeParameter groupByTimeParameter) { - super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true); + long maxReturnSize) { + super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize); } @Override @@ -81,17 +81,4 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator { inputTsBlock = calcResult.getRight(); return calcResult.getLeft(); } - - @Override - public long calculateMaxPeekMemory() { - return calculateMaxReturnSize() + child.calculateMaxReturnSize(); - } - - @Override - public long calculateMaxReturnSize() { - // time + all value columns - return (1L + inputTsBlock.getValueColumnCount()) - * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() - + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java index c59de13988..1d746f1150 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -37,7 +36,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; public abstract class SingleInputAggregationOperator implements ProcessOperator { @@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator // using for building result tsBlock protected final TsBlockBuilder resultTsBlockBuilder; + protected final long maxRetainedSize; + protected final long maxReturnSize; + public SingleInputAggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, Operator child, boolean ascending, - GroupByTimeParameter groupByTimeParameter, - boolean outputPartialTimeWindow) { + ITimeRangeIterator timeRangeIterator, + long maxReturnSize) { this.operatorContext = operatorContext; this.ascending = ascending; this.child = child; this.aggregators = aggregators; - - this.timeRangeIterator = - initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow); + this.timeRangeIterator = timeRangeIterator; List<TSDataType> dataTypes = new ArrayList<>(); for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes); + + this.maxRetainedSize = child.calculateMaxReturnSize(); + this.maxReturnSize = maxReturnSize; } @Override @@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator curTimeRange = null; appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); } + + @Override + public long calculateMaxPeekMemory() { + return maxReturnSize + maxRetainedSize + child.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java index 9f1050772c..d73491d3e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java @@ -30,7 +30,6 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator { @@ -41,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper public SlidingWindowAggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, + ITimeRangeIterator timeRangeIterator, Operator child, boolean ascending, - GroupByTimeParameter groupByTimeParameter) { - super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false); + GroupByTimeParameter groupByTimeParameter, + long maxReturnSize) { + super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize); checkArgument( groupByTimeParameter != null, "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator"); @@ -105,14 +106,4 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper } curSubTimeRange = null; } - - @Override - public long calculateMaxPeekMemory() { - return calculateMaxReturnSize() + child.calculateMaxReturnSize(); - } - - @Override - public long calculateMaxReturnSize() { - return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index cfb2b42a57..50d65c2bbb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -188,6 +188,7 @@ import java.util.Objects; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode; @@ -1088,7 +1089,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP boolean ascending = node.getScanOrder() == Ordering.ASC; List<Aggregator> aggregators = new ArrayList<>(); Map<String, List<InputLocation>> layout = makeLayout(node); - for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) { + List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList(); + for (AggregationDescriptor descriptor : aggregationDescriptors) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); aggregators.add( SlidingWindowAggregatorFactory.createSlidingWindowAggregator( @@ -1102,9 +1104,25 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP descriptor.getStep())); } + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(groupByTimeParameter, ascending, false); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, + timeRangeIterator, + groupByTimeParameter != null, + context.getTypeProvider()); + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new SlidingWindowAggregationOperator( - operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter()); + operatorContext, + aggregators, + timeRangeIterator, + child, + ascending, + groupByTimeParameter, + maxReturnSize); } @Override @@ -1149,6 +1167,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP boolean ascending = node.getScanOrder() == Ordering.ASC; List<Aggregator> aggregators = new ArrayList<>(); Map<String, List<InputLocation>> layout = makeLayout(node); + List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList(); for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); aggregators.add( @@ -1174,8 +1193,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), RawDataAggregationOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); + + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(groupByTimeParameter, ascending, true); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, + timeRangeIterator, + groupByTimeParameter != null, + context.getTypeProvider()); + return new RawDataAggregationOperator( - operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter()); + operatorContext, + aggregators, + timeRangeIterator, + children.get(0), + ascending, + maxReturnSize); } else { OperatorContext operatorContext = context diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java index 6a9902f24b..f25b0d2fcd 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java @@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; import static org.junit.Assert.assertEquals; public class RawDataAggregationOperatorTest { @@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest { return new RawDataAggregationOperator( fragmentInstanceContext.getOperatorContexts().get(3), aggregators, + initTimeRangeIterator(groupByTimeParameter, true, true), timeJoinOperator, true, - groupByTimeParameter); + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java index 7eebdedbcc..a0ed242e63 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java @@ -258,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest { return new SlidingWindowAggregationOperator( fragmentInstanceContext.getOperatorContexts().get(1), finalAggregators, + initTimeRangeIterator(groupByTimeParameter, ascending, false), seriesAggregationScanOperator, ascending, - groupByTimeParameter); + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); } }
