This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f0ec9c4a071 [To dev/1.3] Cherry-pick some IoTConsensus related commits 
to 1.3 (#16812)
f0ec9c4a071 is described below

commit f0ec9c4a071f309cbc2a2f00fc4d118011b28d9e
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Nov 27 17:21:57 2025 +0800

    [To dev/1.3] Cherry-pick some IoTConsensus related commits to 1.3 (#16812)
    
    * More accurate memory size (#15713)
    
    * Fix getting stuck when stopping a DataNode with large unremovable WAL files (#15727)
    
    * Fix stuck when stopping a DataNode with large unremovable WAL files
    
    * spotless
    
    * add shutdown hook watcher
    
    * Fix LogDispatcher getting stuck
    
    * add re-interrupt
    
    * Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
    
    * Fix negative IoTConsensus queue size & missing search index for deletion & missed request when performing empty table deletion (#16022)
    
    * Fix double memory free of IoTConsensus queue requests during region deletion
    
    * Fix missing searchIndex and lost deletion when no TsFile is involved.
    
    * Fix ref count of IoTConsensus request not being decreased on allocation failure (#16169)
    
    * fix IoTConsensus memory management
    
    * Fix ref count of IoTConsensus request not being decreased on allocation failure
    
    * fix log level
    
    * Remove irrelevant code from 2.0
    
    * Remove a table test
    
    * Interrupt wal-delete thread when WALManager is closed (#15442)
    
    ---------
    
    Co-authored-by: Xiangpeng Hu <[email protected]>
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |  12 ++
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   4 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |  10 ++
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   4 +
 ...IoTDBRegionOperationReliabilityITFramework.java |  23 +++-
 .../iotdb/db/it/IoTDBCustomizedClusterIT.java      | 148 +++++++++++++++++++++
 .../DeserializedBatchIndexedConsensusRequest.java  |   7 +
 .../common/request/IndexedConsensusRequest.java    |  10 ++
 .../consensus/iot/client/DispatchLogHandler.java   |   5 +
 .../iotdb/consensus/iot/logdispatcher/Batch.java   |  25 ++--
 .../logdispatcher/IoTConsensusMemoryManager.java   | 126 ++++++++++++++----
 .../consensus/iot/logdispatcher/LogDispatcher.java |  38 ++++--
 .../consensus/iot/logdispatcher/SyncStatus.java    |  23 +++-
 .../service/IoTConsensusRPCServiceProcessor.java   |   3 +-
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |   4 +-
 .../planner/plan/node/write/DeleteDataNode.java    |   9 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   3 +-
 .../plan/planner/plan/node/write/SearchNode.java   |   3 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     |  31 ++++-
 .../storageengine/dataregion/wal/WALManager.java   |  14 +-
 .../src/main/thrift/iotconsensus.thrift            |   2 +
 23 files changed, 437 insertions(+), 73 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 01636b7bf0b..9ac380503cd 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -100,4 +100,16 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
     setProperty("cache_last_values_for_load", 
String.valueOf(cacheLastValuesForLoad));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    setProperty("wal_throttle_threshold_in_byte", 
String.valueOf(walThrottleSize));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long 
deleteWalFilesPeriodInMs) {
+    setProperty("delete_wal_files_period_in_ms", 
String.valueOf(deleteWalFilesPeriodInMs));
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index f1e25f9e1d6..005db9050ff 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -758,4 +758,8 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
       return -1;
     }
   }
+
+  public Process getInstance() {
+    return instance;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 80d9d98d23d..b2550ae689f 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -63,4 +63,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setCacheLastValuesForLoad(boolean 
cacheLastValuesForLoad) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long 
deleteWalFilesPeriodInMs) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index c6112a0e639..c27eb369c0d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -39,4 +39,8 @@ public interface DataNodeConfig {
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+  DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+  DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 0dccc669eeb..1d14574a8ee 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.utils.Pair;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
@@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework {
     }
   }
 
-  protected Set<Integer> getAllDataNodes(Statement statement) throws Exception 
{
+  public static Set<Integer> getAllDataNodes(Statement statement) throws 
Exception {
     ResultSet result = statement.executeQuery(SHOW_DATANODES);
     Set<Integer> allDataNodeId = new HashSet<>();
     while (result.next()) {
@@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
     return regionMap;
   }
 
+  public static Map<Integer, Pair<Integer, Set<Integer>>> 
getDataRegionMapWithLeader(
+      Statement statement) throws Exception {
+    ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
+    Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
+    while (showRegionsResult.next()) {
+      if (String.valueOf(TConsensusGroupType.DataRegion)
+          .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
+        int regionId = 
showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
+        int dataNodeId = 
showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
+        Pair<Integer, Set<Integer>> leaderNodesPair =
+            regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new 
HashSet<>()));
+        leaderNodesPair.getRight().add(dataNodeId);
+        if 
(showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
+          leaderNodesPair.setLeft(dataNodeId);
+        }
+      }
+    }
+    return regionMap;
+  }
+
   public static Map<Integer, Set<Integer>> getAllRegionMap(Statement 
statement) throws Exception {
     ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
     Map<Integer, Set<Integer>> regionMap = new HashMap<>();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
new file mode 100644
index 00000000000..5812e255d16
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+
+import static org.junit.Assert.fail;
+
+/** Tests that may not be satisfied with the default cluster settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBCustomizedClusterIT {
+
+  private final Logger logger = 
LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
+
+  /**
+   * When the wal size exceeds `walThrottleSize` * 0.8, the timed 
wal-delete-thread will try
+   * deleting wal forever, which will block the DataNode from exiting, because 
task of deleting wal
+   * submitted by the ShutdownHook cannot be executed. This test ensures that 
this blocking is
+   * fixed.
+   */
+  @Test
+  public void testWalThrottleStuck()
+      throws SQLException,
+          IoTDBConnectionException,
+          StatementExecutionException,
+          InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setWalThrottleSize(1)
+        .setDeleteWalFilesPeriodInMs(100);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(3)
+        .setSchemaReplicationFactor(3)
+        
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    try {
+      simpleEnv.initClusterEnvironment(1, 3);
+
+      int leaderIndex = -1;
+      try (Connection connection = simpleEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        // write the first data
+        statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        // find the leader of the data region
+        int port = -1;
+
+        ResultSet resultSet = statement.executeQuery("SHOW REGIONS");
+        while (resultSet.next()) {
+          String regionType = resultSet.getString("Type");
+          if (regionType.equals("DataRegion")) {
+            String role = resultSet.getString("Role");
+            if (role.equals("Leader")) {
+              port = resultSet.getInt("RpcPort");
+              break;
+            }
+          }
+        }
+
+        if (port == -1) {
+          fail("Leader not found");
+        }
+
+        for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
+          if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
+            leaderIndex = i;
+            break;
+          }
+        }
+      }
+
+      // stop a follower
+      int followerIndex = (leaderIndex + 1) % 
simpleEnv.getDataNodeWrapperList().size();
+      simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
+      System.out.println(
+          new Date()
+              + ":Stopping data node "
+              + 
simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
+
+      DataNodeWrapper leader = 
simpleEnv.getDataNodeWrapperList().get(leaderIndex);
+      // write to the leader to generate wal that cannot be synced
+      try (Session session = new Session(leader.getIp(), leader.getPort())) {
+        session.open();
+
+        session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) 
values (1,1)");
+        session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) 
values (1,1)");
+        session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) 
values (1,1)");
+        session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) 
values (1,1)");
+        session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) 
values (1,1)");
+      }
+
+      // wait for wal-delete thread to be scheduled
+      Thread.sleep(1000);
+
+      // stop the leader
+      leader.getInstance().destroy();
+      System.out.println(new Date() + ":Stopping data node " + 
leader.getIpAndPortString());
+      // confirm the death of the leader
+      long startTime = System.currentTimeMillis();
+      while (leader.isAlive()) {
+        if (System.currentTimeMillis() - startTime > 30000) {
+          fail("Leader does not exit after 30s");
+        }
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
index 3cb81fa95c7..9cdeaf60c30 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
   private final long startSyncIndex;
   private final long endSyncIndex;
   private final List<IConsensusRequest> insertNodes;
+  private long memorySize;
 
   public DeserializedBatchIndexedConsensusRequest(
       long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public class DeserializedBatchIndexedConsensusRequest
 
   public void add(IConsensusRequest insertNode) {
     this.insertNodes.add(insertNode);
+    this.memorySize += insertNode.getMemorySize();
   }
 
   @Override
@@ -82,4 +84,9 @@ public class DeserializedBatchIndexedConsensusRequest
   public ByteBuffer serializeToByteBuffer() {
     return null;
   }
+
+  @Override
+  public long getMemorySize() {
+    return memorySize;
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1147abc049e..2bf01d4ef86 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** only used for iot consensus. */
 public class IndexedConsensusRequest implements IConsensusRequest {
@@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   private final List<IConsensusRequest> requests;
   private final List<ByteBuffer> serializedRequests;
   private long memorySize = 0;
+  private AtomicLong referenceCnt = new AtomicLong();
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
     this.searchIndex = searchIndex;
@@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   public int hashCode() {
     return Objects.hash(searchIndex, requests);
   }
+
+  public long incRef() {
+    return referenceCnt.getAndIncrement();
+  }
+
+  public long decRef() {
+    return referenceCnt.getAndDecrement();
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..3b812f6c297 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
 import 
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -89,6 +90,10 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
       }
       completeBatch(batch);
     }
+    if (response.isSetReceiverMemSize()) {
+      
LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
+      LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
+    }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 7a556c85a04..72b68ab96ac 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
 
-import java.nio.Buffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -37,7 +36,7 @@ public class Batch {
 
   private long logEntriesNumFromWAL = 0L;
 
-  private long serializedSize;
+  private long memorySize;
   // indicates whether this batch has been successfully synchronized to 
another node
   private boolean synced;
 
@@ -60,14 +59,20 @@ public class Batch {
     if (entry.fromWAL) {
       logEntriesNumFromWAL++;
     }
-    // TODO Maybe we need to add in additional fields for more accurate 
calculations
-    serializedSize +=
-        entry.getData() == null ? 0 : 
entry.getData().stream().mapToInt(Buffer::capacity).sum();
+    memorySize += entry.getMemorySize();
   }
 
   public boolean canAccumulate() {
+    // When reading entries from the WAL, the memory size is calculated based 
on the serialized
+    // size, which can be significantly smaller than the actual size.
+    // Thus, we add a multiplier to sender's memory size to estimate the 
receiver's memory cost.
+    // The multiplier is calculated based on the receiver's feedback.
+    long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
+    long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
+    double multiplier = senderMemSize > 0 ? (double) receiverMemSize / 
senderMemSize : 1.0;
+    multiplier = Math.max(multiplier, 1.0);
     return logEntries.size() < 
config.getReplication().getMaxLogEntriesNumPerBatch()
-        && serializedSize < config.getReplication().getMaxSizePerBatch();
+        && ((long) (memorySize * multiplier)) < 
config.getReplication().getMaxSizePerBatch();
   }
 
   public long getStartIndex() {
@@ -94,8 +99,8 @@ public class Batch {
     return logEntries.isEmpty();
   }
 
-  public long getSerializedSize() {
-    return serializedSize;
+  public long getMemorySize() {
+    return memorySize;
   }
 
   public long getLogEntriesNumFromWAL() {
@@ -111,8 +116,8 @@ public class Batch {
         + endIndex
         + ", size="
         + logEntries.size()
-        + ", serializedSize="
-        + serializedSize
+        + ", memorySize="
+        + memorySize
         + '}';
   }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index df54918c60e..bb46f20486f 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class IoTConsensusMemoryManager {
@@ -39,41 +40,101 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new 
IoTConsensusMemoryManagerMetrics(this));
   }
 
-  public boolean reserve(long size, boolean fromQueue) {
-    AtomicBoolean result = new AtomicBoolean(false);
-    memorySizeInByte.updateAndGet(
-        memorySize -> {
-          long remainSize =
-              (fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte) 
- memorySize;
-          if (size > remainSize) {
-            logger.debug(
-                "consensus memory limited. required: {}, used: {}, total: {}",
-                size,
-                memorySize,
-                maxMemorySizeInByte);
-            result.set(false);
-            return memorySize;
-          } else {
-            logger.debug(
-                "{} add {} bytes, total memory size: {} bytes.",
-                Thread.currentThread().getName(),
-                size,
-                memorySize + size);
-            result.set(true);
-            return memorySize + size;
-          }
-        });
-    if (result.get()) {
+  public boolean reserve(IndexedConsensusRequest request) {
+    long prevRef = request.incRef();
+    if (prevRef == 0) {
+      boolean reserved = reserve(request.getMemorySize(), true);
+      if (reserved) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} succeeds, current total usage 
{}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memorySizeInByte.get());
+        }
+      } else {
+        request.decRef();
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} fails, current total usage 
{}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memorySizeInByte.get());
+        }
+      }
+      return reserved;
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Skip memory reservation for {} because its ref count is not 0",
+          request.getSearchIndex());
+    }
+    return true;
+  }
+
+  public boolean reserve(Batch batch) {
+    boolean reserved = reserve(batch.getMemorySize(), false);
+    if (reserved && logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} succeeds, current total usage 
{}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memorySizeInByte.get());
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} fails, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memorySizeInByte.get());
+    }
+    return reserved;
+  }
+
+  private boolean reserve(long size, boolean fromQueue) {
+    boolean result = memorySizeInByte.addAndGet(size) < maxMemorySizeInByte;
+    if (result) {
       if (fromQueue) {
-        queueMemorySizeInByte.addAndGet(size);
+        result = queueMemorySizeInByte.addAndGet(size) < 
maxMemorySizeForQueueInByte;
+        if (!result) {
+          queueMemorySizeInByte.addAndGet(-size);
+        }
       } else {
         syncMemorySizeInByte.addAndGet(size);
       }
+    } else {
+      memorySizeInByte.addAndGet(-size);
+    }
+    return result;
+  }
+
+  public void free(IndexedConsensusRequest request) {
+    long prevRef = request.decRef();
+    if (prevRef == 1) {
+      free(request.getMemorySize(), true);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Freed {} bytes for request {}, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memorySizeInByte.get());
+      }
     }
