This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch MemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aba13789191747901c4e9b011d4ec5c0f374f857 Author: zyk990424 <[email protected]> AuthorDate: Wed Aug 10 18:04:36 2022 +0800 implement memory control for schema operators --- .../operator/schema/CountMergeOperator.java | 20 +++++++++++++++++ .../operator/schema/DevicesCountOperator.java | 12 ++++++++++ .../schema/LevelTimeSeriesCountOperator.java | 12 ++++++++++ .../schema/NodeManageMemoryMergeOperator.java | 13 +++++++++++ .../operator/schema/NodePathsConvertOperator.java | 10 +++++++++ .../operator/schema/NodePathsCountOperator.java | 13 +++++++++++ .../schema/NodePathsSchemaScanOperator.java | 12 ++++++++++ .../operator/schema/SchemaFetchMergeOperator.java | 20 +++++++++++++++++ .../operator/schema/SchemaFetchScanOperator.java | 12 ++++++++++ .../operator/schema/SchemaQueryMergeOperator.java | 20 +++++++++++++++++ .../schema/SchemaQueryOrderByHeatOperator.java | 26 ++++++++++++++++++++++ .../operator/schema/SchemaQueryScanOperator.java | 12 ++++++++++ .../operator/schema/TimeSeriesCountOperator.java | 12 ++++++++++ 13 files changed, 194 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java index a9176d658c..f10c28a813 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java @@ -133,4 +133,24 @@ public class CountMergeOperator implements ProcessOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + + return childrenMaxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + long childrenMaxReturnSize = 0; + for (Operator child : children) { + childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize()); + } + + return childrenMaxReturnSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java index 28c6cd268e..87748916e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java @@ -87,4 +87,16 @@ public class DevicesCountOperator implements SourceOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + // the integer used for count + return 4L; + } + + @Override + public long calculateMaxReturnSize() { + // the integer used for count + return 4L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java index 9389035065..ff11550d5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java @@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.utils.Binary; import java.util.Map; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public class LevelTimeSeriesCountOperator implements SourceOperator { private final PlanNodeId sourceId; private final OperatorContext operatorContext; @@ -115,4 +117,14 @@ public class LevelTimeSeriesCountOperator implements SourceOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java index f97f299279..4363735bb8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java @@ -37,6 +37,7 @@ import java.util.TreeSet; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class NodeManageMemoryMergeOperator implements ProcessOperator { private final OperatorContext operatorContext; @@ -130,4 +131,16 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator { public boolean isFinished() { return !isReadingMemory && child.isFinished(); } + + @Override + public long calculateMaxPeekMemory() { + // todo calculate the result based on all the scan node; currently, this is shadowed by + // schemaQueryMergeNode + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java index 2af62ce754..9be2d3aa72 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java @@ -97,4 +97,14 @@ public class NodePathsConvertOperator implements ProcessOperator { public boolean isFinished() { return child.isFinished(); } + + @Override + public long calculateMaxPeekMemory() { + return child.calculateMaxReturnSize() + child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java index 98df95be4a..9e78ef28ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.Set; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class NodePathsCountOperator implements ProcessOperator { @@ -98,4 +99,16 @@ public class NodePathsCountOperator implements ProcessOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + // todo calculate the result based on all the scan node; currently, this is shadowed by + // schemaQueryMergeNode + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java index 2db64f704c..54a2d02755 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java @@ -35,6 +35,8 @@ import org.apache.iotdb.tsfile.utils.Binary; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public class NodePathsSchemaScanOperator implements SourceOperator { private final PlanNodeId sourceId; @@ -120,4 +122,14 @@ public class NodePathsSchemaScanOperator implements SourceOperator { public PlanNodeId getSourceId() { return sourceId; } + + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java index 92b023eabb..74c89789e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java @@ -122,4 +122,24 @@ public class SchemaFetchMergeOperator implements ProcessOperator { new BinaryColumn( 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } + + @Override + public long calculateMaxPeekMemory() { + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + + return childrenMaxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + long childrenMaxReturnSize = 0; + for (Operator child : children) { + childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize()); + } + + return childrenMaxReturnSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java index 3a22065879..da10e01a2e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java @@ -44,6 +44,8 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public class SchemaFetchScanOperator implements SourceOperator { private static final Logger logger = LoggerFactory.getLogger(SchemaFetchScanOperator.class); @@ -128,4 +130,14 @@ public class SchemaFetchScanOperator implements SourceOperator { new BinaryColumn( 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } + + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java index 519cdce27e..aedf1ab0c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java @@ -80,4 +80,24 @@ public class SchemaQueryMergeOperator implements ProcessOperator { child.close(); } } + + @Override + public long calculateMaxPeekMemory() { + long childrenMaxPeekMemory = 0; + for (Operator child : children) { + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + } + + return childrenMaxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + long childrenMaxReturnSize = 0; + for (Operator child : children) { + childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize()); + } + + return childrenMaxReturnSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java index 5b65e10cc8..155bcdb640 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java @@ -177,4 +177,30 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = 0; + + for (Operator child : operators) { + maxPeekMemory += child.calculateMaxReturnSize(); + } + + for (Operator child : operators) { + maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory()); + } + + return maxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + long maxReturnSize = 0; + + for (Operator child : operators) { + maxReturnSize += child.calculateMaxReturnSize(); + } + + return maxReturnSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java index b318c5e7db..bcca707c3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + public abstract class SchemaQueryScanOperator implements SourceOperator { protected OperatorContext operatorContext; @@ -109,4 +111,14 @@ public abstract class SchemaQueryScanOperator implements SourceOperator { public PlanNodeId getSourceId() { return sourceId; } + + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java index 0fac3e052a..377071ceda 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java @@ -103,4 +103,16 @@ public class TimeSeriesCountOperator implements SourceOperator { public boolean isFinished() { return isFinished; } + + @Override + public long calculateMaxPeekMemory() { + // the integer used for count + return 4L; + } + + @Override + public long calculateMaxReturnSize() { + // the integer used for count + return 4L; + } }
