This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11b7a5f041ea3eafc19d21369b7dc9aac88b8212
Author: zyk990424 <[email protected]>
AuthorDate: Fri Aug 12 11:34:57 2022 +0800

    add retained size calculation
---
 .../operator/schema/CountMergeOperator.java          |  9 +++++++++
 .../operator/schema/DevicesCountOperator.java        |  5 +++++
 .../schema/LevelTimeSeriesCountOperator.java         |  5 +++++
 .../schema/NodeManageMemoryMergeOperator.java        |  7 ++++++-
 .../operator/schema/NodePathsConvertOperator.java    |  5 +++++
 .../operator/schema/NodePathsCountOperator.java      |  7 ++++++-
 .../operator/schema/NodePathsSchemaScanOperator.java |  5 +++++
 .../operator/schema/SchemaFetchMergeOperator.java    |  9 +++++++++
 .../operator/schema/SchemaFetchScanOperator.java     | 20 +++++++++++---------
 .../operator/schema/SchemaQueryMergeOperator.java    |  9 +++++++++
 .../schema/SchemaQueryOrderByHeatOperator.java       | 14 ++++++++++++++
 .../operator/schema/SchemaQueryScanOperator.java     |  9 ++++++++-
 .../operator/schema/TimeSeriesCountOperator.java     |  5 +++++
 13 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index f10c28a813..7524b7c455 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -153,4 +153,13 @@ public class CountMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index 87748916e4..7a22a1573e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -99,4 +99,9 @@ public class DevicesCountOperator implements SourceOperator {
     // the integer used for count
     return 4L;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index ff11550d5b..4cc64cff07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -127,4 +127,9 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 4363735bb8..b7688b023a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -136,11 +136,16 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
   public long calculateMaxReturnSize() {
     return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 9be2d3aa72..f240186cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -107,4 +107,9 @@ public class NodePathsConvertOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return child.calculateMaxReturnSize();
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 9e78ef28ed..37b05c4e58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -104,11 +104,16 @@ public class NodePathsCountOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this is shadowed by
     // schemaQueryMergeNode
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
   public long calculateMaxReturnSize() {
     return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 54a2d02755..598ed0c953 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -132,4 +132,9 @@ public class NodePathsSchemaScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 74c89789e9..96ff8cbc11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -142,4 +142,13 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index da10e01a2e..4e07c4178c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -57,7 +57,6 @@ public class SchemaFetchScanOperator implements SourceOperator {
 
   private final ISchemaRegion schemaRegion;
 
-  private TsBlock tsBlock;
   private boolean isFinished = false;
 
   public SchemaFetchScanOperator(
@@ -85,12 +84,11 @@ public class SchemaFetchScanOperator implements SourceOperator {
     }
     isFinished = true;
     try {
-      fetchSchema();
+      return fetchSchema();
     } catch (MetadataException e) {
       logger.error("Error occurred during execute SchemaFetchOperator {}", sourceId, e);
       throw new RuntimeException(e);
     }
-    return tsBlock;
   }
 
   @Override
@@ -108,7 +106,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
     return sourceId;
   }
 
-  private void fetchSchema() throws MetadataException {
+  private TsBlock fetchSchema() throws MetadataException {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     List<PartialPath> partialPathList = patternTree.getAllPathPatterns();
     for (PartialPath path : partialPathList) {
@@ -124,11 +122,10 @@ public class SchemaFetchScanOperator implements SourceOperator {
     } catch (IOException e) {
       // Totally memory operation. This case won't happen.
     }
-    this.tsBlock =
-        new TsBlock(
-            new TimeColumn(1, new long[] {0}),
-            new BinaryColumn(
-                1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+    return new TsBlock(
+        new TimeColumn(1, new long[] {0}),
+        new BinaryColumn(
+            1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
   }
 
   @Override
@@ -140,4 +137,9 @@ public class SchemaFetchScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index aedf1ab0c6..f2671034e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -100,4 +100,13 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
 
     return childrenMaxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+    for (Operator child : children) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 155bcdb640..778ea8270d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -203,4 +203,18 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
 
     return maxReturnSize;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long retainedSize = 0L;
+
+    for (Operator child : operators) {
+      retainedSize += child.calculateMaxReturnSize();
+    }
+
+    for (Operator child : operators) {
+      retainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return retainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index bcca707c3e..e9ea46ab5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -88,7 +88,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   @Override
   public TsBlock next() {
     hasCachedTsBlock = false;
-    return tsBlock;
+    TsBlock result = tsBlock;
+    tsBlock = null;
+    return result;
   }
 
   @Override
@@ -121,4 +123,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   public long calculateMaxReturnSize() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index 377071ceda..165cfefa71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -115,4 +115,9 @@ public class TimeSeriesCountOperator implements SourceOperator {
     // the integer used for count
     return 4L;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }

Reply via email to