This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_iot_memory_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 27d8b27947521a9c9c0bbaacf444d2859781a324
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Aug 13 17:29:30 2025 +0800

    Fix ref count of IoTConsensus request not decreased in allocation failure
---
 .../logdispatcher/IoTConsensusMemoryManager.java   |  88 ++++++++++++------
 .../IoTConsensusMemoryManagerTest.java             | 103 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 28 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 9a9d38946fe..17074d5e66c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import org.slf4j.Logger;
@@ -46,38 +47,43 @@ public class IoTConsensusMemoryManager {
     if (prevRef == 0) {
       boolean reserved = reserve(request.getMemorySize(), true);
       if (reserved) {
-        logger.info(
-            "Reserving {} bytes for request {} succeeds, current total usage {}",
-            request.getMemorySize(),
-            request.getSearchIndex(),
-            memoryBlock.getUsedMemoryInBytes());
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} succeeds, current total usage {}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memoryBlock.getUsedMemoryInBytes());
+        }
       } else {
-        logger.info(
-            "Reserving {} bytes for request {} fails, current total usage {}",
-            request.getMemorySize(),
-            request.getSearchIndex(),
-            memoryBlock.getUsedMemoryInBytes());
+        request.decRef();
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} fails, current total usage {}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memoryBlock.getUsedMemoryInBytes());
+        }
       }
       return reserved;
-    } else {
-      logger.info(
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
           "Skip memory reservation for {} because its ref count is not 0",
           request.getSearchIndex());
-      return true;
     }
+    return true;
   }
 
   public boolean reserve(Batch batch) {
     boolean reserved = reserve(batch.getMemorySize(), false);
-    if (reserved) {
-      logger.info(
+    if (reserved && logger.isDebugEnabled()) {
+      logger.debug(
           "Reserving {} bytes for batch {}-{} succeeds, current total usage {}",
           batch.getMemorySize(),
           batch.getStartIndex(),
           batch.getEndIndex(),
           memoryBlock.getUsedMemoryInBytes());
-    } else {
-      logger.info(
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
           "Reserving {} bytes for batch {}-{} fails, current total usage {}",
           batch.getMemorySize(),
           batch.getStartIndex(),
@@ -106,22 +112,26 @@ public class IoTConsensusMemoryManager {
     long prevRef = request.decRef();
     if (prevRef == 1) {
       free(request.getMemorySize(), true);
-      logger.info(
-          "Freed {} bytes for request {}, current total usage {}",
-          request.getMemorySize(),
-          request.getSearchIndex(),
-          memoryBlock.getUsedMemoryInBytes());
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Freed {} bytes for request {}, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memoryBlock.getUsedMemoryInBytes());
+      }
     }
   }
 
   public void free(Batch batch) {
     free(batch.getMemorySize(), false);
-    logger.info(
-        "Freed {} bytes for batch {}-{}, current total usage {}",
-        batch.getMemorySize(),
-        batch.getStartIndex(),
-        batch.getEndIndex(),
-        getMemorySizeInByte());
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Freed {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          getMemorySizeInByte());
+    }
   }
 
   private void free(long size, boolean fromQueue) {
@@ -143,6 +153,28 @@ public class IoTConsensusMemoryManager {
     this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
   }
 
+  @TestOnly
+  public void reset() {
+    this.memoryBlock.release(this.memoryBlock.getUsedMemoryInBytes());
+    this.queueMemorySizeInByte.set(0);
+    this.syncMemorySizeInByte.set(0);
+  }
+
+  @TestOnly
+  public IMemoryBlock getMemoryBlock() {
+    return memoryBlock;
+  }
+
+  @TestOnly
+  public void setMemoryBlock(IMemoryBlock memoryBlock) {
+    this.memoryBlock = memoryBlock;
+  }
+
+  @TestOnly
+  public Double getMaxMemoryRatioForQueue() {
+    return maxMemoryRatioForQueue;
+  }
+
   long getMemorySizeInByte() {
     return memoryBlock.getUsedMemoryInBytes();
   }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
new file mode 100644
index 00000000000..3d89943772e
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.logdispatcher;
+
+import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
+import org.apache.iotdb.commons.memory.IMemoryBlock;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IoTConsensusMemoryManagerTest {
+
+  private IMemoryBlock previousMemoryBlock;
+  private long memoryBlockSize = 16 * 1024L;
+
+  @Before
+  public void setUp() throws Exception {
+    previousMemoryBlock = IoTConsensusMemoryManager.getInstance().getMemoryBlock();
+    IoTConsensusMemoryManager.getInstance()
+        .setMemoryBlock(new AtomicLongMemoryBlock("Test", null, memoryBlockSize));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IoTConsensusMemoryManager.getInstance().setMemoryBlock(previousMemoryBlock);
+  }
+
+  @Test
+  public void testSingleReserveAndRelease() {
+    testReserveAndRelease(1);
+  }
+
+  @Test
+  public void testMultiReserveAndRelease() {
+    testReserveAndRelease(3);
+  }
+
+  private void testReserveAndRelease(int numReservation) {
+    int allocationSize = 1;
+    long allocatedSize = 0;
+    List<IndexedConsensusRequest> requestList = new ArrayList<>();
+    while (true) {
+      IndexedConsensusRequest request =
+          new IndexedConsensusRequest(
+              0,
+              Collections.singletonList(
+                  new ByteBufferConsensusRequest(ByteBuffer.allocate(allocationSize))));
+      request.buildSerializedRequests();
+      if (allocatedSize + allocationSize
+          <= memoryBlockSize
+              * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()) {
+        for (int i = 0; i < numReservation; i++) {
+          assertTrue(IoTConsensusMemoryManager.getInstance().reserve(request));
+          requestList.add(request);
+        }
+      } else {
+        for (int i = 0; i < numReservation; i++) {
+          assertFalse(IoTConsensusMemoryManager.getInstance().reserve(request));
+        }
+        break;
+      }
+      allocatedSize += allocationSize;
+    }
+
+    assertTrue(
+        IoTConsensusMemoryManager.getInstance().getMemorySizeInByte()
+            <= memoryBlockSize
+                * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue());
+    for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
+      IoTConsensusMemoryManager.getInstance().free(indexedConsensusRequest);
+    }
+    assertEquals(0, IoTConsensusMemoryManager.getInstance().getMemorySizeInByte());
+  }
+}

Reply via email to