This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f31fe1634 Fix ref count of IoTConsensus request not decreased in allocation failure (#16169)
a0f31fe1634 is described below

commit a0f31fe163473ea85a05cd9ea4bb01085cd39f04
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Aug 14 17:56:46 2025 +0800

    Fix ref count of IoTConsensus request not decreased in allocation failure (#16169)
    
    * fix IoTConsensus memory management
    
    * Fix ref count of IoTConsensus request not decreased in allocation failure
    
    * fix log level
---
 .../logdispatcher/IoTConsensusMemoryManager.java   | 100 ++++++++++++++++++--
 .../consensus/iot/logdispatcher/LogDispatcher.java |  22 ++---
 .../consensus/iot/logdispatcher/SyncStatus.java    |  20 +++-
 .../IoTConsensusMemoryManagerTest.java             | 103 +++++++++++++++++++++
 4 files changed, 216 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index f993ea386da..17074d5e66c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import org.slf4j.Logger;
@@ -41,16 +42,58 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
   }
 
-  public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
+  public boolean reserve(IndexedConsensusRequest request) {
     long prevRef = request.incRef();
     if (prevRef == 0) {
-      return reserve(request.getMemorySize(), fromQueue);
-    } else {
-      return true;
+      boolean reserved = reserve(request.getMemorySize(), true);
+      if (reserved) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} succeeds, current total usage {}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memoryBlock.getUsedMemoryInBytes());
+        }
+      } else {
+        request.decRef();
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} fails, current total usage {}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memoryBlock.getUsedMemoryInBytes());
+        }
+      }
+      return reserved;
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Skip memory reservation for {} because its ref count is not 0",
+          request.getSearchIndex());
+    }
+    return true;
+  }
+
+  public boolean reserve(Batch batch) {
+    boolean reserved = reserve(batch.getMemorySize(), false);
+    if (reserved && logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} succeeds, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memoryBlock.getUsedMemoryInBytes());
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} fails, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memoryBlock.getUsedMemoryInBytes());
     }
+    return reserved;
   }
 
-  public boolean reserve(long size, boolean fromQueue) {
+  private boolean reserve(long size, boolean fromQueue) {
     boolean result =
         fromQueue
             ? memoryBlock.allocateIfSufficient(size, maxMemoryRatioForQueue)
@@ -65,14 +108,33 @@ public class IoTConsensusMemoryManager {
     return result;
   }
 
-  public void free(IndexedConsensusRequest request, boolean fromQueue) {
+  public void free(IndexedConsensusRequest request) {
     long prevRef = request.decRef();
     if (prevRef == 1) {
-      free(request.getMemorySize(), fromQueue);
+      free(request.getMemorySize(), true);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Freed {} bytes for request {}, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memoryBlock.getUsedMemoryInBytes());
+      }
+    }
+  }
+
+  public void free(Batch batch) {
+    free(batch.getMemorySize(), false);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Freed {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          getMemorySizeInByte());
     }
   }
 