-    return result.get();
   }
 
-  public void free(long size, boolean fromQueue) {
+  public void free(Batch batch) {
+    free(batch.getMemorySize(), false);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Freed {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          getMemorySizeInByte());
+    }
+  }
+
+  private void free(long size, boolean fromQueue) {
     long currentUsedMemory = memorySizeInByte.addAndGet(-size);
     if (fromQueue) {
       queueMemorySizeInByte.addAndGet(-size);
@@ -92,6 +153,13 @@ public class IoTConsensusMemoryManager {
     this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
   }
 
+  @TestOnly
+  public void reset() {
+    this.memorySizeInByte.set(0);
+    this.queueMemorySizeInByte.set(0);
+    this.syncMemorySizeInByte.set(0);
+  }
+
   long getMemorySizeInByte() {
     return memorySizeInByte.get();
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 2d4e7cd5e01..374691bf38b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -69,6 +69,9 @@ public class LogDispatcher {
   private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
   private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
 
+  private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
+  private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
+
   public LogDispatcher(
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) 
{
@@ -105,8 +108,8 @@ public class LogDispatcher {
   public synchronized void stop() {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::setStopped);
-      threads.forEach(LogDispatcherThread::processStopped);
       executorService.shutdownNow();
+      threads.forEach(LogDispatcherThread::processStopped);
       int timeout = 10;
       try {
         if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
@@ -280,7 +283,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if 
(!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), 
true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
         return false;
       }
       boolean success;
@@ -288,19 +291,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory 
should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest 
indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), 
true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest);
     }
 
     public void stop() {
@@ -320,17 +323,13 @@ public class LogDispatcher {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       pendingEntries.clear();
-      iotConsensusMemoryManager.free(requestSize, true);
-      requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
-      iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();
       MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
     }
@@ -567,7 +566,8 @@ public class LogDispatcher {
         data.buildSerializedRequests();
         // construct request from wal
         logBatches.addTLogEntry(
-            new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), 
true));
+            new TLogEntry(
+                data.getSerializedRequests(), data.getSearchIndex(), true, 
data.getMemorySize()));
       }
       // In the case of corrupt Data, we return true so that we can send a 
batch as soon as
       // possible, avoiding potential duplication
@@ -577,7 +577,19 @@ public class LogDispatcher {
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, Batch logBatches) {
       logBatches.addTLogEntry(
-          new TLogEntry(request.getSerializedRequests(), 
request.getSearchIndex(), false));
+          new TLogEntry(
+              request.getSerializedRequests(),
+              request.getSearchIndex(),
+              false,
+              request.getMemorySize()));
     }
   }
