This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aba13789191747901c4e9b011d4ec5c0f374f857
Author: zyk990424 <[email protected]>
AuthorDate: Wed Aug 10 18:04:36 2022 +0800

    implement memory control for schema operators
---
 .../operator/schema/CountMergeOperator.java        | 20 +++++++++++++++++
 .../operator/schema/DevicesCountOperator.java      | 12 ++++++++++
 .../schema/LevelTimeSeriesCountOperator.java       | 12 ++++++++++
 .../schema/NodeManageMemoryMergeOperator.java      | 13 +++++++++++
 .../operator/schema/NodePathsConvertOperator.java  | 10 +++++++++
 .../operator/schema/NodePathsCountOperator.java    | 13 +++++++++++
 .../schema/NodePathsSchemaScanOperator.java        | 12 ++++++++++
 .../operator/schema/SchemaFetchMergeOperator.java  | 20 +++++++++++++++++
 .../operator/schema/SchemaFetchScanOperator.java   | 12 ++++++++++
 .../operator/schema/SchemaQueryMergeOperator.java  | 20 +++++++++++++++++
 .../schema/SchemaQueryOrderByHeatOperator.java     | 26 ++++++++++++++++++++++
 .../operator/schema/SchemaQueryScanOperator.java   | 12 ++++++++++
 .../operator/schema/TimeSeriesCountOperator.java   | 12 ++++++++++
 13 files changed, 194 insertions(+)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index a9176d658c..f10c28a813 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -133,4 +133,24 @@ public class CountMergeOperator implements ProcessOperator 
{
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, 
child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, 
child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index 28c6cd268e..87748916e4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -87,4 +87,16 @@ public class DevicesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // the integer used for count
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // the integer used for count
+    return 4L;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 9389035065..ff11550d5b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.util.Map;
 
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class LevelTimeSeriesCountOperator implements SourceOperator {
   private final PlanNodeId sourceId;
   private final OperatorContext operatorContext;
@@ -115,4 +117,14 @@ public class LevelTimeSeriesCountOperator implements 
SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index f97f299279..4363735bb8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -37,6 +37,7 @@ import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodeManageMemoryMergeOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
@@ -130,4 +131,16 @@ public class NodeManageMemoryMergeOperator implements 
ProcessOperator {
   public boolean isFinished() {
     return !isReadingMemory && child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // TODO: calculate the result based on all the scan nodes; currently, this is shadowed by
+    // schemaQueryMergeNode
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, 
child.calculateMaxReturnSize());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 2af62ce754..9be2d3aa72 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -97,4 +97,14 @@ public class NodePathsConvertOperator implements 
ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxReturnSize() + child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 98df95be4a..9e78ef28ed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodePathsCountOperator implements ProcessOperator {
 
@@ -98,4 +99,16 @@ public class NodePathsCountOperator implements 
ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // TODO: calculate the result based on all the scan nodes; currently, this is shadowed by
+    // schemaQueryMergeNode
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, 
child.calculateMaxReturnSize());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 2db64f704c..54a2d02755 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class NodePathsSchemaScanOperator implements SourceOperator {
   private final PlanNodeId sourceId;
 
@@ -120,4 +122,14 @@ public class NodePathsSchemaScanOperator implements 
SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 92b023eabb..74c89789e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -122,4 +122,24 @@ public class SchemaFetchMergeOperator implements 
ProcessOperator {
         new BinaryColumn(
             1, Optional.empty(), new Binary[] {new 
Binary(outputStream.toByteArray())}));
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, 
child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, 
child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 3a22065879..da10e01a2e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -44,6 +44,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class SchemaFetchScanOperator implements SourceOperator {
 
   private static final Logger logger = 
LoggerFactory.getLogger(SchemaFetchScanOperator.class);
@@ -128,4 +130,14 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
             new BinaryColumn(
                 1, Optional.empty(), new Binary[] {new 
Binary(outputStream.toByteArray())}));
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 519cdce27e..aedf1ab0c6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -80,4 +80,24 @@ public class SchemaQueryMergeOperator implements 
ProcessOperator {
       child.close();
     }
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, 
child.calculateMaxPeekMemory());
+    }
+
+    return childrenMaxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long childrenMaxReturnSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, 
child.calculateMaxReturnSize());
+    }
+
+    return childrenMaxReturnSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 5b65e10cc8..155bcdb640 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -177,4 +177,30 @@ public class SchemaQueryOrderByHeatOperator implements 
ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+
+    for (Operator child : operators) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+    }
+
+    for (Operator child : operators) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+
+    for (Operator child : operators) {
+      maxReturnSize += child.calculateMaxReturnSize();
+    }
+
+    return maxReturnSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index b318c5e7db..bcca707c3e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,6 +24,8 @@ import 
org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   protected OperatorContext operatorContext;
@@ -109,4 +111,14 @@ public abstract class SchemaQueryScanOperator implements 
SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index 0fac3e052a..377071ceda 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -103,4 +103,16 @@ public class TimeSeriesCountOperator implements 
SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // the integer used for count
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // the integer used for count
+    return 4L;
+  }
 }

Reply via email to