This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch 
fix_negative_iot_queue_size_and_delete_empty_file
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ef499339a842512d6c79f0cc1af775d19ae8fb82
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Jul 23 17:18:29 2025 +0800

    Fix double memory free of iotconsensus queue request during region deletion
---
 .../common/request/IndexedConsensusRequest.java    | 10 ++++++++++
 .../logdispatcher/IoTConsensusMemoryManager.java   | 21 +++++++++++++++++++++
 .../consensus/iot/logdispatcher/LogDispatcher.java | 22 ++++++++++++++++------
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1147abc049e..2bf01d4ef86 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** only used for iot consensus. */
 public class IndexedConsensusRequest implements IConsensusRequest {
@@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   private final List<IConsensusRequest> requests;
   private final List<ByteBuffer> serializedRequests;
   private long memorySize = 0;
+  private AtomicLong referenceCnt = new AtomicLong();
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
     this.searchIndex = searchIndex;
@@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   public int hashCode() {
     return Objects.hash(searchIndex, requests);
   }
+
+  public long incRef() {
+    return referenceCnt.getAndIncrement();
+  }
+
+  public long decRef() {
+    return referenceCnt.getAndDecrement();
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 7bee00588cd..03ef5f3a5f1 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.service.metric.MetricService;
 
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,17 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new 
IoTConsensusMemoryManagerMetrics(this));
   }
 
+  public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
+    synchronized (request) {
+      long prevRef = request.incRef();
+      if (prevRef == 0) {
+        return reserve(request.getMemorySize(), fromQueue);
+      } else {
+        return true;
+      }
+    }
+  }
+
   public boolean reserve(long size, boolean fromQueue) {
     boolean result =
         fromQueue
@@ -55,6 +67,15 @@ public class IoTConsensusMemoryManager {
     return result;
   }
 
+  public void free(IndexedConsensusRequest request, boolean fromQueue) {
+    synchronized (request) {
+      long prevRef = request.decRef();
+      if (prevRef == 0) {
+        free(request.getMemorySize(), fromQueue);
+      }
+    }
+  }
+
   public void free(long size, boolean fromQueue) {
     long currentUsedMemory = memoryBlock.release(size);
     if (fromQueue) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index e196df43209..106c5b743a3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -280,7 +280,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if 
(!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), 
true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
         return false;
       }
       boolean success;
@@ -288,19 +288,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory 
should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest 
indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), 
true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest, true);
     }
 
     public void stop() {
@@ -322,13 +322,23 @@ public class LogDispatcher {
       }
       long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        synchronized (indexedConsensusRequest) {
+          long prevRef = indexedConsensusRequest.decRef();
+          if (prevRef == 1) {
+            requestSize += indexedConsensusRequest.getMemorySize();
+          }
+        }
       }
       pendingEntries.clear();
       iotConsensusMemoryManager.free(requestSize, true);
       requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        synchronized (indexedConsensusRequest) {
+          long prevRef = indexedConsensusRequest.decRef();
+          if (prevRef == 1) {
+            requestSize += indexedConsensusRequest.getMemorySize();
+          }
+        }
       }
       iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();

Reply via email to