+
+  public static AtomicLong getReceiverMemSizeSum() {
+    return receiverMemSizeSum;
+  }
+
+  public static AtomicLong getSenderMemSizeSum() {
+    return senderMemSizeSum;
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index e11b6302114..accc9f7667d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 public class SyncStatus {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SyncStatus.class);
   private final IoTConsensusConfig config;
   private final IndexController controller;
   private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -44,10 +48,19 @@ public class SyncStatus {
    * @throws InterruptedException
    */
   public synchronized void addNextBatch(Batch batch) throws 
InterruptedException {
-    while (pendingBatches.size() >= 
config.getReplication().getMaxPendingBatchesNum()
-        || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), 
false)) {
+    while ((pendingBatches.size() >= 
config.getReplication().getMaxPendingBatchesNum()
+            || !iotConsensusMemoryManager.reserve(batch))
+        && !Thread.interrupted()) {
       wait();
     }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Reserved {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          iotConsensusMemoryManager.getMemorySizeInByte());
+    }
     pendingBatches.add(batch);
   }
 
@@ -64,7 +77,7 @@ public class SyncStatus {
       while (current.isSynced()) {
         controller.update(current.getEndIndex(), false);
         iterator.remove();
-        iotConsensusMemoryManager.free(current.getSerializedSize(), false);
+        iotConsensusMemoryManager.free(current);
         if (iterator.hasNext()) {
           current = iterator.next();
         } else {
@@ -77,13 +90,11 @@ public class SyncStatus {
   }
 
   public synchronized void free() {
-    long size = 0;
     for (Batch pendingBatch : pendingBatches) {
-      size += pendingBatch.getSerializedSize();
+      iotConsensusMemoryManager.free(pendingBatch);
     }
     pendingBatches.clear();
     controller.update(0L, true);
-    iotConsensusMemoryManager.free(size, false);
   }
 
   /** Gets the first index that is not currently synchronized. */
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2bac66738fd..71c14aebaa1 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -129,7 +129,8 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
         "execute TSyncLogEntriesReq for {} with result {}",
         req.consensusGroupId,
         writeStatus.subStatus);
-    return new TSyncLogEntriesRes(writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus)
+        .setReceiverMemSize(deserializedRequest.getMemorySize());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index e152ed09bda..3f5bdf205b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long searchIndex) {
+  public SearchNode setSearchIndex(final long searchIndex) {
     insertNode.setSearchIndex(searchIndex);
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 0e7eae3f9d7..c4c4159f66d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -354,9 +354,10 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
             .distinct()
             .collect(Collectors.toList());
     return new DeleteDataNode(
-        firstOne.getPlanNodeId(),
-        pathList,
-        firstOne.getDeleteStartTime(),
-        firstOne.getDeleteEndTime());
+            firstOne.getPlanNodeId(),
+            pathList,
+            firstOne.getDeleteStartTime(),
+            firstOne.getDeleteEndTime())
+        .setSearchIndex(firstOne.searchIndex);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 6e401b4d54b..773881e398e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index caf86e67bab..7ec81d626df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public Map<Integer, TSStatus> getResults() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 021e51b69af..c7b193841d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public TSStatus[] getFailingStatus() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index ecae2cd7197..e9c614e7a98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -45,8 +45,9 @@ public abstract class SearchNode extends WritePlanNode {
   }
 
   /** Search index should start from 1 */
-  public void setSearchIndex(long searchIndex) {
+  public SearchNode setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
+    return this;
   }
 
   public abstract SearchNode merge(List<SearchNode> searchNodes);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 1a6af1cf9b0..4d0f4a5a412 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -52,15 +52,43 @@ public class DataNodeShutdownHook extends Thread {
   private static final Logger logger = 
LoggerFactory.getLogger(DataNodeShutdownHook.class);
 
   private final TDataNodeLocation nodeLocation;
+  private Thread watcherThread;
 
   public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
     super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
     this.nodeLocation = nodeLocation;
   }
 
+  private void startWatcher() {
+    Thread hookThread = Thread.currentThread();
+    watcherThread =
+        new Thread(
+            () -> {
+              while (!Thread.interrupted()) {
+                try {
+                  Thread.sleep(10000);
+                  StackTraceElement[] stackTrace = hookThread.getStackTrace();
+                  StringBuilder stackTraceBuilder =
+                      new StringBuilder("Stack trace of shutdown hook:\n");
+                  for (StackTraceElement traceElement : stackTrace) {
+                    
stackTraceBuilder.append(traceElement.toString()).append("\n");
+                  }
+                  logger.info(stackTraceBuilder.toString());
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            },
+            "ShutdownHookWatcher");
+    watcherThread.setDaemon(true);
+    watcherThread.start();
+  }
+
   @Override
   public void run() {
     logger.info("DataNode exiting...");
+    startWatcher();
     // Stop external rpc service firstly.
     ExternalRPCService.getInstance().stop();
 
@@ -77,7 +105,6 @@ public class DataNodeShutdownHook extends Thread {
         .equals(ConsensusFactory.RATIS_CONSENSUS)) {
       StorageEngine.getInstance().syncCloseAllProcessor();
     }
-    WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
 
     // We did this work because the RatisConsensus recovery mechanism is 
different from other
     // consensus algorithms, which will replace the underlying storage engine 
based on its
@@ -141,6 +168,8 @@ public class DataNodeShutdownHook extends Thread {
         "DataNode exits. Jvm memory usage: {}",
         MemUtils.bytesCntToStr(
             Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory()));
+
+    watcherThread.interrupt();
   }
 
   private void triggerSnapshotForAllDataRegion() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 96d4931fa43..e5fbdbca9af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -164,6 +164,7 @@ public class WALManager implements IService {
     logger.info("Start rebooting wal delete thread.");
     if (walDeleteThread != null) {
       shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
+      walDeleteThread = null;
     }
     logger.info("Stop wal delete thread successfully, and now restart it.");
     registerScheduleTask(0, config.getDeleteWalFilesPeriodInMs());
@@ -177,7 +178,7 @@ public class WALManager implements IService {
     // threshold, the system continues to delete expired files until the disk 
size is smaller than
     // the threshold.
     boolean firstLoop = true;
-    while (firstLoop || shouldThrottle()) {
+    while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) {
       deleteOutdatedFilesInWALNodes();
       if (firstLoop && shouldThrottle()) {
         logger.warn(
@@ -186,6 +187,10 @@ public class WALManager implements IService {
             getThrottleThreshold());
       }
       firstLoop = false;
+      if (Thread.interrupted()) {
+        logger.info("Timed wal delete thread is interrupted.");
+        return;
+      }
     }
   }
 
@@ -264,16 +269,19 @@ public class WALManager implements IService {
     if (config.getWalMode() == WALMode.DISABLE) {
       return;
     }
-
+    logger.info("Stopping WALManager");
     if (walDeleteThread != null) {
       shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
       walDeleteThread = null;
     }
+    logger.info("Deleting outdated files before exiting");
+    deleteOutdatedFilesInWALNodes();
     clear();
+    logger.info("WALManager stopped");
   }
 
   private void shutdownThread(ExecutorService thread, ThreadName threadName) {
-    thread.shutdown();
+    thread.shutdownNow();
     try {
       if (!thread.awaitTermination(30, TimeUnit.SECONDS)) {
         logger.warn("Waiting thread {} to be terminated is timeout", 
threadName.getName());
diff --git 
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift 
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index d0b4808977e..829443f9552 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -26,6 +26,7 @@ struct TLogEntry {
   1: required list<binary> data
   2: required i64 searchIndex
   3: required bool fromWAL
+  4: required i64 memorySize
 }
 
 struct TSyncLogEntriesReq {
@@ -37,6 +38,7 @@ struct TSyncLogEntriesReq {
 
 struct TSyncLogEntriesRes {
   1: required list<common.TSStatus> statuses
+  2: optional i64 receiverMemSize
 }
 
 struct TInactivatePeerReq {


Reply via email to