-  public void free(long size, boolean fromQueue) {
+  private void free(long size, boolean fromQueue) {
     long currentUsedMemory = memoryBlock.release(size);
     if (fromQueue) {
       queueMemorySizeInByte.addAndGet(-size);
@@ -91,6 +153,28 @@ public class IoTConsensusMemoryManager {
     this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
   }
 
+  @TestOnly
+  public void reset() {
+    this.memoryBlock.release(this.memoryBlock.getUsedMemoryInBytes());
+    this.queueMemorySizeInByte.set(0);
+    this.syncMemorySizeInByte.set(0);
+  }
+
+  @TestOnly
+  public IMemoryBlock getMemoryBlock() {
+    return memoryBlock;
+  }
+
+  @TestOnly
+  public void setMemoryBlock(IMemoryBlock memoryBlock) {
+    this.memoryBlock = memoryBlock;
+  }
+
+  @TestOnly
+  public Double getMaxMemoryRatioForQueue() {
+    return maxMemoryRatioForQueue;
+  }
+
   long getMemorySizeInByte() {
     return memoryBlock.getUsedMemoryInBytes();
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 00cd6d1376f..374691bf38b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -283,7 +283,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
         return false;
       }
       boolean success;
@@ -291,19 +291,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest);
     }
 
     public void stop() {
@@ -323,23 +323,13 @@ public class LogDispatcher {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        long prevRef = indexedConsensusRequest.decRef();
-        if (prevRef == 1) {
-          requestSize += indexedConsensusRequest.getMemorySize();
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       pendingEntries.clear();
-      iotConsensusMemoryManager.free(requestSize, true);
-      requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        long prevRef = indexedConsensusRequest.decRef();
-        if (prevRef == 1) {
-          requestSize += indexedConsensusRequest.getMemorySize();
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
-      iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();
       MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index c5a426d88b8..accc9f7667d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 public class SyncStatus {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncStatus.class);
   private final IoTConsensusConfig config;
   private final IndexController controller;
   private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -45,10 +49,18 @@ public class SyncStatus {
    */
   public synchronized void addNextBatch(Batch batch) throws InterruptedException {
     while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
-            || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
+            || !iotConsensusMemoryManager.reserve(batch))
         && !Thread.interrupted()) {
       wait();
     }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Reserved {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          iotConsensusMemoryManager.getMemorySizeInByte());
+    }
     pendingBatches.add(batch);
   }
 
@@ -65,7 +77,7 @@ public class SyncStatus {
       while (current.isSynced()) {
         controller.update(current.getEndIndex(), false);
         iterator.remove();
-        iotConsensusMemoryManager.free(current.getMemorySize(), false);
+        iotConsensusMemoryManager.free(current);
         if (iterator.hasNext()) {
           current = iterator.next();
         } else {
@@ -78,13 +90,11 @@ public class SyncStatus {
   }
 
   public synchronized void free() {
-    long size = 0;
     for (Batch pendingBatch : pendingBatches) {
-      size += pendingBatch.getMemorySize();
+      iotConsensusMemoryManager.free(pendingBatch);
     }
     pendingBatches.clear();
     controller.update(0L, true);
-    iotConsensusMemoryManager.free(size, false);
   }
 
   /** Gets the first index that is not currently synchronized. */
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
new file mode 100644
index 00000000000..3d89943772e
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.logdispatcher;
+
+import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
+import org.apache.iotdb.commons.memory.IMemoryBlock;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IoTConsensusMemoryManagerTest {
+
+  private IMemoryBlock previousMemoryBlock;
+  private long memoryBlockSize = 16 * 1024L;
+
+  @Before
+  public void setUp() throws Exception {
+    previousMemoryBlock = IoTConsensusMemoryManager.getInstance().getMemoryBlock();
+    IoTConsensusMemoryManager.getInstance()
+        .setMemoryBlock(new AtomicLongMemoryBlock("Test", null, memoryBlockSize));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IoTConsensusMemoryManager.getInstance().setMemoryBlock(previousMemoryBlock);
+  }
+
+  @Test
+  public void testSingleReserveAndRelease() {
+    testReserveAndRelease(1);
+  }
+
+  @Test
+  public void testMultiReserveAndRelease() {
+    testReserveAndRelease(3);
+  }
+
+  private void testReserveAndRelease(int numReservation) {
+    int allocationSize = 1;
+    long allocatedSize = 0;
+    List<IndexedConsensusRequest> requestList = new ArrayList<>();
+    while (true) {
+      IndexedConsensusRequest request =
+          new IndexedConsensusRequest(
+              0,
+              Collections.singletonList(
+                  new ByteBufferConsensusRequest(ByteBuffer.allocate(allocationSize))));
+      request.buildSerializedRequests();
+      if (allocatedSize + allocationSize
+          <= memoryBlockSize
+              * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()) {
+        for (int i = 0; i < numReservation; i++) {
+          assertTrue(IoTConsensusMemoryManager.getInstance().reserve(request));
+          requestList.add(request);
+        }
+      } else {
+        for (int i = 0; i < numReservation; i++) {
+          assertFalse(IoTConsensusMemoryManager.getInstance().reserve(request));
+        }
+        break;
+      }
+      allocatedSize += allocationSize;
+    }
+
+    assertTrue(
+        IoTConsensusMemoryManager.getInstance().getMemorySizeInByte()
+            <= memoryBlockSize
+                * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue());
+    for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
+      IoTConsensusMemoryManager.getInstance().free(indexedConsensusRequest);
+    }
+    assertEquals(0, IoTConsensusMemoryManager.getInstance().getMemorySizeInByte());
+  }
+}

Reply via email to