This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch feature/memory_heap_adapt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96c5bad0345d9e9e182c7814f3a7b2f2bc08011d
Author: spricoder <[email protected]>
AuthorDate: Tue Apr 8 12:34:27 2025 +0800

    Add heap memory adaption
---
 .../iotdb/consensus/config/IoTConsensusConfig.java |   2 +-
 .../logdispatcher/IoTConsensusMemoryManager.java   |   2 +-
 .../apache/iotdb/db/conf/DataNodeMemoryConfig.java |  23 +++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +--
 .../db/consensus/DataRegionConsensusImpl.java      |   2 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java |   4 +-
 .../analyze/cache/partition/PartitionCache.java    |  13 ++-
 .../cache/schema/DataNodeDevicePathCache.java      |  18 +++-
 .../fetcher/cache/TableDeviceSchemaCache.java      |  23 ++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +
 .../db/storageengine/buffer/BloomFilterCache.java  |  29 ++++--
 .../iotdb/db/storageengine/buffer/ChunkCache.java  |  30 ++++--
 .../buffer/TimeSeriesMetadataCache.java            |  29 ++++--
 .../storageengine/dataregion/DataRegionInfo.java   |   1 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  20 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |   8 ++
 .../commons/memory/AtomicLongMemoryBlock.java      |  30 +++---
 .../apache/iotdb/commons/memory/IMemoryBlock.java  |  48 +++++++--
 .../apache/iotdb/commons/memory/MemoryManager.java |  76 ++++++++++----
 .../memory/MemoryPeriodicalJobExecutor.java        | 111 +++++++++++++++++++++
 .../commons/memory/MemoryRuntimeController.java    | 106 ++++++++++++++++++++
 .../apache/iotdb/commons/service/ServiceType.java  |   2 +-
 22 files changed, 494 insertions(+), 99 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 0621aee23ef..c01553c1391 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -359,7 +359,7 @@ public class IoTConsensusConfig {
       private long checkpointGap = 500;
       private IMemoryBlock consensusMemoryBlock =
           new AtomicLongMemoryBlock(
-              "Consensus-Default", null, Runtime.getRuntime().maxMemory() / 
10);
+              "Consensus-Default", null, Runtime.getRuntime().totalMemory() / 
10);
       private double maxMemoryRatioForQueue = 0.6;
       private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 7bee00588cd..4cb141f192f 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -33,7 +33,7 @@ public class IoTConsensusMemoryManager {
   private final AtomicLong queueMemorySizeInByte = new AtomicLong(0);
   private final AtomicLong syncMemorySizeInByte = new AtomicLong(0);
   private IMemoryBlock memoryBlock =
-      new AtomicLongMemoryBlock("Consensus-Default", null, 
Runtime.getRuntime().maxMemory() / 10);
+      new AtomicLongMemoryBlock("Consensus-Default", null, 
Runtime.getRuntime().totalMemory() / 10);
   private Double maxMemoryRatioForQueue = 0.6;
 
   private IoTConsensusMemoryManager() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
index 8de17422cc6..8fb4f86d7e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
 import org.apache.iotdb.commons.conf.TrimProperties;
 import org.apache.iotdb.commons.memory.MemoryConfig;
 import org.apache.iotdb.commons.memory.MemoryManager;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
 
 import org.slf4j.Logger;
@@ -59,8 +60,9 @@ public class DataNodeMemoryConfig {
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** Max bytes of each FragmentInstance for DataExchange */
+  // TODO @spricoder : influence dynamic change of memory size
   private long maxBytesPerFragmentInstance =
-      Runtime.getRuntime().maxMemory() * 3 / 10 * 200 / 1001 / 
queryThreadCount;
+      Runtime.getRuntime().totalMemory() * 3 / 10 * 200 / 1001 / 
queryThreadCount;
 
   /** The memory manager of on heap */
   private MemoryManager onHeapMemoryManager;
@@ -151,18 +153,18 @@ public class DataNodeMemoryConfig {
       }
     }
 
-    long storageEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10;
-    long queryEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10;
-    long schemaEngineMemorySize = Runtime.getRuntime().maxMemory() / 10;
-    long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10;
-    long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10;
+    long storageEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10;
+    long queryEngineMemorySize = Runtime.getRuntime().totalMemory() * 3 / 10;
+    long schemaEngineMemorySize = Runtime.getRuntime().totalMemory() / 10;
+    long consensusMemorySize = Runtime.getRuntime().totalMemory() / 10;
+    long pipeMemorySize = Runtime.getRuntime().totalMemory() / 10;
     if (memoryAllocateProportion != null) {
       String[] proportions = memoryAllocateProportion.split(":");
       int proportionSum = 0;
       for (String proportion : proportions) {
         proportionSum += Integer.parseInt(proportion.trim());
       }
-      long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
+      long maxMemoryAvailable = Runtime.getRuntime().totalMemory();
 
       if (proportionSum != 0) {
         storageEngineMemorySize =
@@ -189,9 +191,12 @@ public class DataNodeMemoryConfig {
       }
     }
     onHeapMemoryManager =
-        MemoryConfig.global().getOrCreateMemoryManager("OnHeap", 
Runtime.getRuntime().maxMemory());
+        MemoryConfig.global()
+            .getOrCreateMemoryManager("OnHeap", 
Runtime.getRuntime().totalMemory());
     storageEngineMemoryManager =
-        onHeapMemoryManager.getOrCreateMemoryManager("StorageEngine", 
storageEngineMemorySize);
+        onHeapMemoryManager
+            .getOrCreateMemoryManager("StorageEngine", storageEngineMemorySize)
+            .setMemoryUpdateCallback((before, after) -> 
SystemInfo.getInstance().loadWriteMemory());
     queryEngineMemoryManager =
         onHeapMemoryManager.getOrCreateMemoryManager("QueryEngine", 
queryEngineMemorySize);
     schemaEngineMemoryManager =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c2d6f90e5ec..cf95e2582c4 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -50,7 +50,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
-import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
 import org.apache.iotdb.external.api.IPropertiesLoader;
@@ -2333,6 +2332,7 @@ public class IoTDBDescriptor {
       for (String proportion : proportions) {
         proportionSum += Integer.parseInt(proportion.trim());
       }
+      // TODO @spricoder: consider whether this part need change when total 
memory is dynamic
       float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
       try {
         conf.setUdfReaderMemoryBudgetInMB(
@@ -2630,12 +2630,12 @@ public class IoTDBDescriptor {
     // first we need to release the memory allocated for consensus
     MemoryManager storageEngineMemoryManager = 
memoryConfig.getStorageEngineMemoryManager();
     MemoryManager consensusMemoryManager = 
memoryConfig.getConsensusMemoryManager();
+    long originSize = 
storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes();
     long newSize =
-        storageEngineMemoryManager.getTotalMemorySizeInBytes()
-            + consensusMemoryManager.getTotalMemorySizeInBytes();
-    consensusMemoryManager.setTotalMemorySizeInBytes(0);
-    storageEngineMemoryManager.setTotalMemorySizeInBytesWithReload(newSize);
-    SystemInfo.getInstance().loadWriteMemory();
+        storageEngineMemoryManager.getInitialAllocatedMemorySizeInBytes()
+            + consensusMemoryManager.getInitialAllocatedMemorySizeInBytes();
+    storageEngineMemoryManager.resizeByRatio((double) newSize / originSize);
+    consensusMemoryManager.resizeByRatio(0);
   }
 
   private static class IoTDBDescriptorHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 38ce6a0f158..22eed36d46c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -118,7 +118,7 @@ public class DataRegionConsensusImpl {
       IMemoryBlock memoryBlock =
           MEMORY_CONFIG
               .getConsensusMemoryManager()
-              .exactAllocate("Consensus", MemoryBlockType.DYNAMIC);
+              .exactAllocate("Consensus", MemoryBlockType.STATIC);
       return ConsensusConfig.newBuilder()
           .setThisNodeId(CONF.getDataNodeId())
           .setThisNode(new TEndPoint(CONF.getInternalAddress(), 
CONF.getDataRegionConsensusPort()))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 0c7a473950d..8a2e370218b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -52,11 +52,11 @@ public class PipeMemoryManager {
       PipeConfig.getInstance().getPipeMemoryAllocateMinSizeInBytes();
 
   // TODO @spricoder: consider combine memory block and used MemorySizeInBytes
-  private IMemoryBlock memoryBlock =
+  private final IMemoryBlock memoryBlock =
       IoTDBDescriptor.getInstance()
           .getMemoryConfig()
           .getPipeMemoryManager()
-          .exactAllocate("Stream", MemoryBlockType.DYNAMIC);
+          .exactAllocate("Stream", MemoryBlockType.STATIC);
 
   private static final double EXCEED_PROTECT_THRESHOLD = 0.95;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 15828538c7f..e45ab7a5b3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.thrift.TException;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.slf4j.Logger;
@@ -120,12 +121,22 @@ public class PartitionCache {
 
   private final CacheMetrics cacheMetrics;
   private final IMemoryBlock memoryBlock;
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
   public PartitionCache() {
     this.memoryBlock =
         memoryConfig
             .getPartitionCacheMemoryManager()
-            .exactAllocate("PartitionCache", MemoryBlockType.STATIC);
+            .exactAllocate("PartitionCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  logger.debug(
+                      "[MemoryUsageCheatFactor] PartitionCache has updated 
from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
     this.memoryBlock.allocate(this.memoryBlock.getTotalMemorySizeInBytes());
     // TODO @spricoder: PartitionCache need to be controlled according to 
memory
     this.schemaPartitionCache =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
index 07293bc4ca3..480aee2b179 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ public class DataNodeDevicePathCache {
   private static final DataNodeMemoryConfig memoryConfig =
       IoTDBDescriptor.getInstance().getMemoryConfig();
   private final IMemoryBlock devicePathCacheMemoryBlock;
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
   private final Cache<String, PartialPath> devicePathCache;
 
@@ -46,14 +48,26 @@ public class DataNodeDevicePathCache {
     devicePathCacheMemoryBlock =
         memoryConfig
             .getDevicePathCacheMemoryManager()
-            .exactAllocate("DevicePathCache", MemoryBlockType.STATIC);
+            .exactAllocate("DevicePathCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  LOGGER.debug(
+                      "[MemoryUsageCheatFactor]DataNodeDevicePathCache has 
updated from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
+    ;
     // TODO @spricoder: later we can find a way to get the byte size of cache
     
devicePathCacheMemoryBlock.allocate(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes());
     devicePathCache =
         Caffeine.newBuilder()
             
.maximumWeight(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes())
             .weigher(
-                (Weigher<String, PartialPath>) (key, val) -> 
(PartialPath.estimateSize(val) + 32))
+                (Weigher<String, PartialPath>)
+                    (key, val) ->
+                        (int) ((PartialPath.estimateSize(val) + 32) * 
memoryUsageCheatFactor.get()))
             .build();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index a724bc427c8..7ef79e8272f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
 import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegion;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.read.TimeValuePair;
@@ -106,20 +107,34 @@ public class TableDeviceSchemaCache {
   private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(false);
 
   private final IMemoryBlock memoryBlock;
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
   private TableDeviceSchemaCache() {
     memoryBlock =
         memoryConfig
             .getSchemaCacheMemoryManager()
-            .exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC);
+            .exactAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  logger.debug(
+                      "[MemoryUsageCheatFactor]TableDeviceSchemaCache has 
updated from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
     dualKeyCache =
         new DualKeyCacheBuilder<TableId, IDeviceID, TableDeviceCacheEntry>()
             .cacheEvictionPolicy(
                 
DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
             .memoryCapacity(memoryBlock.getTotalMemorySizeInBytes())
-            .firstKeySizeComputer(TableId::estimateSize)
-            .secondKeySizeComputer(deviceID -> (int) deviceID.ramBytesUsed())
-            .valueSizeComputer(TableDeviceCacheEntry::estimateSize)
+            .firstKeySizeComputer(
+                tableId -> (int) (tableId.estimateSize() * 
memoryUsageCheatFactor.get()))
+            .secondKeySizeComputer(
+                deviceID -> (int) (deviceID.ramBytesUsed() * 
memoryUsageCheatFactor.get()))
+            .valueSizeComputer(
+                tableDeviceCacheEntry ->
+                    (int) (tableDeviceCacheEntry.estimateSize() * 
memoryUsageCheatFactor.get()))
             .build();
     memoryBlock.allocate(memoryBlock.getTotalMemorySizeInBytes());
     MetricService.getInstance().addMetricSet(new 
TableDeviceSchemaCacheMetrics(this));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index b554eb92419..24c857eba8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.memory.MemoryRuntimeController;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.service.JMXService;
@@ -245,6 +246,9 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
       // Serialize mutable system properties
       
IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
 
+      // Setup memory controller
+      registerManager.register(MemoryRuntimeController.getInstance());
+
       logger.info("IoTDB configuration: {}", config.getConfigMessage());
       logger.info("Congratulations, IoTDB DataNode is set up successfully. 
Now, enjoy yourself!");
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
index 7a41352098b..f3f58a24414 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.utils.BloomFilter;
 import org.apache.tsfile.utils.RamUsageEstimator;
@@ -51,32 +52,44 @@ public class BloomFilterCache {
   private static final Logger DEBUG_LOGGER = 
LoggerFactory.getLogger("QUERY_DEBUG");
   private static final DataNodeMemoryConfig MEMORY_CONFIG =
       IoTDBDescriptor.getInstance().getMemoryConfig();
-  private static final IMemoryBlock CACHE_MEMORY_BLOCK;
+  private static final IMemoryBlock cacheMemoryBlock;
+  private static final AtomicDouble memoryUsageCheatFactor = new 
AtomicDouble(1);
   private static final boolean CACHE_ENABLE = 
MEMORY_CONFIG.isMetaDataCacheEnable();
   private final AtomicLong entryAverageSize = new AtomicLong(0);
 
   private final Cache<BloomFilterCacheKey, BloomFilter> lruCache;
 
   static {
-    CACHE_MEMORY_BLOCK =
+    cacheMemoryBlock =
         MEMORY_CONFIG
             .getBloomFilterCacheMemoryManager()
-            .exactAllocate("BloomFilterCache", MemoryBlockType.STATIC);
+            .exactAllocate("BloomFilterCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  LOGGER.debug(
+                      "[MemoryUsageCheatFactor]BloomFilterCache has updated 
from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
     // TODO @spricoder: find a way to get the size of the BloomFilterCache
-    
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+    cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes());
   }
 
   private BloomFilterCache() {
     if (CACHE_ENABLE) {
-      LOGGER.info("BloomFilterCache size = {}", 
CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+      LOGGER.info("BloomFilterCache size = {}", 
cacheMemoryBlock.getTotalMemorySizeInBytes());
     }
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes())
+            .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes())
             .weigher(
                 (Weigher<BloomFilterCacheKey, BloomFilter>)
                     (key, bloomFilter) ->
-                        (int) (key.getRetainedSizeInBytes() + 
bloomFilter.getRetainedSizeInBytes()))
+                        (int)
+                            ((key.getRetainedSizeInBytes() + 
bloomFilter.getRetainedSizeInBytes())
+                                * memoryUsageCheatFactor.get()))
             .recordStats()
             .build();
   }
@@ -131,7 +144,7 @@ public class BloomFilterCache {
   }
 
   public long getMaxMemory() {
-    return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes();
+    return cacheMemoryBlock.getTotalMemorySizeInBytes();
   }
 
   public double getAverageLoadPenalty() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
index 1c39b993a3c..f84a1da7fe7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Chunk;
@@ -63,7 +64,8 @@ public class ChunkCache {
   private static final Logger DEBUG_LOGGER = 
LoggerFactory.getLogger("QUERY_DEBUG");
   private static final DataNodeMemoryConfig MEMORY_CONFIG =
       IoTDBDescriptor.getInstance().getMemoryConfig();
-  private static final IMemoryBlock CACHE_MEMORY_BLOCK;
+  private static final IMemoryBlock cacheMemoryBlock;
+  private static final AtomicDouble memoryUsageCheatFactor = new 
AtomicDouble(1);
   private static final boolean CACHE_ENABLE = 
MEMORY_CONFIG.isMetaDataCacheEnable();
 
   private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
@@ -73,25 +75,37 @@ public class ChunkCache {
   private final Cache<ChunkCacheKey, Chunk> lruCache;
 
   static {
-    CACHE_MEMORY_BLOCK =
+    cacheMemoryBlock =
         MEMORY_CONFIG
             .getChunkCacheMemoryManager()
-            .exactAllocate("ChunkCache", MemoryBlockType.STATIC);
+            .exactAllocate("ChunkCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  LOGGER.info(
+                      "[MemoryUsageCheatFactor]ChunkCache has updated from {} 
to {}.",
+                      oldMemory,
+                      newMemory);
+                });
+    ;
     // TODO @spricoder: find a way to get the size of the ChunkCache
-    
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+    cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes());
   }
 
   private ChunkCache() {
     if (CACHE_ENABLE) {
-      LOGGER.info("ChunkCache size = {}", 
CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+      LOGGER.info("ChunkCache size = {}", 
cacheMemoryBlock.getTotalMemorySizeInBytes());
     }
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes())
+            .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes())
             .weigher(
                 (Weigher<ChunkCacheKey, Chunk>)
                     (key, chunk) ->
-                        (int) (key.getRetainedSizeInBytes() + 
chunk.getRetainedSizeInBytes()))
+                        (int)
+                            ((key.getRetainedSizeInBytes() + 
chunk.getRetainedSizeInBytes())
+                                * memoryUsageCheatFactor.get()))
             .recordStats()
             .build();
 
@@ -202,7 +216,7 @@ public class ChunkCache {
   }
 
   public long getMaxMemory() {
-    return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes();
+    return cacheMemoryBlock.getTotalMemorySizeInBytes();
   }
 
   public double getAverageLoadPenalty() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index f2cd55c5e7d..822f18eaec1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
@@ -67,7 +68,8 @@ public class TimeSeriesMetadataCache {
   private static final Logger DEBUG_LOGGER = 
LoggerFactory.getLogger("QUERY_DEBUG");
   private static final DataNodeMemoryConfig memoryConfig =
       IoTDBDescriptor.getInstance().getMemoryConfig();
-  private static final IMemoryBlock CACHE_MEMORY_BLOCK;
+  private static final IMemoryBlock cacheMemoryBlock;
+  private static final AtomicDouble memoryUsageCheatFactor = new 
AtomicDouble(1);
   private static final boolean CACHE_ENABLE = 
memoryConfig.isMetaDataCacheEnable();
 
   private final Cache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
@@ -79,26 +81,37 @@ public class TimeSeriesMetadataCache {
   private static final String SEPARATOR = "$";
 
   static {
-    CACHE_MEMORY_BLOCK =
+    cacheMemoryBlock =
         memoryConfig
             .getTimeSeriesMetaDataCacheMemoryManager()
-            .exactAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC);
+            .exactAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC)
+            .setMemoryUpdateCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.updateAndGet(
+                      factor -> factor / ((double) newMemory / oldMemory));
+                  logger.info(
+                      "[MemoryUsageCheatFactor]TimeSeriesMetadataCache has 
updated from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
     // TODO @spricoder find a better way to get the size of cache
-    
CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+    cacheMemoryBlock.allocate(cacheMemoryBlock.getTotalMemorySizeInBytes());
   }
 
   private TimeSeriesMetadataCache() {
     if (CACHE_ENABLE) {
       logger.info(
-          "TimeSeriesMetadataCache size = {}", 
CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes());
+          "TimeSeriesMetadataCache size = {}", 
cacheMemoryBlock.getTotalMemorySizeInBytes());
     }
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes())
+            .maximumWeight(cacheMemoryBlock.getTotalMemorySizeInBytes())
             .weigher(
                 (Weigher<TimeSeriesMetadataCacheKey, TimeseriesMetadata>)
                     (key, value) ->
-                        (int) (key.getRetainedSizeInBytes() + 
value.getRetainedSizeInBytes()))
+                        (int)
+                            ((key.getRetainedSizeInBytes() + 
value.getRetainedSizeInBytes())
+                                * memoryUsageCheatFactor.get()))
             .recordStats()
             .build();
     // add metrics
@@ -265,7 +278,7 @@ public class TimeSeriesMetadataCache {
   }
 
   public long getMaxMemory() {
-    return CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes();
+    return cacheMemoryBlock.getTotalMemorySizeInBytes();
   }
 
   public double getAverageLoadPenalty() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java
index 0e8e4902c8b..54e7dfda8b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionInfo.java
@@ -38,6 +38,7 @@ public class DataRegionInfo {
    */
   private final AtomicLong memoryCost;
 
+  // TODO @spricoder dynamic threshold
   /** The threshold of reporting it's size to SystemInfo */
   private final long storageGroupSizeReportThreshold =
       (long)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ef289d75c19..913f6fb780e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -385,6 +385,10 @@ public class CommonConfig {
 
   private volatile Pattern trustedUriPattern = Pattern.compile("file:.*");
 
+  // Whtether to enable memory adaption
+  private boolean enableMemoryAdapt = false;
+  private long memoryAdaptIntervalInS = 20;
+
   CommonConfig() {
     // Empty constructor
   }
@@ -1821,4 +1825,20 @@ public class CommonConfig {
   public void setTrustedUriPattern(Pattern trustedUriPattern) {
     this.trustedUriPattern = trustedUriPattern;
   }
+
+  public long getMemoryAdaptIntervalInS() {
+    return memoryAdaptIntervalInS;
+  }
+
+  public void setMemoryAdaptIntervalInS(long memoryAdaptIntervalInS) {
+    this.memoryAdaptIntervalInS = memoryAdaptIntervalInS;
+  }
+
+  public boolean isEnableMemoryAdapt() {
+    return enableMemoryAdapt;
+  }
+
+  public void setEnableMemoryAdapt(boolean enableMemoryAdapt) {
+    this.enableMemoryAdapt = enableMemoryAdapt;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d57c129d0a6..b21e9fc5ac4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -251,6 +251,14 @@ public class CommonDescriptor {
             properties.getProperty(
                 "cluster_device_limit_threshold",
                 String.valueOf(config.getDeviceLimitThreshold()))));
+    config.setEnableMemoryAdapt(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_memory_adapt", 
Boolean.toString(config.isEnableMemoryAdapt()))));
+    config.setMemoryAdaptIntervalInS(
+        Integer.parseInt(
+            properties.getProperty(
+                "memory_check_interval", 
String.valueOf(config.getMemoryAdaptIntervalInS()))));
 
     loadRetryProperties(properties);
     loadBinaryAllocatorProps(properties);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
index 1376d8ddb6d..e654213ea75 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
@@ -52,45 +52,45 @@ public class AtomicLongMemoryBlock extends IMemoryBlock {
   }
 
   @Override
-  public long forceAllocateWithoutLimitation(long sizeInByte) {
-    return usedMemoryInBytes.addAndGet(sizeInByte);
+  public long forceAllocateWithoutLimitation(long sizeInBytes) {
+    return usedMemoryInBytes.addAndGet(sizeInBytes);
   }
 
   @Override
-  public boolean allocate(long sizeInByte) {
+  public boolean allocate(long sizeInBytes) {
     AtomicBoolean result = new AtomicBoolean(false);
     usedMemoryInBytes.updateAndGet(
         memCost -> {
-          if (memCost + sizeInByte > totalMemorySizeInBytes) {
+          if (memCost + sizeInBytes > totalMemorySizeInBytes) {
             return memCost;
           }
           result.set(true);
-          return memCost + sizeInByte;
+          return memCost + sizeInBytes;
         });
     return result.get();
   }
 
   @Override
-  public boolean allocateIfSufficient(final long sizeInByte, final double 
maxRatio) {
+  public boolean allocateIfSufficient(final long sizeInBytes, final double 
maxRatio) {
     AtomicBoolean result = new AtomicBoolean(false);
     usedMemoryInBytes.updateAndGet(
         memCost -> {
-          if (memCost + sizeInByte > totalMemorySizeInBytes * maxRatio) {
+          if (memCost + sizeInBytes > totalMemorySizeInBytes * maxRatio) {
             return memCost;
           }
           result.set(true);
-          return memCost + sizeInByte;
+          return memCost + sizeInBytes;
         });
     return result.get();
   }
 
   @Override
-  public boolean allocateUntilAvailable(long sizeInByte, long 
retryIntervalInMillis)
+  public boolean allocateUntilAvailable(long sizeInBytes, long 
retryIntervalInMillis)
       throws InterruptedException {
     long originSize = usedMemoryInBytes.get();
     while (true) {
-      boolean canUpdate = originSize + sizeInByte <= totalMemorySizeInBytes;
-      if (canUpdate && usedMemoryInBytes.compareAndSet(originSize, originSize 
+ sizeInByte)) {
+      boolean canUpdate = originSize + sizeInBytes <= totalMemorySizeInBytes;
+      if (canUpdate && usedMemoryInBytes.compareAndSet(originSize, originSize 
+ sizeInBytes)) {
         break;
       }
       Thread.sleep(TimeUnit.MILLISECONDS.toMillis(retryIntervalInMillis));
@@ -100,16 +100,16 @@ public class AtomicLongMemoryBlock extends IMemoryBlock {
   }
 
   @Override
-  public long release(long sizeInByte) {
+  public long release(long sizeInBytes) {
     return usedMemoryInBytes.updateAndGet(
         memCost -> {
-          if (sizeInByte > memCost) {
+          if (sizeInBytes > memCost) {
             LOGGER.warn(
                 "The memory cost to be released is larger than the memory cost 
of memory block {}",
                 this);
             return 0;
           }
-          return memCost - sizeInByte;
+          return memCost - sizeInBytes;
         });
   }
 
@@ -118,10 +118,12 @@ public class AtomicLongMemoryBlock extends IMemoryBlock {
     this.usedMemoryInBytes.set(usedMemoryInBytes);
   }
 
+  @Override
   public long getUsedMemoryInBytes() {
     return usedMemoryInBytes.get();
   }
 
+  @Override
   /** Get the free memory in byte of this memory block */
   public long getFreeMemoryInBytes() {
     return totalMemorySizeInBytes - usedMemoryInBytes.get();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
index 416516bf2c7..5d3bf164dfc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.commons.memory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
 public abstract class IMemoryBlock implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IMemoryBlock.class);
 
@@ -40,46 +43,50 @@ public abstract class IMemoryBlock implements AutoCloseable 
{
   /** The total memory size in byte of this memory block */
   protected long totalMemorySizeInBytes;
 
+  /** Update related parameters when totalMemorySizeInBytes update */
+  protected final AtomicReference<BiConsumer<Long, Long>> memoryUpdateCallback 
=
+      new AtomicReference<>();
+
   /**
    * Forcibly allocate memory without the limit of totalMemorySizeInBytes
    *
-   * @param sizeInByte the size of memory to be allocated, should be positive
+   * @param sizeInBytes the size of memory to be allocated, should be positive
    * @return the number of bytes actually allocated
    */
-  public abstract long forceAllocateWithoutLimitation(final long sizeInByte);
+  public abstract long forceAllocateWithoutLimitation(final long sizeInBytes);
 
   /**
    * Allocate memory managed by this memory block
    *
-   * @param sizeInByte the size of memory to be allocated, should be positive
+   * @param sizeInBytes the size of memory to be allocated, should be positive
    */
-  public abstract boolean allocate(final long sizeInByte);
+  public abstract boolean allocate(final long sizeInBytes);
 
   /**
    * Allocate memory managed by this memory block. if the currently used ratio 
is already above
-   * maxRatio, the allocation will fail".
+   * maxRatio, the allocation will fail.
    *
-   * @param sizeInByte the size of memory to be allocated, should be positive
+   * @param sizeInBytes the size of memory to be allocated, should be positive
    * @param maxRatio the maximum ratio of memory can be allocated
    */
-  public abstract boolean allocateIfSufficient(final long sizeInByte, final 
double maxRatio);
+  public abstract boolean allocateIfSufficient(final long sizeInBytes, final 
double maxRatio);
 
   /**
    * Allocate memory managed by this memory block until the required memory is 
available
    *
-   * @param sizeInByte the size of memory to be allocated, should be positive
+   * @param sizeInBytes the size of memory to be allocated, should be positive
    * @param retryIntervalInMillis the time interval to wait after each 
allocation failure
    */
-  public abstract boolean allocateUntilAvailable(final long sizeInByte, long 
retryIntervalInMillis)
+  public abstract boolean allocateUntilAvailable(final long sizeInBytes, long 
retryIntervalInMillis)
       throws InterruptedException;
 
   /**
    * Try to release memory managed by this memory block
    *
-   * @param sizeInByte the size of memory to be released, should be positive
+   * @param sizeInBytes the size of memory to be released, should be positive
    * @return the used size after release, zero if the release fails
    */
-  public abstract long release(final long sizeInByte);
+  public abstract long release(final long sizeInBytes);
 
   /**
    * Try to set memory usage in byte of this memory block (for test only)
@@ -99,6 +106,25 @@ public abstract class IMemoryBlock implements AutoCloseable 
{
     return name;
   }
 
+  /** Update total memory size by ratio */
+  public long resizeByRatio(double ratio) {
+    long before = this.totalMemorySizeInBytes;
+    this.totalMemorySizeInBytes = (long) (before * ratio);
+    if (memoryUpdateCallback.get() != null) {
+      try {
+        memoryUpdateCallback.get().accept(before, totalMemorySizeInBytes);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to execute the update callback.", e);
+      }
+    }
+    return this.totalMemorySizeInBytes - before;
+  }
+
+  public IMemoryBlock setMemoryUpdateCallback(BiConsumer<Long, Long> 
memoryUpdateCallback) {
+    this.memoryUpdateCallback.set(memoryUpdateCallback);
+    return this;
+  }
+
   /** Update maximum memory size in byte of this memory block */
   public void setTotalMemorySizeInBytes(final long totalMemorySizeInBytes) {
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
index 03fb10d393a..072a45daac6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.LongUnaryOperator;
 
 public class MemoryManager {
@@ -46,10 +48,13 @@ public class MemoryManager {
   /** Whether memory management is enabled */
   private final boolean enabled;
 
-  /** The total memory size in byte of memory manager */
+  /** The memory size of this memory manager allocated by parent memory 
manager */
+  private volatile long initialAllocatedMemorySizeInBytes = 0L;
+
+  /** The max memory size of this memory manager */
   private volatile long totalMemorySizeInBytes;
 
-  /** The allocated memory size */
+  /** The allocated memory size of this memory manager */
   private volatile long allocatedMemorySizeInBytes = 0L;
 
   /** The parent memory manager */
@@ -61,10 +66,15 @@ public class MemoryManager {
   /** The allocated memory blocks of this memory manager */
   private final Map<String, IMemoryBlock> allocatedMemoryBlocks = new 
ConcurrentHashMap<>();
 
+  /** Update related parameters when totalMemorySizeInBytes update */
+  private final AtomicReference<BiConsumer<Long, Long>> memoryUpdateCallback =
+      new AtomicReference<>();
+
   @TestOnly
   public MemoryManager(long totalMemorySizeInBytes) {
     this.name = "Test";
     this.parentMemoryManager = null;
+    this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enabled = false;
   }
@@ -72,6 +82,7 @@ public class MemoryManager {
   MemoryManager(String name, MemoryManager parentMemoryManager, long 
totalMemorySizeInBytes) {
     this.name = name;
     this.parentMemoryManager = parentMemoryManager;
+    this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enabled = false;
   }
@@ -83,6 +94,7 @@ public class MemoryManager {
       boolean enabled) {
     this.name = name;
     this.parentMemoryManager = parentMemoryManager;
+    this.initialAllocatedMemorySizeInBytes = totalMemorySizeInBytes;
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
     this.enabled = enabled;
   }
@@ -356,24 +368,6 @@ public class MemoryManager {
     return getOrCreateMemoryManager(name, totalMemorySizeInBytes, false);
   }
 
-  /**
-   * Re-allocate memory according to ratio
-   *
-   * @param ratio the ratio of new total memory size to old total memory size
-   */
-  private void reAllocateMemoryAccordingToRatio(double ratio) {
-    // first increase the total memory size of this memory manager
-    this.totalMemorySizeInBytes *= ratio;
-    // then re-allocate memory for all memory blocks
-    for (IMemoryBlock block : allocatedMemoryBlocks.values()) {
-      block.setTotalMemorySizeInBytes((long) 
(block.getTotalMemorySizeInBytes() * ratio));
-    }
-    // finally re-allocate memory for all child memory managers
-    for (Map.Entry<String, MemoryManager> entry : children.entrySet()) {
-      entry.getValue().reAllocateMemoryAccordingToRatio(ratio);
-    }
-  }
-
   /**
    * Get the memory manager with specified names in levels
    *
@@ -458,12 +452,13 @@ public class MemoryManager {
     return totalMemorySizeInBytes;
   }
 
+  @TestOnly
   public void setTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
     this.totalMemorySizeInBytes = totalMemorySizeInBytes;
   }
 
-  public void setTotalMemorySizeInBytesWithReload(long totalMemorySizeInBytes) 
{
-    reAllocateMemoryAccordingToRatio((double) totalMemorySizeInBytes / 
this.totalMemorySizeInBytes);
+  public long getInitialAllocatedMemorySizeInBytes() {
+    return initialAllocatedMemorySizeInBytes;
   }
 
   /** Get available memory size in bytes of memory manager */
@@ -486,8 +481,45 @@ public class MemoryManager {
     return memorySize;
   }
 
+  public MemoryManager setMemoryUpdateCallback(BiConsumer<Long, Long> 
memoryUpdateCallback) {
+    this.memoryUpdateCallback.set(memoryUpdateCallback);
+    return this;
+  }
+
   // endregion
 
+  /**
+   * Resize memory by ratio
+   *
+   * @param ratio the ratio to resize memory, values [0.0, 1.0]
+   */
+  public synchronized void resizeByRatio(double ratio) {
+    // Update initial allocated memory size by ratio
+    long beforeInitialAllocatedMemorySizeInBytes = 
this.initialAllocatedMemorySizeInBytes;
+    this.initialAllocatedMemorySizeInBytes *= ratio;
+    // Update total memory size by actual size
+    long beforeTotalMemorySizeInBytes = this.totalMemorySizeInBytes;
+    this.totalMemorySizeInBytes +=
+        (this.initialAllocatedMemorySizeInBytes - 
beforeInitialAllocatedMemorySizeInBytes);
+    // Get actual ratio of re-allocate memory size
+    double actualRatio = (double) this.totalMemorySizeInBytes / 
beforeTotalMemorySizeInBytes;
+    // Re-allocate memory for all memory blocks
+    for (IMemoryBlock block : allocatedMemoryBlocks.values()) {
+      this.allocatedMemorySizeInBytes += block.resizeByRatio(actualRatio);
+    }
+    // Re-allocate memory for all child memory managers
+    for (Map.Entry<String, MemoryManager> entry : children.entrySet()) {
+      entry.getValue().resizeByRatio(ratio);
+    }
+    if (memoryUpdateCallback.get() != null) {
+      try {
+        memoryUpdateCallback.get().accept(beforeTotalMemorySizeInBytes, 
totalMemorySizeInBytes);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to execute the update callback.", e);
+      }
+    }
+  }
+
   @Override
   public String toString() {
     return "MemoryManager{"
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
new file mode 100644
index 00000000000..59662dc5a29
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.memory;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MemoryPeriodicalJobExecutor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryPeriodicalJobExecutor.class);
+
+  private final ScheduledExecutorService executorService;
+  private final long minIntervalSeconds;
+
+  private long rounds;
+  private Future<?> executorFuture;
+
+  private final List<Pair<WrappedRunnable, Long>> periodicalJobs = new 
CopyOnWriteArrayList<>();
+
+  public MemoryPeriodicalJobExecutor(
+      final ScheduledExecutorService executorService, final long 
minIntervalSeconds) {
+    this.executorService = executorService;
+    this.minIntervalSeconds = minIntervalSeconds;
+  }
+
+  public void register(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    periodicalJobs.add(
+        new Pair<>(
+            new WrappedRunnable() {
+              @Override
+              public void runMayThrow() {
+                try {
+                  periodicalJob.run();
+                } catch (Exception e) {
+                  LOGGER.warn("Periodical job {} failed.", id, e);
+                }
+              }
+            },
+            Math.max(intervalInSeconds / minIntervalSeconds, 1)));
+    LOGGER.info(
+        "Memory periodical job {} is registered successfully. Interval: {} 
seconds.",
+        id,
+        Math.max(intervalInSeconds / minIntervalSeconds, 1) * 
minIntervalSeconds);
+  }
+
+  public synchronized void start() {
+    if (executorFuture == null) {
+      rounds = 0;
+
+      executorFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              executorService,
+              this::execute,
+              minIntervalSeconds,
+              minIntervalSeconds,
+              TimeUnit.SECONDS);
+      LOGGER.info("Memory periodical job executor is started successfully.");
+    }
+  }
+
+  protected void execute() {
+    ++rounds;
+
+    for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
+      if (rounds % periodicalJob.right == 0) {
+        periodicalJob.left.run();
+      }
+    }
+  }
+
+  public synchronized void stop() {
+    if (executorFuture != null) {
+      executorFuture.cancel(false);
+      executorFuture = null;
+      LOGGER.info("Memory periodical job executor is stopped successfully.");
+    }
+  }
+
+  @TestOnly
+  public void clear() {
+    periodicalJobs.clear();
+    LOGGER.info("All memory periodical jobs are cleared successfully.");
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java
new file mode 100644
index 00000000000..89b1976b00c
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeController.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.memory;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MemoryRuntimeController implements IService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryRuntimeController.class);
+  private static final CommonConfig CONFIG = 
CommonDescriptor.getInstance().getConfig();
+  private static final MemoryManager ON_HEAP_MEMORY_MANAGER =
+      MemoryConfig.global().getMemoryManager("OnHeap");
+  private static final boolean ENABLE_MEMORY_ADAPT = 
CONFIG.isEnableMemoryAdapt();
+  private static final long MEMORY_ADAPT_INTERVAL_IN_S = 
CONFIG.getMemoryAdaptIntervalInS();
+
+  private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  private static final MemoryPeriodicalJobExecutor memoryPeriodicalJobExecutor 
=
+      new MemoryPeriodicalJobExecutor(
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.MEMORY_PERIODICAL_JOB_EXECUTOR.getName()),
+          MEMORY_ADAPT_INTERVAL_IN_S);
+
+  @Override
+  public void start() throws StartupException {
+    memoryPeriodicalJobExecutor.start();
+
+    // Try to adapt total memory size according to the JVM total memory size
+    if (ENABLE_MEMORY_ADAPT) {
+      LOGGER.info(
+          "Enable automatic memory adapt with an interval of {} s", 
MEMORY_ADAPT_INTERVAL_IN_S);
+      MemoryRuntimeController.getInstance()
+          .registerPeriodicalJob(
+              "MemoryRuntimeAgent#adaptTotalMemory()",
+              this::adaptTotalMemory,
+              MEMORY_ADAPT_INTERVAL_IN_S);
+    }
+
+    isShutdown.set(false);
+  }
+
+  private void adaptTotalMemory() {
+    long totalMemory = Runtime.getRuntime().totalMemory();
+    if (ON_HEAP_MEMORY_MANAGER != null) {
+      long originMemorySize = 
ON_HEAP_MEMORY_MANAGER.getInitialAllocatedMemorySizeInBytes();
+      if (originMemorySize != totalMemory) {
+        LOGGER.info("Total memory size changed from {} to {}", 
originMemorySize, totalMemory);
+        ON_HEAP_MEMORY_MANAGER.resizeByRatio((double) totalMemory / 
originMemorySize);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (isShutdown.get()) {
+      return;
+    }
+    isShutdown.set(true);
+
+    memoryPeriodicalJobExecutor.stop();
+  }
+
+  public void registerPeriodicalJob(String id, Runnable periodicalJob, long 
intervalInSeconds) {
+    memoryPeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MEMORY_RUNTIME_CONTROLLER;
+  }
+
+  private static class MemoryRuntimeAgentHolder {
+    private static final MemoryRuntimeController HANDLE = new 
MemoryRuntimeController();
+  }
+
+  public static MemoryRuntimeController getInstance() {
+    return MemoryRuntimeAgentHolder.HANDLE;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 7267c79a665..098891c2937 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -80,7 +80,7 @@ public enum ServiceType {
   PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
       "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
   AINode_RPC_SERVICE("Rpc Service for AINode", "AINodeRPCService"),
-  MEMORY_RUNTIME_AGENT("Memory Runtime Agent", "MemoryRuntimeAgent"),
+  MEMORY_RUNTIME_CONTROLLER("Memory Runtime Controller", 
"MemoryRuntimeController"),
   PIPE_RUNTIME_DATA_NODE_AGENT("Pipe Runtime Data Node Agent", 
"PipeRuntimeDataNodeAgent"),
   PIPE_RUNTIME_CONFIG_NODE_AGENT("Pipe Runtime Config Node Agent", 
"PipeRuntimeConfigNodeAgent"),
   SUBSCRIPTION_RUNTIME_AGENT("Subscription Runtime Agent", 
"SubscriptionRuntimeAgent"),

Reply via email to