This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch MemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93cea7db3125ccb0c865f9433856158f3f883e2c Author: JackieTien97 <[email protected]> AuthorDate: Mon Aug 8 20:55:30 2022 +0800 Add memory control for some operators --- .../resources/conf/iotdb-datanode.properties | 6 +-- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 46 ++++++++++++++++------ .../mpp/execution/memory/LocalMemoryManager.java | 8 ++-- .../iotdb/db/mpp/execution/operator/Operator.java | 19 +++++++++ .../execution/operator/process/FillOperator.java | 13 ++++++ .../execution/operator/process/LimitOperator.java | 10 +++++ .../operator/process/LinearFillOperator.java | 14 +++++++ .../execution/operator/process/OffsetOperator.java | 10 +++++ .../execution/operator/process/SortOperator.java | 10 +++++ .../process/join/RowBasedTimeJoinOperator.java | 19 +++++++++ .../operator/process/join/TimeJoinOperator.java | 19 +++++++++ .../process/last/LastQueryCollectOperator.java | 18 +++++++++ .../process/last/LastQueryMergeOperator.java | 30 ++++++++++++++ .../operator/process/last/LastQueryOperator.java | 16 ++++++++ .../process/last/LastQuerySortOperator.java | 17 ++++++++ .../process/last/UpdateLastCacheOperator.java | 10 +++++ .../operator/source/AlignedSeriesScanOperator.java | 21 +++++++++- .../operator/source/ExchangeOperator.java | 12 ++++++ .../operator/source/LastCacheScanOperator.java | 10 +++++ .../operator/source/SeriesScanOperator.java | 18 ++++++++- .../execution/operator/source/SeriesScanUtil.java | 4 +- 21 files changed, 307 insertions(+), 23 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties index 57448902cd..35a0f89d14 100644 --- a/server/src/assembly/resources/conf/iotdb-datanode.properties +++ b/server/src/assembly/resources/conf/iotdb-datanode.properties @@ -637,9 +637,9 @@ timestamp_precision=ms # Datatype: boolean # meta_data_cache_enable=true -# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, 
memory used for constructing QueryDataSet and Free Memory Used in Query. -# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400 -# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400 +# Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange. +# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers. for example: 1:1:1:1:1:1 , 1:100:200:50:300:350 +# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:300:350 #################### ### LAST Cache Configuration diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index cb2583e4d9..17234c37be 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -130,10 +130,7 @@ public class IoTDBConfig { private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10; /** Memory allocated for the mtree */ - private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10; - - /** Memory allocated for the read process besides cache */ - private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001; + private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10; private volatile int maxQueryDeduplicatedPathNum = 1000; @@ -474,6 +471,15 @@ public class IoTDBConfig { /** Memory allocated for chunk cache in read process */ private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001; + /** Memory allocated for the coordinator in read process */ + private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001; + + /** Memory allocated for operators */ + private long allocateMemoryForOperators = allocateMemoryForRead * 300 / 1001; + + /** Memory allocated for data exchange in read process */ + private long allocateMemoryForDataExchange = 
allocateMemoryForRead * 350 / 1001; + /** Whether to enable Last cache */ private boolean lastCacheEnable = true; @@ -1715,14 +1721,6 @@ public class IoTDBConfig { this.allocateMemoryForRead = allocateMemoryForRead; } - public long getAllocateMemoryForReadWithoutCache() { - return allocateMemoryForReadWithoutCache; - } - - public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) { - this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache; - } - public boolean isEnableExternalSort() { return enableExternalSort; } @@ -1934,6 +1932,30 @@ public class IoTDBConfig { this.allocateMemoryForChunkCache = allocateMemoryForChunkCache; } + public long getAllocateMemoryForCoordinator() { + return allocateMemoryForCoordinator; + } + + public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) { + this.allocateMemoryForCoordinator = allocateMemoryForCoordinator; + } + + public long getAllocateMemoryForOperators() { + return allocateMemoryForOperators; + } + + public void setAllocateMemoryForOperators(long allocateMemoryForOperators) { + this.allocateMemoryForOperators = allocateMemoryForOperators; + } + + public long getAllocateMemoryForDataExchange() { + return allocateMemoryForDataExchange; + } + + public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) { + this.allocateMemoryForDataExchange = allocateMemoryForDataExchange; + } + public boolean isLastCacheEnabled() { return lastCacheEnable; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java index d0eff60394..5c6f3c5659 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java @@ -30,11 +30,9 @@ public class LocalMemoryManager { private final MemoryPool queryPool; 
public LocalMemoryManager() { - queryPool = - new MemoryPool( - "query", - IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(), - (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5)); + long totalMemory = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(); + int maxQueryThread = IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(); + queryPool = new MemoryPool("query", totalMemory, totalMemory / maxQueryThread); } public MemoryPool getQueryPool() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java index dfa08e033f..7d8765eaa3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java @@ -52,4 +52,23 @@ public interface Operator extends AutoCloseable { * Is this operator completely finished processing and no more output TsBlock will be produced. */ boolean isFinished(); + + // TODO remove the default while completing all the operators + /** + * We should also consider the memory used by its children operator, so the calculation logic may + * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator, + * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....) 
+ * + * @return estimated max memory footprint that the Operator Tree(rooted from this operator) will + * use while doing its own query processing + */ + default long calculateMaxPeekMemory() { + return 0L; + } + + // TODO remove the default while completing all the operators + /** @return estimated max memory footprint for returned TsBlock when calling operator.next() */ + default long calculateMaxReturnSize() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java index da7ffe6ab1..94fa08fcd9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java @@ -91,4 +91,17 @@ public class FillOperator implements ProcessOperator { public boolean isFinished() { return child.isFinished(); } + + @Override + public long calculateMaxPeekMemory() { + // while doing constant and previous fill, we may need to copy the corresponding column if there + // exists null values + // so the max peek memory may be double + return 2 * child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java index ef3870fb17..4c5d608902 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java @@ -80,4 +80,14 @@ public class LimitOperator implements ProcessOperator { public boolean isFinished() { return remainingLimit == 0 || child.isFinished(); } + + @Override + public long calculateMaxPeekMemory() 
{ + return child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java index fcae23ce7c..ee69557074 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java @@ -160,6 +160,20 @@ public class LinearFillOperator implements ProcessOperator { return cachedTsBlock.isEmpty() && child.isFinished(); } + @Override + public long calculateMaxPeekMemory() { + // while doing linear fill, we may need to copy the corresponding column if there exists null + // values, and we may also need to cache next TsBlock to get next not null value + // so the max peek memory may be triple or more, here we just use 3 as the estimated factor + // because in most cases, we will get next not null value in next TsBlock + return 3 * child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } + /** * Judge whether we can use current cached TsBlock to fill Column * diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java index 2820992745..a329b25346 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java @@ -79,4 +79,14 @@ public class OffsetOperator implements ProcessOperator { public boolean isFinished() { return child.isFinished(); } + + @Override + public long calculateMaxPeekMemory() { + return 
child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java index 38bda24bf7..a7cd8d046a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java @@ -54,4 +54,14 @@ public class SortOperator implements ProcessOperator { public boolean isFinished() { return false; } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java index f026ef35c4..386f840d6c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; @@ -244,6 +245,24 @@ public class RowBasedTimeJoinOperator implements ProcessOperator { return finished; } + @Override + 
public long calculateMaxPeekMemory() { + long maxPeekMemory = calculateMaxReturnSize(); + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + maxPeekMemory += child.calculateMaxReturnSize(); + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + return Math.max(maxPeekMemory, childrenMaxPeekMemory); + } + + @Override + public long calculateMaxReturnSize() { + // time + all value columns + return (1L + outputColumnCount) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + } + private void updateTimeSelector(int index) { timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index])); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java index eb185e09ff..bd5e005acd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; @@ -243,6 +244,24 @@ public class TimeJoinOperator implements ProcessOperator { return finished; } + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = calculateMaxReturnSize(); + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + maxPeekMemory += 
child.calculateMaxReturnSize(); + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + return Math.max(maxPeekMemory, childrenMaxPeekMemory); + } + + @Override + public long calculateMaxReturnSize() { + // time + all value columns + return (1L + outputColumnCount) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + } + /** * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else * return false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java index 7f011c32cf..f679bd1769 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java @@ -85,4 +85,22 @@ public class LastQueryCollectOperator implements ProcessOperator { public boolean isFinished() { return !hasNext(); } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = 0; + for (Operator child : children) { + maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory()); + } + return maxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + long maxReturnMemory = 0; + for (Operator child : children) { + maxReturnMemory = Math.max(maxReturnMemory, child.calculateMaxReturnSize()); + } + return maxReturnMemory; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java index 99deebfd59..9d8be1e23d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java +++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java @@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.utils.Binary; import com.google.common.util.concurrent.ListenableFuture; +import org.openjdk.jol.info.ClassLayout; import java.util.ArrayList; import java.util.Comparator; @@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU // time-series public class LastQueryMergeOperator implements ProcessOperator { + private static final int MAP_NODE_RETRAINED_SIZE = 16; + private final OperatorContext operatorContext; private final List<Operator> children; @@ -219,6 +223,30 @@ public class LastQueryMergeOperator implements ProcessOperator { return finished; } + @Override + public long calculateMaxPeekMemory() { + // result size + cached TreeMap size + long maxPeekMemory = + calculateMaxReturnSize() + + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber() + * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE); + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + maxPeekMemory += child.calculateMaxReturnSize(); + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + return Math.max(maxPeekMemory, childrenMaxPeekMemory); + } + + @Override + public long calculateMaxReturnSize() { + long maxReturnSize = 0; + for (Operator child : children) { + maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize()); + } + return maxReturnSize; + } + /** * If the 
tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else * return false; @@ -241,6 +269,8 @@ public class LastQueryMergeOperator implements ProcessOperator { } private static class Location { + + private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize(); int tsBlockIndex; int rowIndex; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java index 57d391d5df..40e434ef54 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.successfulAsList; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // collect all last query result in the same data region and there is no order guarantee public class LastQueryOperator implements ProcessOperator { @@ -140,4 +141,19 @@ public class LastQueryOperator implements ProcessOperator { private int getEndIndex() { return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex); } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = + Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes()); + for (Operator child : children) { + maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory()); + } + return maxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes()); + } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java index 2945856933..1b56d844c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.successfulAsList; import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // collect all last query result in the same data region and sort them according to the // time-series's alphabetical order @@ -190,6 +191,22 @@ public class LastQuerySortOperator implements ProcessOperator { return !hasNext(); } + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlockBuilder.getRetainedSizeInBytes(); + long maxChildrenReturnSize = 0; + for (Operator child : children) { + maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize()); + } + return maxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + private int getEndIndex() { return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java index 93a5d81cc3..afd543b8dc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java +++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java @@ -131,4 +131,14 @@ public class UpdateLastCacheOperator implements ProcessOperator { public void close() throws Exception { child.close(); } + + @Override + public long calculateMaxPeekMemory() { + return child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java index c47ab9f95d..5b4f07cf24 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { private boolean hasCachedTsBlock = false; private boolean finished = false; + private final long maxReturnSize; + public AlignedSeriesScanOperator( PlanNodeId sourceId, AlignedPath seriesPath, @@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { timeFilter, valueFilter, ascending); + // time + all value columns + this.maxReturnSize = + (1L + seriesPath.getMeasurementList().size()) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); } @Override @@ -65,7 +72,9 @@ public class 
AlignedSeriesScanOperator implements DataSourceOperator { public TsBlock next() { if (hasCachedTsBlock || hasNext()) { hasCachedTsBlock = false; - return tsBlock; + TsBlock res = tsBlock; + tsBlock = null; + return res; } throw new IllegalStateException("no next batch"); } @@ -114,6 +123,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { return finished || (finished = !hasNext()); } + @Override + public long calculateMaxPeekMemory() { + return maxReturnSize; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + private boolean readChunkData() throws IOException { while (seriesScanUtil.hasNextChunk()) { if (readPageData()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index c72063caeb..d9202de3f2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public class ExchangeOperator implements SourceOperator { private final OperatorContext operatorContext; @@ -62,6 +64,16 @@ public class ExchangeOperator implements SourceOperator { return sourceHandle.isFinished(); } + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + @Override public PlanNodeId getSourceId() { return sourceId; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java index dfb6d82c5c..6e5ff45cfa 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java @@ -57,6 +57,16 @@ public class LastCacheScanOperator implements SourceOperator { return !hasNext(); } + @Override + public long calculateMaxPeekMemory() { + return tsBlock.getRetainedSizeInBytes(); + } + + @Override + public long calculateMaxReturnSize() { + return tsBlock.getRetainedSizeInBytes(); + } + @Override public PlanNodeId getSourceId() { return sourceId; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java index f74d14a2f6..5a58594940 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator { private boolean hasCachedTsBlock = false; private boolean finished = false; + private final long maxReturnSize; + public SeriesScanOperator( PlanNodeId sourceId, PartialPath seriesPath, @@ -58,6 +61,7 @@ public class SeriesScanOperator implements 
DataSourceOperator { timeFilter, valueFilter, ascending); + this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); } @Override @@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator { public TsBlock next() { if (hasCachedTsBlock || hasNext()) { hasCachedTsBlock = false; - return tsBlock; + TsBlock res = tsBlock; + tsBlock = null; + return res; } throw new IllegalStateException("no next batch"); } @@ -118,6 +124,16 @@ public class SeriesScanOperator implements DataSourceOperator { return finished || (finished = !hasNext()); } + @Override + public long calculateMaxPeekMemory() { + return maxReturnSize; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + private boolean readChunkData() throws IOException { while (seriesScanUtil.hasNextChunk()) { if (readPageData()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index af6b72e47a..22fd2cbabc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -618,7 +618,9 @@ public class SeriesScanUtil { if (hasCachedNextOverlappedPage) { hasCachedNextOverlappedPage = false; - return cachedTsBlock; + TsBlock res = cachedTsBlock; + cachedTsBlock = null; + return res; } else { /*
