This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1073715a081 Add RPC auto resizing buffer memory control (#17911)
1073715a081 is described below

commit 1073715a081f4e67d3b2963694a8fca049805d6d
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jun 25 15:39:10 2026 +0800

    Add RPC auto resizing buffer memory control (#17911)
    
    * ver1
    
    * ver2
    
    * ver3
    
    * ver4
    
    * ver4
    
    * update default config value
    
    * Disable when proportion <= 0
    
    * fix review
    
    * disable mem-control in ITs
    
    * Replace auto_resizing_buffer_memory_proportion with 
datanode_memory_proportion
    
    * Add AutoResizingBuffer memory metrics
    
    * Fix it
---
 .../db/it/IoTDBAutoResizingBufferMemoryIT.java     | 274 +++++++++++++++++++++
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |   2 +-
 .../manual/AbstractPipeTableModelDualManualIT.java |   2 +
 .../manual/basic/IoTDBPipePermissionIT.java        |   2 +
 .../manual/basic/IoTDBPipeProtocolIT.java          |   6 +
 .../tablemodel/manual/basic/IoTDBPipeSourceIT.java |   2 +
 .../manual/basic/IoTDBPipeWithLoadIT.java          |   2 +
 .../manual/enhanced/IoTDBPipeAutoConflictIT.java   |   2 +
 .../manual/enhanced/IoTDBPipeAutoDropIT.java       |   2 +
 .../manual/enhanced/IoTDBPipeClusterIT.java        |   2 +
 .../enhanced/IoTDBPipeSinkCompressionIT.java       |   4 +-
 .../auto/AbstractPipeDualTreeModelAutoIT.java      |   6 +-
 .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java |   2 +
 .../treemodel/auto/basic/IoTDBPipeProcessorIT.java |   2 +
 .../treemodel/auto/basic/IoTDBPipeProtocolIT.java  |   6 +
 .../IoTDBPipeReceiverAutoCreateDisabledIT.java     |   1 +
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    |   2 +
 .../auto/enhanced/IoTDBPipeAutoConflictIT.java     |   2 +
 .../auto/enhanced/IoTDBPipeClusterIT.java          |   2 +
 .../auto/enhanced/IoTDBPipeIdempotentIT.java       |   2 +
 .../auto/enhanced/IoTDBPipeSinkCompressionIT.java  |   2 +
 .../auto/enhanced/IoTDBPipeWithLoadIT.java         |   2 +
 .../manual/AbstractPipeDualTreeModelManualIT.java  |   4 +-
 .../manual/IoTDBPipeMetaHistoricalIT.java          |   4 +-
 .../manual/IoTDBPipeMetaLeaderChangeIT.java        |   7 +-
 .../treemodel/manual/IoTDBPipePermissionIT.java    |   4 +-
 .../iotdb/pipe/it/single/AbstractPipeSingleIT.java |   1 +
 .../single/IoTDBLegacyPipeReceiverSecurityIT.java  |   1 +
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     |   1 +
 .../pipe/it/single/IoTDBPipePermissionIT.java      |   1 +
 .../pipe/it/triple/AbstractPipeTripleManualIT.java |   5 +-
 .../relational/it/db/it/IoTDBLoadTsFileIT.java     |   1 +
 iotdb-client/client-go                             |   2 +-
 .../en/org/apache/iotdb/rpc/i18n/RpcMessages.java  |   6 +
 .../zh/org/apache/iotdb/rpc/i18n/RpcMessages.java  |   6 +
 .../org/apache/iotdb/rpc/AutoResizingBuffer.java   |  53 +++-
 .../iotdb/rpc/AutoResizingBufferMemoryControl.java |  25 +-
 .../iotdb/rpc/AutoResizingBufferMemoryManager.java | 110 +++++++++
 .../iotdb/rpc/AutoScalingBufferReadTransport.java  |  18 +-
 .../iotdb/rpc/AutoScalingBufferWriteTransport.java |  20 +-
 .../rpc/TCompressedElasticFramedTransport.java     |  29 ++-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |  38 ++-
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |   8 +-
 .../rpc/TimeoutChangeableTFastFramedTransport.java |   6 +-
 .../TimeoutChangeableTSnappyFramedTransport.java   |   6 +-
 .../apache/iotdb/rpc/AutoResizingBufferTest.java   | 193 +++++++++++++++
 .../iotdb/confignode/service/ConfigNode.java       |   2 +
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |  23 +-
 .../apache/iotdb/db/conf/DataNodeMemoryConfig.java |   9 +
 .../conf/iotdb-system.properties.template          |   1 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   1 -
 .../memory/AutoResizingBufferMemoryMetrics.java    | 112 +++++++++
 .../apache/iotdb/commons/memory/MemoryConfig.java  |  87 ++++++-
 pom.xml                                            |   1 +
 54 files changed, 1048 insertions(+), 66 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
new file mode 100644
index 00000000000..ead744f5497
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAutoResizingBufferMemoryIT {
+
+  private static final int DATANODE_MAX_HEAP_SIZE_IN_MB = 256;
+  private static final int AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION = 2;
+  private static final int CONNECTION_COUNT_OVERFLOW_MARGIN = 1;
+  private static final int INITIAL_GROWING_REQUEST_PAYLOAD_SIZE = 16 * 1024;
+  private static final int MAX_GROWING_REQUEST_PAYLOAD_SIZE =
+      calculateNextPowerOfTwo(calculateAutoResizingBufferMemorySizeInBytes());
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeJVMConfig()
+        .setMaxHeapSize(DATANODE_MAX_HEAP_SIZE_IN_MB);
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void 
testNewConnectionsWithWritesAreRejectedWhenBufferMemoryIsExhausted()
+      throws Exception {
+    List<Connection> heldConnections = new ArrayList<>();
+    boolean rejected = false;
+
+    try {
+      try (Connection connection = EnvFactory.getEnv().getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute("CREATE DATABASE root.auto_resizing_buffer_reject");
+        statement.execute(
+            "CREATE TIMESERIES root.auto_resizing_buffer_reject.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN");
+      }
+
+      int connectionCountToExhaustBufferMemory =
+          calculateConnectionCountToExhaustAutoResizingBufferMemory();
+      for (int i = 0; i < connectionCountToExhaustBufferMemory; i++) {
+        try {
+          Connection connection = EnvFactory.getEnv().getConnection();
+          heldConnections.add(connection);
+          try (Statement statement = connection.createStatement()) {
+            statement.execute(
+                String.format(
+                    "INSERT INTO root.auto_resizing_buffer_reject.d1(time, s1) 
VALUES (%d, %d)",
+                    i + 1, i));
+          }
+        } catch (Exception e) {
+          rejected = true;
+          break;
+        }
+      }
+    } finally {
+      for (Connection connection : heldConnections) {
+        closeQuietly(connection);
+      }
+    }
+
+    Assert.assertTrue(
+        "Expected new connections with writes to be rejected after 
AutoResizingBuffer memory is exhausted",
+        rejected);
+  }
+
+  @Test
+  public void testGrowingRequestsAreRejectedWhenBufferMemoryIsExhausted() 
throws Exception {
+    boolean rejected = false;
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE 
root.auto_resizing_buffer_growing_request");
+      statement.execute(
+          "CREATE TIMESERIES root.auto_resizing_buffer_growing_request.d1.s1 
WITH DATATYPE=TEXT, ENCODING=PLAIN");
+    }
+
+    // do not use connection because its inevitable retry will slow down the 
test
+    try (ThriftClientContext clientContext = ThriftClientContext.open()) {
+      int payloadSize = INITIAL_GROWING_REQUEST_PAYLOAD_SIZE;
+      while (payloadSize <= MAX_GROWING_REQUEST_PAYLOAD_SIZE) {
+        try {
+          clientContext.executeStatement(
+              String.format(
+                  "INSERT INTO 
root.auto_resizing_buffer_growing_request.d1(time, s1) VALUES (%d, '%s')",
+                  payloadSize, repeat('a', payloadSize)));
+        } catch (Exception e) {
+          rejected = true;
+          clientContext.markBroken();
+          break;
+        }
+        payloadSize = payloadSize << 1;
+      }
+    }
+
+    Assert.assertTrue(
+        "Expected a growing request to be rejected after AutoResizingBuffer 
memory is exhausted",
+        rejected);
+  }
+
+  private static void closeQuietly(AutoCloseable closeable) {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (Exception ignored) {
+      // ignored
+    }
+  }
+
+  private static int 
calculateConnectionCountToExhaustAutoResizingBufferMemory() {
+    int autoResizingBufferInitialSizePerConnection =
+        AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION * 
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
+    return (int)
+        (calculateAutoResizingBufferMemorySizeInBytes() / 
autoResizingBufferInitialSizePerConnection
+            + CONNECTION_COUNT_OVERFLOW_MARGIN);
+  }
+
+  private static long calculateAutoResizingBufferMemorySizeInBytes() {
+    return (DATANODE_MAX_HEAP_SIZE_IN_MB * 1024L * 1024L * 5 / 100); // 5% of 
the max heap size;
+  }
+
+  private static int calculateNextPowerOfTwo(long value) {
+    int result = 1;
+    while (result < value) {
+      result <<= 1;
+    }
+    return result;
+  }
+
+  private static String repeat(char character, int count) {
+    char[] chars = new char[count];
+    Arrays.fill(chars, character);
+    return new String(chars);
+  }
+
+  private static TSOpenSessionReq createOpenSessionReq() {
+    TSOpenSessionReq req = new TSOpenSessionReq();
+    req.setUsername(SessionConfig.DEFAULT_USER);
+    req.setPassword(SessionConfig.DEFAULT_PASSWORD);
+    req.setZoneId(ZoneId.systemDefault().toString());
+    req.putToConfiguration("version", 
IoTDBConstant.ClientVersion.V_1_0.toString());
+    req.putToConfiguration("sql_dialect", "tree");
+    return req;
+  }
+
+  private static class ThriftClientContext implements AutoCloseable {
+
+    private final TTransport transport;
+    private final IClientRPCService.Client client;
+    private final long sessionId;
+    private final long statementId;
+    private boolean broken;
+
+    private ThriftClientContext(
+        TTransport transport, IClientRPCService.Client client, long sessionId, 
long statementId) {
+      this.transport = transport;
+      this.client = client;
+      this.sessionId = sessionId;
+      this.statementId = statementId;
+    }
+
+    private static ThriftClientContext open() throws Exception {
+      DataNodeWrapper dataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
+      TTransport transport =
+          DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+              dataNode.getIp(), dataNode.getPort(), 0);
+      transport.open();
+      IClientRPCService.Client client =
+          new IClientRPCService.Client(new TBinaryProtocol(transport));
+      TSOpenSessionResp openSessionResp = 
client.openSession(createOpenSessionReq());
+      RpcUtils.verifySuccess(openSessionResp.getStatus());
+      long sessionId = openSessionResp.getSessionId();
+      return new ThriftClientContext(
+          transport, client, sessionId, client.requestStatementId(sessionId));
+    }
+
+    private void executeStatement(String sql) throws Exception {
+      TSExecuteStatementResp resp =
+          client.executeStatementV2(new TSExecuteStatementReq(sessionId, sql, 
statementId));
+      RpcUtils.verifySuccess(resp.getStatus());
+    }
+
+    private void markBroken() {
+      broken = true;
+    }
+
+    @Override
+    public void close() throws Exception {
+      try {
+        if (!broken) {
+          client.closeOperation(new 
TSCloseOperationReq(sessionId).setStatementId(statementId));
+          client.closeSession(new TSCloseSessionReq(sessionId));
+        }
+      } finally {
+        transport.close();
+      }
+    }
+  }
+
+  private static boolean checkConfigFileContains(AbstractNodeWrapper 
nodeWrapper, String content) {
+    try {
+      String systemPropertiesPath =
+          nodeWrapper.getNodePath()
+              + File.separator
+              + "conf"
+              + File.separator
+              + CommonConfig.SYSTEM_CONFIG_NAME;
+      return new String(Files.readAllBytes(new 
File(systemPropertiesPath).toPath()))
+          .contains(content);
+    } catch (Exception ignore) {
+      return false;
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 19a4214bed2..1eda0f06226 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -86,7 +86,7 @@ public class IoTDBLoadTsFileIT {
     
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
-    
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:0");
     EnvFactory.getEnv()
         .getConfig()
         .getDataNodeConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 3b3fae80902..e304fc81d96 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -46,6 +46,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
@@ -56,6 +57,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index f233596fc60..dc5b3218be7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -65,6 +65,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -77,6 +78,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 3fce11bf51f..647d5ab1515 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -83,6 +84,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(configNodeConsensus)
         .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -167,6 +169,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -179,6 +182,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -361,6 +365,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -377,6 +382,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index 022d71d5487..b47a3540635 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -65,6 +65,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         // Disable sender compaction for tsfile determination in loose range 
test
@@ -80,6 +81,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index 57b5846fc4b..c1f466d9a72 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         // Disable sender compaction to test mods
@@ -77,6 +78,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
index de0f0c460ee..404f74058dc 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
@@ -59,6 +59,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeTableModelDualManualIT
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -69,6 +70,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeTableModelDualManualIT
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index ff906d8a5af..22d756a8b89 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -70,6 +70,7 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
@@ -79,6 +80,7 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 27297c06471..6fe48f1d866 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -83,6 +83,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -98,6 +99,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDataReplicationFactor(2)
         .setSchemaReplicationFactor(3)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index 547db349e2f..25495364e25 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -71,17 +71,19 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeTableModelDualManual
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index a7fae02f6d1..24e9b04d6ff 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -54,23 +54,25 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:1");
+    
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:0");
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 97e18346836..048f9a19180 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -61,6 +61,7 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
@@ -71,6 +72,7 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index dfa7957a574..d17d73516fa 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -58,6 +58,7 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -67,6 +68,7 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index 8a7ac4f1947..92874b6d539 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -82,6 +83,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(configNodeConsensus)
         .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -169,6 +171,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -178,6 +181,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -355,6 +359,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -368,6 +373,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 72646f81b86..158dd552c20 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -66,6 +66,7 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDataReplicationFactor(1)
         .setSchemaReplicationFactor(1);
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index d93cb5e42d4..9af7cc84316 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -67,6 +67,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         // Disable sender compaction for tsfile determination in loose range 
test
@@ -81,6 +82,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index f1a623143aa..6953a5a23f4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -59,6 +59,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -68,6 +69,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index 22b6d07f44a..15ae9fca4c5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -84,6 +84,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -99,6 +100,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDataReplicationFactor(2)
         .setSchemaReplicationFactor(3)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 33a68d43ff2..fb8945471b7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -62,6 +62,7 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         // Limit the schemaRegion number to 1 to guarantee the after sql 
executed on the same region
         // of the tested idempotent sql.
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
@@ -77,6 +78,7 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setSchemaRegionGroupExtensionPolicy("CUSTOM")
         .setDataRegionGroupExtensionPolicy("CUSTOM")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 3900506a658..739148f1523 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
@@ -79,6 +80,7 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index 26790479f22..0a5e3b0da74 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -56,6 +56,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         // Disable sender compaction to test mods
@@ -68,6 +69,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualTreeModelAutoIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index 11f70d944b6..0fec803c2d3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -54,17 +54,19 @@ public abstract class AbstractPipeDualTreeModelManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
 
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
index 5f936cda516..06b4a72ccf7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
@@ -57,6 +57,7 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualTreeModelManualIT
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -64,11 +65,12 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualTreeModelManualIT
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
-    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
index 119f7ed59c2..9645ff718df 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -53,6 +53,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends 
AbstractPipeDualTreeModelManual
     senderEnv
         .getConfig()
         .getCommonConfig()
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -60,7 +61,11 @@ public class IoTDBPipeMetaLeaderChangeIT extends 
AbstractPipeDualTreeModelManual
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
+        .setDnConnectionTimeoutMs(600000);
 
     senderEnv.initClusterEnvironment(3, 3, 180);
     receiverEnv.initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 23d0b48d440..4548c12cef3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -71,11 +72,12 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 3ade13c7209..61d4f0157e4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -36,6 +36,7 @@ abstract class AbstractPipeSingleIT {
     env.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
index 8647e776b50..2b1ce913770 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
@@ -54,6 +54,7 @@ public class IoTDBLegacyPipeReceiverSecurityIT {
 
   @BeforeClass
   public static void setUp() {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("3:3:1:1:1:0");
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index b1d0a4dda73..fdee0e30c88 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -74,6 +74,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
     env.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeMemoryManagementEnabled(false)
         .setDataReplicationFactor(1)
         .setSchemaReplicationFactor(1)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
index c7fa9d12c4e..45fa762b02b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
@@ -51,6 +51,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeSingleIT {
     env.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setPipeMemoryManagementEnabled(false)
         .setDataReplicationFactor(1)
         .setSchemaReplicationFactor(1)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index f4e63e1d2f8..c1d92c7916b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -48,16 +48,18 @@ abstract class AbstractPipeTripleManualIT {
     env1.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+    
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
 
     env2.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
@@ -67,6 +69,7 @@ abstract class AbstractPipeTripleManualIT {
     env3.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
+        .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
index b351ace5a4d..c4444c4bf55 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
@@ -71,6 +71,7 @@ public class IoTDBLoadTsFileIT {
     tmpDir = new File(Files.createTempDirectory("load").toUri());
     
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("3:3:1:1:1:0");
     EnvFactory.getEnv()
         .getConfig()
         .getDataNodeConfig()
diff --git a/iotdb-client/client-go b/iotdb-client/client-go
index dc64b1a7648..2ea2655e090 160000
--- a/iotdb-client/client-go
+++ b/iotdb-client/client-go
@@ -1 +1 @@
-Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def
+Subproject commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24
diff --git 
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
 
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 0e545164e8d..5c89472be5b 100644
--- 
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++ 
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -50,6 +50,12 @@ public final class RpcMessages {
   public static final String COULD_NOT_LOAD_KEYSTORE =
       "Could not load keystore or truststore file";
 
+  // AutoResizingBuffer
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+      "AutoResizingBuffer was interrupted while allocating %d bytes";
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+      "AutoResizingBuffer failed to allocate %d bytes after %d retries";
+
   // IoTDBRpcDataSet / IoTDBJDBCDataSet
   public static final String CLOSE_OPERATION_SERVER_ERROR =
       "Error occurs for close operation in server side because ";
diff --git 
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
 
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 03faa1dbb83..d12d4593a39 100644
--- 
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++ 
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -44,6 +44,12 @@ public final class RpcMessages {
   // BaseRpcTransportFactory
   public static final String COULD_NOT_LOAD_KEYSTORE = "无法加载密钥库或信任库文件";
 
+  // AutoResizingBuffer
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+      "AutoResizingBuffer 分配 %d 字节内存时被中断";
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+      "AutoResizingBuffer 在 %d 次重试后仍无法分配 %d 字节内存";
+
   // IoTDBRpcDataSet / IoTDBJDBCDataSet
   public static final String CLOSE_OPERATION_SERVER_ERROR = "服务端关闭操作失败,原因:";
   public static final String CLOSE_OPERATION_CONNECTION_ERROR = 
"连接服务端执行关闭操作时出错 ";
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
index 180d05b3d93..397a5843171 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.rpc;
 
+import java.io.IOException;
 import java.util.Arrays;
 
 /**
@@ -31,33 +32,38 @@ import java.util.Arrays;
  * required size < current capacity * 0.6, and such small requests last for 
more than 5 times,
  * shrink to the middle of the required size and current capacity.
  */
-class AutoResizingBuffer {
+class AutoResizingBuffer implements AutoCloseable {
 
   private byte[] array;
   private int bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
   private final int initialCapacity;
   private long lastShrinkTime;
+  private int accountedCapacity;
+  private boolean closed;
 
-  public AutoResizingBuffer(int initialCapacity) {
+  public AutoResizingBuffer(int initialCapacity) throws IOException {
+    AutoResizingBufferMemoryManager.allocate(initialCapacity);
     this.array = new byte[initialCapacity];
     this.initialCapacity = initialCapacity;
+    this.accountedCapacity = initialCapacity;
   }
 
-  public void resizeIfNecessary(int size) {
+  public void resizeIfNecessary(int size) throws IOException {
+    reserveCurrentCapacityIfReleased();
     final int currentCapacity = this.array.length;
     final double loadFactor = 0.6;
     if (currentCapacity < size) {
       // Increase by a factor of 1.5x
       int growCapacity = currentCapacity + (currentCapacity >> 1);
       int newCapacity = Math.max(growCapacity, size);
-      this.array = Arrays.copyOf(array, newCapacity);
+      resize(newCapacity);
       bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
     } else if (size > initialCapacity
         && currentCapacity * loadFactor > size
         && bufTooLargeCounter-- <= 0
         && System.currentTimeMillis() - lastShrinkTime > 
RpcUtils.MIN_SHRINK_INTERVAL) {
       // do not resize if it is reading the request size and do not shrink too 
often
-      array = Arrays.copyOf(array, size + (currentCapacity - size) / 2);
+      resize(size + (currentCapacity - size) / 2);
       bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
       lastShrinkTime = System.currentTimeMillis();
     }
@@ -66,4 +72,41 @@ class AutoResizingBuffer {
   public byte[] array() {
     return this.array;
   }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      AutoResizingBufferMemoryManager.release(accountedCapacity);
+      accountedCapacity = 0;
+      closed = true;
+    }
+  }
+
+  private void resize(int newCapacity) throws IOException {
+    final int currentCapacity = array.length;
+    if (newCapacity > currentCapacity) {
+      final int delta = newCapacity - currentCapacity;
+      AutoResizingBufferMemoryManager.allocate(delta);
+      try {
+        array = Arrays.copyOf(array, newCapacity);
+        accountedCapacity += delta;
+      } catch (RuntimeException | Error e) {
+        AutoResizingBufferMemoryManager.release(delta);
+        throw e;
+      }
+    } else if (newCapacity < currentCapacity) {
+      array = Arrays.copyOf(array, newCapacity);
+      final int delta = currentCapacity - newCapacity;
+      AutoResizingBufferMemoryManager.release(delta);
+      accountedCapacity -= delta;
+    }
+  }
+
+  private void reserveCurrentCapacityIfReleased() throws IOException {
+    if (closed) {
+      AutoResizingBufferMemoryManager.allocate(array.length);
+      accountedCapacity = array.length;
+      closed = false;
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
similarity index 57%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
copy to 
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
index dc1ca4e5473..49128727a0b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
@@ -17,27 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.memory;
+package org.apache.iotdb.rpc;
 
-public class MemoryConfig {
-  private final MemoryManager globalMemoryManager =
-      new MemoryManager("GlobalMemoryManager", null, 
Runtime.getRuntime().totalMemory());
+/** Memory accounting hook for {@link AutoResizingBuffer}. */
+public interface AutoResizingBufferMemoryControl {
 
-  private MemoryConfig() {
-    // singleton
-  }
+  boolean allocate(long sizeInBytes);
 
-  public static MemoryManager global() {
-    return MemoryConfigHolder.INSTANCE.globalMemoryManager;
-  }
-
-  public static MemoryConfig getInstance() {
-    return MemoryConfigHolder.INSTANCE;
-  }
-
-  private static class MemoryConfigHolder {
-    private static final MemoryConfig INSTANCE = new MemoryConfig();
-
-    private MemoryConfigHolder() {}
-  }
+  void release(long sizeInBytes);
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
new file mode 100644
index 00000000000..d95d51adb48
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.iotdb.rpc.i18n.RpcMessages;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+public final class AutoResizingBufferMemoryManager {
+  private static final int MEMORY_ALLOCATE_MAX_RETRIES = 5;
+  private static final long DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = 
2_000L;
+
+  private static final AutoResizingBufferMemoryControl NO_OP_MEMORY_CONTROL =
+      new AutoResizingBufferMemoryControl() {
+        @Override
+        public boolean allocate(long sizeInBytes) {
+          return true;
+        }
+
+        @Override
+        public void release(long sizeInBytes) {
+          // Do nothing.
+        }
+      };
+
+  private static volatile AutoResizingBufferMemoryControl memoryControl = 
NO_OP_MEMORY_CONTROL;
+  private static volatile long memoryAllocateRetryIntervalInMs =
+      DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+  private static final AtomicLong memoryAllocationCount = new AtomicLong();
+  private static final AtomicLong memoryAllocationFailureCount = new 
AtomicLong();
+
+  private AutoResizingBufferMemoryManager() {
+    // Utility class.
+  }
+
+  public static void setMemoryControl(AutoResizingBufferMemoryControl 
memoryControl) {
+    AutoResizingBufferMemoryManager.memoryControl = 
Objects.requireNonNull(memoryControl);
+  }
+
+  static void resetMemoryControl() {
+    memoryControl = NO_OP_MEMORY_CONTROL;
+    memoryAllocateRetryIntervalInMs = 
DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+    memoryAllocationCount.set(0);
+    memoryAllocationFailureCount.set(0);
+  }
+
+  static void setMemoryAllocateRetryIntervalInMs(long 
memoryAllocateRetryIntervalInMs) {
+    AutoResizingBufferMemoryManager.memoryAllocateRetryIntervalInMs =
+        memoryAllocateRetryIntervalInMs;
+  }
+
+  static void allocate(long sizeInBytes) throws IOException {
+    if (sizeInBytes <= 0) {
+      return;
+    }
+    for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+      if (memoryControl.allocate(sizeInBytes)) {
+        memoryAllocationCount.incrementAndGet();
+        return;
+      }
+      try {
+        Thread.sleep(memoryAllocateRetryIntervalInMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(
+            
String.format(RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED, 
sizeInBytes), e);
+      }
+    }
+    memoryAllocationFailureCount.incrementAndGet();
+    throw new IOException(
+        String.format(
+            RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_FAILED,
+            sizeInBytes,
+            MEMORY_ALLOCATE_MAX_RETRIES));
+  }
+
+  static void release(long sizeInBytes) {
+    if (sizeInBytes <= 0) {
+      return;
+    }
+    memoryControl.release(sizeInBytes);
+  }
+
+  public static long getMemoryAllocationCount() {
+    return memoryAllocationCount.get();
+  }
+
+  public static long getMemoryAllocationFailureCount() {
+    return memoryAllocationFailureCount.get();
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 35833321ba3..035f8753de1 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -23,18 +23,24 @@ import org.apache.thrift.TConfiguration;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import java.io.IOException;
+
 public class AutoScalingBufferReadTransport extends NonOpenTransport {
 
   private final AutoResizingBuffer buf;
   private int pos = 0;
   private int limit = 0;
 
-  public AutoScalingBufferReadTransport(int initialCapacity) {
+  public AutoScalingBufferReadTransport(int initialCapacity) throws 
IOException {
     this.buf = new AutoResizingBuffer(initialCapacity);
   }
 
   public void fill(TTransport inTrans, int length) throws TTransportException {
-    buf.resizeIfNecessary(length);
+    try {
+      buf.resizeIfNecessary(length);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
     inTrans.readAll(buf.array(), 0, length);
     pos = 0;
     limit = length;
@@ -89,10 +95,16 @@ public class AutoScalingBufferReadTransport extends 
NonOpenTransport {
     return limit - pos;
   }
 
-  public void resizeIfNecessary(int size) {
+  public void resizeIfNecessary(int size) throws IOException {
     buf.resizeIfNecessary(size);
   }
 
+  @Override
+  public void close() {
+    super.close();
+    buf.close();
+  }
+
   public void limit(int newLimit) {
     this.limit = newLimit;
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index ae833fad239..24278067fcd 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.rpc;
 import org.apache.thrift.TConfiguration;
 import org.apache.thrift.transport.TTransportException;
 
+import java.io.IOException;
+
 /**
  * Note that this class is mainly copied from class {@link
  * org.apache.thrift.transport.AutoExpandingBufferWriteTransport}. since that 
class does not support
@@ -32,7 +34,7 @@ public class AutoScalingBufferWriteTransport extends 
NonOpenTransport {
   private final AutoResizingBuffer buf;
   private int pos;
 
-  public AutoScalingBufferWriteTransport(int initialCapacity) {
+  public AutoScalingBufferWriteTransport(int initialCapacity) throws 
IOException {
     this.buf = new AutoResizingBuffer(initialCapacity);
     this.pos = 0;
   }
@@ -43,8 +45,12 @@ public class AutoScalingBufferWriteTransport extends 
NonOpenTransport {
   }
 
   @Override
-  public void write(byte[] toWrite, int off, int len) {
-    buf.resizeIfNecessary(pos + len);
+  public void write(byte[] toWrite, int off, int len) throws 
TTransportException {
+    try {
+      buf.resizeIfNecessary(pos + len);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
     System.arraycopy(toWrite, off, buf.array(), pos, len);
     pos += len;
   }
@@ -57,7 +63,13 @@ public class AutoScalingBufferWriteTransport extends 
NonOpenTransport {
     pos = 0;
   }
 
-  public void resizeIfNecessary(int size) {
+  @Override
+  public void close() {
+    super.close();
+    buf.close();
+  }
+
+  public void resizeIfNecessary(int size) throws IOException {
     buf.resizeIfNecessary(size);
   }
 
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 62abc28e470..88c9683db97 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -34,10 +34,27 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
       TTransport underlying,
       int thriftDefaultBufferSize,
       int thriftMaxFrameSize,
-      boolean copyBinary) {
+      boolean copyBinary)
+      throws TTransportException {
     super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
-    writeCompressBuffer = new 
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
-    readCompressBuffer = new 
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+    try {
+      writeCompressBuffer = new 
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+      readCompressBuffer = new 
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+    } catch (IOException e) {
+      closeAllocatedBuffers();
+      throw new TTransportException(e);
+    }
+  }
+
+  @Override
+  protected void closeAllocatedBuffers() {
+    super.closeAllocatedBuffers();
+    if (writeCompressBuffer != null) {
+      writeCompressBuffer.close();
+    }
+    if (readCompressBuffer != null) {
+      readCompressBuffer.close();
+    }
   }
 
   @Override
@@ -80,7 +97,11 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
 
     writeBuffer.reset();
     if (thriftDefaultBufferSize < length) {
-      writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+      try {
+        writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+      } catch (IOException e) {
+        throw new TTransportException(e);
+      }
     }
     underlying.flush();
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 6a174f86b6a..db3d2cc7d95 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -32,6 +32,7 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
@@ -74,7 +75,7 @@ public class TElasticFramedTransport extends TTransport {
     }
 
     @Override
-    public TTransport getTransport(TTransport trans) {
+    public TTransport getTransport(TTransport trans) throws 
TTransportException {
       return new TElasticFramedTransport(
           trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     }
@@ -84,13 +85,19 @@ public class TElasticFramedTransport extends TTransport {
       TTransport underlying,
       int thriftDefaultBufferSize,
       int thriftMaxFrameSize,
-      boolean copyBinary) {
+      boolean copyBinary)
+      throws TTransportException {
     this.underlying = underlying;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
     this.copyBinary = copyBinary;
-    readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
-    writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+    try {
+      readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+      writeBuffer = new 
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+    } catch (IOException e) {
+      closeAllocatedBuffers();
+      throw new TTransportException(e);
+    }
   }
 
   protected final int thriftDefaultBufferSize;
@@ -115,7 +122,20 @@ public class TElasticFramedTransport extends TTransport {
 
   @Override
   public void close() {
-    underlying.close();
+    try {
+      underlying.close();
+    } finally {
+      closeAllocatedBuffers();
+    }
+  }
+
+  protected void closeAllocatedBuffers() {
+    if (readBuffer != null) {
+      readBuffer.close();
+    }
+    if (writeBuffer != null) {
+      writeBuffer.close();
+    }
   }
 
   @Override
@@ -263,7 +283,11 @@ public class TElasticFramedTransport extends TTransport {
     underlying.write(writeBuffer.getBuffer(), 0, length);
     writeBuffer.reset();
     if (length > thriftDefaultBufferSize) {
-      writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+      try {
+        writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+      } catch (IOException e) {
+        throw new TTransportException(e);
+      }
     }
     underlying.flush();
   }
@@ -292,7 +316,7 @@ public class TElasticFramedTransport extends TTransport {
   }
 
   @Override
-  public void write(byte[] buf, int off, int len) {
+  public void write(byte[] buf, int off, int len) throws TTransportException {
     writeBuffer.write(buf, off, len);
   }
 
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 79ebc7983e3..fd9d4ffbba3 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.rpc;
 
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.xerial.snappy.Snappy;
 
 import java.io.IOException;
@@ -41,13 +42,13 @@ public class TSnappyElasticFramedTransport extends 
TCompressedElasticFramedTrans
     }
 
     @Override
-    public TTransport getTransport(TTransport trans) {
+    public TTransport getTransport(TTransport trans) throws 
TTransportException {
       return new TSnappyElasticFramedTransport(
           trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     }
   }
 
-  public TSnappyElasticFramedTransport(TTransport underlying) {
+  public TSnappyElasticFramedTransport(TTransport underlying) throws 
TTransportException {
     this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, 
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
   }
 
@@ -55,7 +56,8 @@ public class TSnappyElasticFramedTransport extends 
TCompressedElasticFramedTrans
       TTransport underlying,
       int thriftDefaultBufferSize,
       int thriftMaxFrameSize,
-      boolean copyBinary) {
+      boolean copyBinary)
+      throws TTransportException {
     super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
   }
 
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 77c2bbf6b97..505e1a99405 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.rpc;
 
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
 import java.net.SocketException;
@@ -31,7 +32,8 @@ public class TimeoutChangeableTFastFramedTransport extends 
TElasticFramedTranspo
   private final TSocket underlyingSocket;
 
   public TimeoutChangeableTFastFramedTransport(
-      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
+      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary)
+      throws TTransportException {
     super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     this.underlyingSocket = underlying;
   }
@@ -65,7 +67,7 @@ public class TimeoutChangeableTFastFramedTransport extends 
TElasticFramedTranspo
     }
 
     @Override
-    public TTransport getTransport(TTransport trans) {
+    public TTransport getTransport(TTransport trans) throws 
TTransportException {
       if (trans instanceof TSocket) {
         return new TimeoutChangeableTFastFramedTransport(
             (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, 
copyBinary);
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index 168f52662aa..ecd807e38d6 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.rpc;
 
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
 import java.net.SocketException;
@@ -31,7 +32,8 @@ public class TimeoutChangeableTSnappyFramedTransport extends 
TSnappyElasticFrame
   private final TSocket underlyingSocket;
 
   public TimeoutChangeableTSnappyFramedTransport(
-      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary) {
+      TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, 
boolean copyBinary)
+      throws TTransportException {
     super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
     this.underlyingSocket = underlying;
   }
@@ -65,7 +67,7 @@ public class TimeoutChangeableTSnappyFramedTransport extends 
TSnappyElasticFrame
     }
 
     @Override
-    public TTransport getTransport(TTransport trans) {
+    public TTransport getTransport(TTransport trans) throws 
TTransportException {
       if (trans instanceof TSocket) {
         return new TimeoutChangeableTSnappyFramedTransport(
             (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, 
copyBinary);
diff --git 
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
new file mode 100644
index 00000000000..800881208ee
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoResizingBufferTest {
+
+  private final TestMemoryControl memoryControl = new TestMemoryControl();
+
+  @After
+  public void tearDown() {
+    AutoResizingBufferMemoryManager.resetMemoryControl();
+  }
+
+  @Test
+  public void testAllocateAndReleaseMemoryWhenResizing() throws Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+    Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes());
+
+    buffer.resizeIfNecessary(200);
+    Assert.assertEquals(200, buffer.array().length);
+    Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes());
+
+    setLastShrinkTime(buffer, System.currentTimeMillis() - 
RpcUtils.MIN_SHRINK_INTERVAL - 1);
+    for (int i = 0; i <= RpcUtils.MAX_BUFFER_OVERSIZE_TIME; i++) {
+      buffer.resizeIfNecessary(101);
+    }
+    Assert.assertEquals(150, buffer.array().length);
+    Assert.assertEquals(150, memoryControl.getUsedMemoryInBytes());
+
+    buffer.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void testThrowIOExceptionWhenMemoryIsInsufficient() throws Exception {
+    memoryControl.setLimit(120);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+    try {
+      buffer.resizeIfNecessary(200);
+      Assert.fail("Expected IOException");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("100"));
+      Assert.assertTrue(e.getMessage().contains("5"));
+    } finally {
+      Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes());
+      buffer.close();
+    }
+  }
+
+  @Test
+  public void testMemoryAllocationMetrics() throws Exception {
+    memoryControl.setLimit(120);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+    Assert.assertEquals(1, 
AutoResizingBufferMemoryManager.getMemoryAllocationCount());
+    Assert.assertEquals(0, 
AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount());
+
+    try {
+      buffer.resizeIfNecessary(200);
+      Assert.fail("Expected IOException");
+    } catch (IOException ignored) {
+      Assert.assertEquals(1, 
AutoResizingBufferMemoryManager.getMemoryAllocationCount());
+      Assert.assertEquals(1, 
AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount());
+    } finally {
+      buffer.close();
+    }
+  }
+
+  @Test
+  public void testElasticFramedTransportReleasesMemoryWhenClosed() throws 
Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    TElasticFramedTransport transport =
+        new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+    Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes());
+
+    transport.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void testElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+    memoryControl.setLimit(150);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    try {
+      new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+      Assert.fail("Expected TTransportException");
+    } catch (TTransportException e) {
+      Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+    }
+  }
+
+  @Test
+  public void testSnappyElasticFramedTransportReleasesMemory() throws 
Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    TSnappyElasticFramedTransport transport =
+        new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, 
true);
+    Assert.assertEquals(400, memoryControl.getUsedMemoryInBytes());
+
+    transport.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void 
testSnappyElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+    memoryControl.setLimit(350);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    try {
+      new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+      Assert.fail("Expected TTransportException");
+    } catch (TTransportException e) {
+      Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+    }
+  }
+
+  private void setLastShrinkTime(AutoResizingBuffer buffer, long 
lastShrinkTime) throws Exception {
+    Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime");
+    field.setAccessible(true);
+    field.set(buffer, lastShrinkTime);
+  }
+
+  private static class TestMemoryControl implements 
AutoResizingBufferMemoryControl {
+
+    private final AtomicLong usedMemoryInBytes = new AtomicLong();
+    private long limit = Long.MAX_VALUE;
+
+    @Override
+    public boolean allocate(long sizeInBytes) {
+      while (true) {
+        long current = usedMemoryInBytes.get();
+        long next = current + sizeInBytes;
+        if (next > limit) {
+          return false;
+        }
+        if (usedMemoryInBytes.compareAndSet(current, next)) {
+          return true;
+        }
+      }
+    }
+
+    @Override
+    public void release(long sizeInBytes) {
+      usedMemoryInBytes.addAndGet(-sizeInBytes);
+    }
+
+    private long getUsedMemoryInBytes() {
+      return usedMemoryInBytes.get();
+    }
+
+    private void setLimit(long limit) {
+      this.limit = limit;
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index da766541786..d3effb28e9b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.memory.MemoryConfig;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -142,6 +143,7 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
       LOGGER.info(ConfigNodeMessages.STARTING_IOTDB, 
IoTDBConstant.VERSION_WITH_BUILD);
       ConfigNodeStartupCheck checks = new 
ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
       checks.startUpCheck();
+      MemoryConfig.getInstance();
     } catch (StartupException | ConfigurationException | IOException e) {
       LOGGER.error(ConfigNodeMessages.MEET_ERROR_WHEN_DOING_START_CHECKING, e);
       throw new IoTDBException(ConfigNodeMessages.ERROR_STARTING, -1);
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index 442ac484064..cb71ed3fa4d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -58,8 +58,10 @@ import javax.net.ssl.TrustManager;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -184,10 +186,23 @@ public class Utils {
 
   public static ByteBuffer serializeTSStatus(TSStatus status) throws 
TException {
     AutoScalingBufferWriteTransport byteBuffer =
-        new AutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
-    TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
-    status.write(protocol);
-    return ByteBuffer.wrap(byteBuffer.getBuffer());
+        createAutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
+    try {
+      TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
+      status.write(protocol);
+      return ByteBuffer.wrap(Arrays.copyOf(byteBuffer.getBuffer(), 
byteBuffer.getPos()));
+    } finally {
+      byteBuffer.close();
+    }
+  }
+
+  private static AutoScalingBufferWriteTransport 
createAutoScalingBufferWriteTransport(
+      int initialCapacity) throws TException {
+    try {
+      return new AutoScalingBufferWriteTransport(initialCapacity);
+    } catch (IOException e) {
+      throw new TException(e);
+    }
   }
 
   public static TSStatus deserializeFrom(ByteBuffer buffer) throws TException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
index 3fd49c707fe..07e31454691 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
@@ -168,6 +168,7 @@ public class DataNodeMemoryConfig {
     long schemaEngineMemorySize = Runtime.getRuntime().maxMemory() / 10;
     long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10;
     long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10;
+    long autoResizingBufferMemorySize = Runtime.getRuntime().maxMemory() / 20;
     if (memoryAllocateProportion != null) {
       String[] proportions = memoryAllocateProportion.split(":");
       int proportionSum = 0;
@@ -189,6 +190,11 @@ public class DataNodeMemoryConfig {
         if (proportions.length >= 6) {
           pipeMemorySize =
               maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / 
proportionSum;
+          autoResizingBufferMemorySize =
+              maxMemoryAvailable
+                  * Integer.parseInt(proportions[proportions.length - 
1].trim())
+                  / proportionSum
+                  / 2;
         } else {
           pipeMemorySize =
               (maxMemoryAvailable
@@ -211,6 +217,8 @@ public class DataNodeMemoryConfig {
     consensusMemoryManager =
         onHeapMemoryManager.getOrCreateMemoryManager("Consensus", 
consensusMemorySize);
     pipeMemoryManager = onHeapMemoryManager.getOrCreateMemoryManager("Pipe", 
pipeMemorySize);
+    MemoryConfig.getInstance()
+        .setAutoResizingBufferMemoryControl(onHeapMemoryManager, 
autoResizingBufferMemorySize);
     LOGGER.info(
         "initial allocateMemoryForWrite = {}",
         storageEngineMemoryManager.getTotalMemorySizeInBytes());
@@ -224,6 +232,7 @@ public class DataNodeMemoryConfig {
         consensusMemoryManager.getTotalMemorySizeInBytes());
     LOGGER.info(
         "initial allocateMemoryForPipe = {}", 
pipeMemoryManager.getTotalMemorySizeInBytes());
+    LOGGER.info("initial allocateMemoryForAutoResizingBuffer = {}", 
autoResizingBufferMemorySize);
 
     initSchemaMemoryAllocate(schemaEngineMemoryManager, properties);
     initStorageEngineAllocate(storageEngineMemoryManager, properties);
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index facf07fb85f..f176c0e1761 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -796,6 +796,7 @@ partition_table_recover_max_read_megabytes_per_second=10
 # Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine, 
Consensus, StreamingEngine and Free Memory.
 # The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers. 
for example: 1:1:1:1:1:1 , 6:2:1:1:1:1
 # If you have high level of writing pressure and low level of reading 
pressure, please adjust it to for example 6:1:1:1:1:1
+# 50% of Free Memory is used by AutoResizingBuffer memory control.
 # effectiveMode: restart
 datanode_memory_proportion=3:3:1:1:1:1
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 824ec639ef6..eb24b0eeef8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -335,7 +335,6 @@ public class CommonDescriptor {
             properties.getProperty(
                 "cluster_device_limit_threshold",
                 String.valueOf(config.getDeviceLimitThreshold()))));
-
     config.setPathLogMaxSize(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
new file mode 100644
index 00000000000..4cc3fd3cd27
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.memory;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryManager;
+
+public class AutoResizingBufferMemoryMetrics implements IMetricSet {
+
+  private static final String ALLOCATION_COUNT = 
"auto_resizing_buffer_allocation_count";
+  private static final String ALLOCATION_FAILURE_COUNT =
+      "auto_resizing_buffer_allocation_failure_count";
+  private static final String TOTAL_MEMORY = 
"auto_resizing_buffer_total_memory";
+  private static final String USED_MEMORY = "auto_resizing_buffer_used_memory";
+  private static final String AVAILABLE_MEMORY = 
"auto_resizing_buffer_available_memory";
+
+  private final MemoryConfig memoryConfig;
+
+  public AutoResizingBufferMemoryMetrics(MemoryConfig memoryConfig) {
+    this.memoryConfig = memoryConfig;
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        AutoResizingBufferMemoryManager.class,
+        ignored -> AutoResizingBufferMemoryManager.getMemoryAllocationCount(),
+        Tag.NAME.toString(),
+        ALLOCATION_COUNT);
+    metricService.createAutoGauge(
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        AutoResizingBufferMemoryManager.class,
+        ignored -> 
AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount(),
+        Tag.NAME.toString(),
+        ALLOCATION_FAILURE_COUNT);
+    metricService.createAutoGauge(
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryTotalSizeInBytes,
+        Tag.NAME.toString(),
+        TOTAL_MEMORY);
+    metricService.createAutoGauge(
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryUsedSizeInBytes,
+        Tag.NAME.toString(),
+        USED_MEMORY);
+    metricService.createAutoGauge(
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryAvailableSizeInBytes,
+        Tag.NAME.toString(),
+        AVAILABLE_MEMORY);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        Tag.NAME.toString(),
+        ALLOCATION_COUNT);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        Tag.NAME.toString(),
+        ALLOCATION_FAILURE_COUNT);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        Tag.NAME.toString(),
+        TOTAL_MEMORY);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        Tag.NAME.toString(),
+        USED_MEMORY);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+        Tag.NAME.toString(),
+        AVAILABLE_MEMORY);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
index dc1ca4e5473..c828120eab5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
@@ -19,12 +19,24 @@
 
 package org.apache.iotdb.commons.memory;
 
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryControl;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryManager;
+
 public class MemoryConfig {
+  private static final String AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME = 
"AutoResizingBuffer";
+  private static final String AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME = 
"AutoResizingBufferBlock";
+
   private final MemoryManager globalMemoryManager =
       new MemoryManager("GlobalMemoryManager", null, 
Runtime.getRuntime().totalMemory());
 
+  private MemoryManager autoResizingBufferMemoryManagerParent;
+  private IMemoryBlock autoResizingBufferMemoryBlock;
+  private boolean isAutoResizingBufferMemoryControlEnabled;
+
   private MemoryConfig() {
-    // singleton
+    initAutoResizingBufferMemoryControl();
+    MetricService.getInstance().addMetricSet(new 
AutoResizingBufferMemoryMetrics(this));
   }
 
   public static MemoryManager global() {
@@ -40,4 +52,77 @@ public class MemoryConfig {
 
     private MemoryConfigHolder() {}
   }
+
+  private void initAutoResizingBufferMemoryControl() {
+    AutoResizingBufferMemoryManager.setMemoryControl(
+        new AutoResizingBufferMemoryControl() {
+          @Override
+          public synchronized boolean allocate(long sizeInBytes) {
+            IMemoryBlock memoryBlock = getAutoResizingBufferMemoryBlock();
+            if (memoryBlock == null) {
+              return true;
+            }
+            return memoryBlock.allocate(sizeInBytes);
+          }
+
+          @Override
+          public synchronized void release(long sizeInBytes) {
+            IMemoryBlock memoryBlock = getAutoResizingBufferMemoryBlock();
+            if (memoryBlock != null) {
+              memoryBlock.release(sizeInBytes);
+            }
+          }
+        });
+  }
+
+  public synchronized void setAutoResizingBufferMemoryControl(
+      MemoryManager parentMemoryManager, long memorySizeInBytes) {
+    if (autoResizingBufferMemoryManagerParent != null) {
+      autoResizingBufferMemoryManagerParent.releaseChildMemoryManager(
+          AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME);
+    }
+    autoResizingBufferMemoryManagerParent = parentMemoryManager;
+    autoResizingBufferMemoryBlock = null;
+
+    if (memorySizeInBytes <= 0) {
+      isAutoResizingBufferMemoryControlEnabled = false;
+      return;
+    }
+
+    MemoryManager autoResizingBufferMemoryManager =
+        parentMemoryManager.getOrCreateMemoryManager(
+            AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME, memorySizeInBytes, true);
+    if (autoResizingBufferMemoryManager == null) {
+      isAutoResizingBufferMemoryControlEnabled = false;
+      return;
+    }
+    autoResizingBufferMemoryBlock =
+        autoResizingBufferMemoryManager.exactAllocate(
+            AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME, memorySizeInBytes, 
MemoryBlockType.DYNAMIC);
+    isAutoResizingBufferMemoryControlEnabled = true;
+  }
+
+  private synchronized IMemoryBlock getAutoResizingBufferMemoryBlock() {
+    if (!isAutoResizingBufferMemoryControlEnabled
+        || autoResizingBufferMemoryBlock == null
+        || autoResizingBufferMemoryBlock.isReleased()) {
+      return null;
+    }
+    return autoResizingBufferMemoryBlock;
+  }
+
+  public synchronized long getAutoResizingBufferMemoryTotalSizeInBytes() {
+    IMemoryBlock memoryBlock = getAutoResizingBufferMemoryBlock();
+    return memoryBlock == null ? 0 : memoryBlock.getTotalMemorySizeInBytes();
+  }
+
+  public synchronized long getAutoResizingBufferMemoryUsedSizeInBytes() {
+    IMemoryBlock memoryBlock = getAutoResizingBufferMemoryBlock();
+    return memoryBlock == null ? 0 : memoryBlock.getUsedMemoryInBytes();
+  }
+
+  public synchronized long getAutoResizingBufferMemoryAvailableSizeInBytes() {
+    IMemoryBlock memoryBlock = getAutoResizingBufferMemoryBlock();
+    return memoryBlock == null ? 0 : memoryBlock.getFreeMemoryInBytes();
+  }
 }
diff --git a/pom.xml b/pom.xml
index 3e0aa631a45..ac80880bd42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -770,6 +770,7 @@
                             <exclude>**/.gitmodules</exclude>
                             <exclude>**/.git-blame-ignore-revs</exclude>
                             <exclude>**/git.properties</exclude>
+                            <exclude>AGENTS.md</exclude>
                             <!-- Maven related files -->
                             <exclude>**/target/**</exclude>
                             <!-- Eclipse related files -->

Reply via email to