This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch feature/memory_auto
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca8687b48e068225fe9ae8956f7db2f0073aa93c
Author: spricoder <[email protected]>
AuthorDate: Fri Feb 21 15:37:02 2025 +0800

    add simple auto
---
 .../db/consensus/DataRegionConsensusImpl.java      |   2 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java |   7 +-
 .../queryengine/execution/memory/MemoryPool.java   |   2 +-
 .../analyze/cache/partition/PartitionCache.java    |   2 +-
 .../cache/schema/DataNodeDevicePathCache.java      |   2 +-
 .../plan/planner/LocalExecutionPlanner.java        |   2 +-
 .../fetcher/cache/TableDeviceSchemaCache.java      |   2 +-
 .../rescon/MemSchemaEngineStatistics.java          |   2 +-
 .../db/storageengine/buffer/BloomFilterCache.java  |   2 +-
 .../iotdb/db/storageengine/buffer/ChunkCache.java  |   4 +-
 .../buffer/TimeSeriesMetadataCache.java            |   2 +-
 .../rescon/memory/PrimitiveArrayManager.java       |   2 +-
 .../db/storageengine/rescon/memory/SystemInfo.java |   6 +-
 .../rescon/memory/TimePartitionManager.java        |   2 +-
 .../rescon/memory/TsFileResourceManager.java       |   2 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/memory/IMemoryBlock.java  |   4 +
 .../iotdb/commons/memory/MemoryBlockType.java      |   8 +-
 .../apache/iotdb/commons/memory/MemoryManager.java |  83 ++++++++++++++-
 .../memory/MemoryPeriodicalJobExecutor.java        | 111 +++++++++++++++++++++
 .../iotdb/commons/memory/MemoryManagerTest.java    |  20 ++--
 21 files changed, 230 insertions(+), 38 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 84466cc7598..5c7f5ad7242 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -113,7 +113,7 @@ public class DataRegionConsensusImpl {
 
     private static ConsensusConfig buildConsensusConfig() {
       IMemoryBlock memoryBlock =
-          CONF.getConsensusMemoryManager().forceAllocate("Consensus", 
MemoryBlockType.FUNCTION);
+          CONF.getConsensusMemoryManager().forceAllocate("Consensus", 
MemoryBlockType.DYNAMIC);
       return ConsensusConfig.newBuilder()
           .setThisNodeId(CONF.getDataNodeId())
           .setThisNode(new TEndPoint(CONF.getInternalAddress(), 
CONF.getDataRegionConsensusPort()))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index f60606cc83b..1e441a7e8d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -56,7 +56,7 @@ public class PipeMemoryManager {
       IoTDBDescriptor.getInstance()
           .getConfig()
           .getPipeMemoryManager()
-          .forceAllocate("Stream", MemoryBlockType.FUNCTION);
+          .forceAllocate("Stream", MemoryBlockType.DYNAMIC);
 
   private static final double EXCEED_PROTECT_THRESHOLD = 0.95;
 
@@ -388,8 +388,9 @@ public class PipeMemoryManager {
       return new PipeMemoryBlock(sizeInBytes);
     }
 
-    if (sizeInBytes == 0 || memoryBlock.getTotalMemorySizeInBytes() - 
memoryBlock.getUsedMemoryInBytes()
-        >= sizeInBytes) {
+    if (sizeInBytes == 0
+        || memoryBlock.getTotalMemorySizeInBytes() - 
memoryBlock.getUsedMemoryInBytes()
+            >= sizeInBytes) {
       return registerMemoryBlock(sizeInBytes);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 9323519e664..1ea742dbbf7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -127,7 +127,7 @@ public class MemoryPool {
   public MemoryPool(String id, MemoryManager memoryManager, long 
maxBytesPerFragmentInstance) {
     this.id = Validate.notNull(id, "id can not be null.");
     this.memoryBlock =
-        memoryManager.forceAllocate(memoryManager.getName(), 
MemoryBlockType.FUNCTION);
+        memoryManager.forceAllocate(memoryManager.getName(), 
MemoryBlockType.DYNAMIC);
     Validate.isTrue(
         this.memoryBlock.getTotalMemorySizeInBytes() > 0L,
         "max bytes should be greater than zero: %d",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 88acac5e701..143d92e62fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -121,7 +121,7 @@ public class PartitionCache {
     this.memoryBlock =
         config
             .getPartitionCacheMemoryManager()
-            .forceAllocate("PartitionCache", MemoryBlockType.FUNCTION);
+            .forceAllocate("PartitionCache", MemoryBlockType.STATIC);
     this.memoryBlock.allocate(this.memoryBlock.getTotalMemorySizeInBytes());
     // TODO @spricoder: PartitionCache need to be controlled according to 
memory
     this.schemaPartitionCache =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
index 269dc34ad88..fca17a11430 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
@@ -45,7 +45,7 @@ public class DataNodeDevicePathCache {
     devicePathCacheMemoryBlock =
         config
             .getDevicePathCacheMemoryManager()
-            .forceAllocate("DevicePathCache", MemoryBlockType.PERFORMANCE);
+            .forceAllocate("DevicePathCache", MemoryBlockType.STATIC);
     // TODO @spricoder: later we can find a way to get the byte size of cache
     
devicePathCacheMemoryBlock.allocate(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes());
     devicePathCache =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index b3dd13b40ad..93f45bebb5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -69,7 +69,7 @@ public class LocalExecutionPlanner {
     IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
     OPERATORS_MEMORY_BLOCK =
-        CONFIG.getOperatorsMemoryManager().forceAllocate("Operators", 
MemoryBlockType.FUNCTION);
+        CONFIG.getOperatorsMemoryManager().forceAllocate("Operators", 
MemoryBlockType.DYNAMIC);
     MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD =
         (long)
             ((OPERATORS_MEMORY_BLOCK.getTotalMemorySizeInBytes())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index 9004c43ef5d..604208b1edc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -108,7 +108,7 @@ public class TableDeviceSchemaCache {
     memoryBlock =
         config
             .getSchemaCacheMemoryManager()
-            .forceAllocate("TableDeviceSchemaCache", MemoryBlockType.FUNCTION);
+            .forceAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC);
     dualKeyCache =
         new DualKeyCacheBuilder<TableId, IDeviceID, TableDeviceCacheEntry>()
             .cacheEvictionPolicy(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
index f34e8a6f898..51d6ad337a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
@@ -59,7 +59,7 @@ public class MemSchemaEngineStatistics implements 
ISchemaEngineStatistics {
         IoTDBDescriptor.getInstance()
             .getConfig()
             .getSchemaRegionMemoryManager()
-            .forceAllocate("SchemaRegion", MemoryBlockType.FUNCTION);
+            .forceAllocate("SchemaRegion", MemoryBlockType.DYNAMIC);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
index 5a396b85a4c..be2a48c19aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
@@ -60,7 +60,7 @@ public class BloomFilterCache {
     CACHE_MEMORY_BLOCK =
         CONFIG
             .getBloomFilterCacheMemoryManager()
-            .forceAllocate("BloomFilterCache", MemoryBlockType.PERFORMANCE);
+            .forceAllocate("BloomFilterCache", MemoryBlockType.STATIC);
     // TODO @spricoder: find a way to get the size of the BloomFilterCache
     
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
index dc0ada9f8ea..d964a635b8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
@@ -73,9 +73,7 @@ public class ChunkCache {
 
   static {
     CACHE_MEMORY_BLOCK =
-        CONFIG
-            .getChunkCacheMemoryManager()
-            .forceAllocate("ChunkCache", MemoryBlockType.PERFORMANCE);
+        CONFIG.getChunkCacheMemoryManager().forceAllocate("ChunkCache", 
MemoryBlockType.STATIC);
     // TODO @spricoder: find a way to get the size of the ChunkCache
     
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index 2b02bd300c6..3ba8e8a4af3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -87,7 +87,7 @@ public class TimeSeriesMetadataCache {
     CACHE_MEMORY_BLOCK =
         config
             .getTimeSeriesMetaDataCacheMemoryManager()
-            .forceAllocate("TimeSeriesMetadataCache", 
MemoryBlockType.PERFORMANCE);
+            .forceAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC);
     // TODO @spricoder find a better way to get the size of cache
     
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 9e198a1f601..1b9b523dd12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -55,7 +55,7 @@ public class PrimitiveArrayManager {
   private static final IMemoryBlock POOLED_ARRAYS_MEMORY_BLOCK =
       CONFIG
           .getBufferedArraysMemoryManager()
-          .forceAllocate("BufferedArrays", MemoryBlockType.FUNCTION);
+          .forceAllocate("BufferedArrays", MemoryBlockType.DYNAMIC);
 
   /** threshold total size of arrays for all data types */
   private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index a079e01bd9a..7819e9b5a53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -78,13 +78,13 @@ public class SystemInfo {
 
   private SystemInfo() {
     compactionMemoryBlock =
-        config.getCompactionMemoryManager().forceAllocate("Compaction", 
MemoryBlockType.FUNCTION);
+        config.getCompactionMemoryManager().forceAllocate("Compaction", 
MemoryBlockType.DYNAMIC);
     walBufferQueueMemoryBlock =
-        config.getWalBufferQueueManager().forceAllocate("WalBufferQueue", 
MemoryBlockType.FUNCTION);
+        config.getWalBufferQueueManager().forceAllocate("WalBufferQueue", 
MemoryBlockType.DYNAMIC);
     directBufferMemoryBlock =
         config
             .getDirectBufferMemoryManager()
-            .forceAllocate("DirectBuffer", MemoryBlockType.FUNCTION);
+            .forceAllocate("DirectBuffer", MemoryBlockType.DYNAMIC);
     loadWriteMemory();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 65194d0fbe8..85e96683413 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -51,7 +51,7 @@ public class TimePartitionManager {
         IoTDBDescriptor.getInstance()
             .getConfig()
             .getTimePartitionInfoMemoryManager()
-            .forceAllocate("TimePartitionInfoMemoryBlock", 
MemoryBlockType.FUNCTION);
+            .forceAllocate("TimePartitionInfoMemoryBlock", 
MemoryBlockType.DYNAMIC);
   }
 
   public void registerTimePartitionInfo(TimePartitionInfo timePartitionInfo) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
index 62bb52a45c6..2cecc1ed3a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
@@ -49,7 +49,7 @@ public class TsFileResourceManager {
 
   private TsFileResourceManager() {
     memoryBlock =
-        CONFIG.getTimeIndexMemoryManager().forceAllocate("TimeIndex", 
MemoryBlockType.FUNCTION);
+        CONFIG.getTimeIndexMemoryManager().forceAllocate("TimeIndex", 
MemoryBlockType.DYNAMIC);
   }
 
   @TestOnly
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index ec4a1a7c2cd..3965622078c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -149,6 +149,7 @@ public enum ThreadName {
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
+  MEMORY_PERIODICAL_JOB_EXECUTOR("Memory-Periodical-Job-Executor"),
   // -------------------------- JVM --------------------------
   // NOTICE: The thread name of jvm cannot be edited here!
   // We list the thread name here just for distinguishing what module the 
thread belongs to.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
index 8e6437460e1..829b27292b3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
@@ -106,6 +106,10 @@ public abstract class IMemoryBlock implements 
AutoCloseable {
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
   }
 
+  public void resizeByRatio(double ratio) {
+    totalMemorySizeInBytes = (long) (totalMemorySizeInBytes * ratio);
+  }
+
   /** Get the maximum memory size in byte of this memory block */
   public long getTotalMemorySizeInBytes() {
     return totalMemorySizeInBytes;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java
index 50d22eee50d..6a20600713e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.commons.memory;
 
 public enum MemoryBlockType {
   NONE,
-  // function related memory
-  FUNCTION,
-  // performance related memory
-  PERFORMANCE,
+  // static memory
+  STATIC,
+  // dynamic memory
+  DYNAMIC,
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
index 58142d9f490..e591d618fbe 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.memory;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
 
 import org.slf4j.Logger;
@@ -46,6 +48,9 @@ public class MemoryManager {
   /** Whether memory management is enabled */
   private final boolean enable;
 
+  /** The total allocate memory size in byte of memory manager */
+  private final long allocateTotalMemorySizeInBytes;
+
   /** The total memory size in byte of memory manager */
   private long totalMemorySizeInBytes;
 
@@ -65,6 +70,7 @@ public class MemoryManager {
   public MemoryManager(long totalMemorySizeInBytes) {
     this.name = "Test";
     this.parentMemoryManager = null;
+    this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enable = false;
   }
@@ -73,6 +79,7 @@ public class MemoryManager {
       String name, MemoryManager parentMemoryManager, long 
totalMemorySizeInBytes) {
     this.name = name;
     this.parentMemoryManager = parentMemoryManager;
+    this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enable = false;
   }
@@ -81,6 +88,7 @@ public class MemoryManager {
       String name, MemoryManager parentMemoryManager, long 
totalMemorySizeInBytes, boolean enable) {
     this.name = name;
     this.parentMemoryManager = parentMemoryManager;
+    this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enable = enable;
   }
@@ -446,6 +454,10 @@ public class MemoryManager {
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
   }
 
+  public void expandTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
+    this.totalMemorySizeInBytes += totalMemorySizeInBytes;
+  }
+
   public void setTotalMemorySizeInBytesWithReload(long totalMemorySizeInBytes) 
{
     reAllocateMemoryAccordingToRatio((double) totalMemorySizeInBytes / 
this.totalMemorySizeInBytes);
   }
@@ -460,6 +472,11 @@ public class MemoryManager {
     return allocatedMemorySizeInBytes;
   }
 
+  /** Get used memory ratio */
+  public double getUsedMemoryRatio() {
+    return (double) getUsedMemorySizeInBytes() / totalMemorySizeInBytes;
+  }
+
   /** Get actual used memory size in bytes of memory manager */
   public long getUsedMemorySizeInBytes() {
     long memorySize =
@@ -482,8 +499,16 @@ public class MemoryManager {
 
     private static final MemoryManager GLOBAL =
         new MemoryManager("GlobalMemoryManager", null, 
Runtime.getRuntime().totalMemory());
-
-    private MemoryManagerHolder() {}
+    private static final MemoryPeriodicalJobExecutor EXECUTOR =
+        new MemoryPeriodicalJobExecutor(
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+                ThreadName.MEMORY_PERIODICAL_JOB_EXECUTOR.getName()),
+            20);
+
+    private MemoryManagerHolder() {
+      EXECUTOR.register(
+          "GlobalMemoryManager#updateAllocate()", 
MemoryManagerHolder.GLOBAL::updateAllocate, 60);
+    }
   }
 
   // endregion
@@ -514,10 +539,62 @@ public class MemoryManager {
     sb.append(this);
     LOGGER.info(sb.toString());
     for (IMemoryBlock block : allocatedMemoryBlocks.values()) {
-      block.print(index + 1);
+      block.print(index + 2);
     }
     for (MemoryManager child : children.values()) {
       child.print(index + 1);
     }
   }
+
+  /** Whether is able to shrink */
+  public synchronized long shrink() {
+    long shrinkSize =
+        Math.min(
+            getAvailableMemorySizeInBytes() / 10,
+            totalMemorySizeInBytes - allocatedMemorySizeInBytes * 9 / 10);
+    totalMemorySizeInBytes -= shrinkSize;
+    return shrinkSize;
+  }
+
+  /** Whether is available to shrink */
+  public boolean isAvailableToShrink() {
+    return allocateTotalMemorySizeInBytes - totalMemorySizeInBytes
+        < allocateTotalMemorySizeInBytes / 10;
+  }
+
+  public void updateAllocate() {
+    if (children.isEmpty()) {
+      double ratio = (double) totalMemorySizeInBytes / 
allocateTotalMemorySizeInBytes;
+      for (IMemoryBlock memoryBlock : allocatedMemoryBlocks.values()) {
+        memoryBlock.resizeByRatio(ratio);
+      }
+    } else {
+      MemoryManager higherMemoryManager = null;
+      MemoryManager lowerMemoryManager = null;
+      // search the highest and lowest memory manager
+      for (MemoryManager child : children.values()) {
+        if (higherMemoryManager == null) {
+          higherMemoryManager = child;
+          lowerMemoryManager = child;
+        } else {
+          if (child.getUsedMemorySizeInBytes() > 
higherMemoryManager.getUsedMemorySizeInBytes()) {
+            higherMemoryManager = child;
+          }
+          if (lowerMemoryManager.isAvailableToShrink()
+              && child.getUsedMemorySizeInBytes() < 
lowerMemoryManager.getUsedMemorySizeInBytes()) {
+            lowerMemoryManager = child;
+          }
+        }
+      }
+      if (higherMemoryManager != null && 
!higherMemoryManager.equals(lowerMemoryManager)) {
+        // transfer
+        long transferSize = lowerMemoryManager.shrink();
+        higherMemoryManager.expandTotalMemorySizeInBytes(transferSize);
+        LOGGER.info("Transfer Memory Size from {} to {}", higherMemoryManager, 
lowerMemoryManager);
+      }
+      for (MemoryManager memoryManager : children.values()) {
+        memoryManager.updateAllocate();
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
new file mode 100644
index 00000000000..59662dc5a29
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.memory;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MemoryPeriodicalJobExecutor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryPeriodicalJobExecutor.class);
+
+  private final ScheduledExecutorService executorService;
+  private final long minIntervalSeconds;
+
+  private long rounds;
+  private Future<?> executorFuture;
+
+  private final List<Pair<WrappedRunnable, Long>> periodicalJobs = new 
CopyOnWriteArrayList<>();
+
+  public MemoryPeriodicalJobExecutor(
+      final ScheduledExecutorService executorService, final long 
minIntervalSeconds) {
+    this.executorService = executorService;
+    this.minIntervalSeconds = minIntervalSeconds;
+  }
+
+  public void register(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    periodicalJobs.add(
+        new Pair<>(
+            new WrappedRunnable() {
+              @Override
+              public void runMayThrow() {
+                try {
+                  periodicalJob.run();
+                } catch (Exception e) {
+                  LOGGER.warn("Periodical job {} failed.", id, e);
+                }
+              }
+            },
+            Math.max(intervalInSeconds / minIntervalSeconds, 1)));
+    LOGGER.info(
+        "Memory periodical job {} is registered successfully. Interval: {} 
seconds.",
+        id,
+        Math.max(intervalInSeconds / minIntervalSeconds, 1) * 
minIntervalSeconds);
+  }
+
+  public synchronized void start() {
+    if (executorFuture == null) {
+      rounds = 0;
+
+      executorFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              executorService,
+              this::execute,
+              minIntervalSeconds,
+              minIntervalSeconds,
+              TimeUnit.SECONDS);
+      LOGGER.info("Memory periodical job executor is started successfully.");
+    }
+  }
+
+  protected void execute() {
+    ++rounds;
+
+    for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
+      if (rounds % periodicalJob.right == 0) {
+        periodicalJob.left.run();
+      }
+    }
+  }
+
+  public synchronized void stop() {
+    if (executorFuture != null) {
+      executorFuture.cancel(false);
+      executorFuture = null;
+      LOGGER.info("Memory periodical job executor is stopped successfully.");
+    }
+  }
+
+  @TestOnly
+  public void clear() {
+    periodicalJobs.clear();
+    LOGGER.info("All memory periodical jobs are cleared successfully.");
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
index 7630d86f911..fa777526120 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
@@ -49,22 +49,22 @@ public class MemoryManagerTest {
 
     // create memoryBlock1 in the size of 20 from globalMemoryManager
     IMemoryBlock memoryBlock1 =
-        GLOBAL_MEMORY_MANAGER.forceAllocate("Block1", 20, 
MemoryBlockType.FUNCTION);
+        GLOBAL_MEMORY_MANAGER.forceAllocate("Block1", 20, 
MemoryBlockType.STATIC);
     Assert.assertEquals(80, 
GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes());
     Assert.assertEquals(20, 
GLOBAL_MEMORY_MANAGER.getAllocatedMemorySizeInBytes());
     Assert.assertEquals("Block1", memoryBlock1.getName());
     Assert.assertEquals(20, memoryBlock1.getTotalMemorySizeInBytes());
-    Assert.assertEquals(MemoryBlockType.FUNCTION, 
memoryBlock1.getMemoryBlockType());
+    Assert.assertEquals(MemoryBlockType.STATIC, 
memoryBlock1.getMemoryBlockType());
 
     // create memoryBlock2 in the size of 10 from globalMemoryManager
     IMemoryBlock memoryBlock2 =
         GLOBAL_MEMORY_MANAGER.forceAllocateIfSufficient(
-            "Block2", 10, 0.9f, MemoryBlockType.PERFORMANCE);
+            "Block2", 10, 0.9f, MemoryBlockType.DYNAMIC);
     Assert.assertEquals(70, 
GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes());
     Assert.assertEquals(30, 
GLOBAL_MEMORY_MANAGER.getAllocatedMemorySizeInBytes());
     Assert.assertEquals("Block2", memoryBlock2.getName());
     Assert.assertEquals(10, memoryBlock2.getTotalMemorySizeInBytes());
-    Assert.assertEquals(MemoryBlockType.PERFORMANCE, 
memoryBlock2.getMemoryBlockType());
+    Assert.assertEquals(MemoryBlockType.DYNAMIC, 
memoryBlock2.getMemoryBlockType());
 
     // create subMemoryManager in the size of 50 from globalMemoryManager
     MemoryManager subMemoryManager =
@@ -83,11 +83,11 @@ public class MemoryManagerTest {
 
     // create memoryBlock4 in the size of 50 from subMemoryManager
     IMemoryBlock memoryBlock4 =
-        subMemoryManager.tryAllocate("Block4", 100, size -> size / 2, 
MemoryBlockType.FUNCTION);
+        subMemoryManager.tryAllocate("Block4", 100, size -> size / 2, 
MemoryBlockType.STATIC);
     Assert.assertEquals(0, 
GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes());
     Assert.assertEquals("Block4", memoryBlock4.getName());
     Assert.assertEquals(50, memoryBlock4.getTotalMemorySizeInBytes());
-    Assert.assertEquals(MemoryBlockType.FUNCTION, 
memoryBlock4.getMemoryBlockType());
+    Assert.assertEquals(MemoryBlockType.STATIC, 
memoryBlock4.getMemoryBlockType());
 
     Assert.assertEquals(0, GLOBAL_MEMORY_MANAGER.getUsedMemorySizeInBytes());
 
@@ -143,17 +143,17 @@ public class MemoryManagerTest {
 
     // create memoryBlock when enable
     IMemoryBlock memoryBlock5 =
-        subMemoryManager.forceAllocate("Block5", 10, MemoryBlockType.FUNCTION);
+        subMemoryManager.forceAllocate("Block5", 10, MemoryBlockType.DYNAMIC);
     Assert.assertEquals(10, memoryBlock5.getTotalMemorySizeInBytes());
     Assert.assertEquals(20, subMemoryManager.getAvailableMemorySizeInBytes());
     Assert.assertNull(
-        subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.6f, 
MemoryBlockType.FUNCTION));
+        subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.6f, 
MemoryBlockType.STATIC));
     IMemoryBlock memoryBlock6 =
-        subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.8f, 
MemoryBlockType.FUNCTION);
+        subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.8f, 
MemoryBlockType.DYNAMIC);
     Assert.assertEquals(5, memoryBlock6.getTotalMemorySizeInBytes());
     Assert.assertEquals(15, subMemoryManager.getAvailableMemorySizeInBytes());
     IMemoryBlock memoryBlock7 =
-        subMemoryManager2.tryAllocate("Block7", 5, size -> size / 2, 
MemoryBlockType.FUNCTION);
+        subMemoryManager2.tryAllocate("Block7", 5, size -> size / 2, 
MemoryBlockType.STATIC);
     Assert.assertEquals(5, memoryBlock7.getTotalMemorySizeInBytes());
     Assert.assertEquals(15, subMemoryManager2.getAvailableMemorySizeInBytes());
 

Reply via email to