This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch lwh/fixMemoryBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 85ca0d1893d38b8543b28bc75338852d4867413c
Author: Weihao Li <[email protected]>
AuthorDate: Thu Apr 30 10:38:49 2026 +0800

    fix allocate of MemoryBlock
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../commons/memory/AtomicLongMemoryBlock.java      | 41 +++++++++++-----------
 .../iotdb/commons/memory/MemoryBlockTest.java      | 41 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
index 1376d8ddb6d..9663eb0e689 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java
@@ -23,7 +23,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class AtomicLongMemoryBlock extends IMemoryBlock {
@@ -58,30 +57,30 @@ public class AtomicLongMemoryBlock extends IMemoryBlock {
 
   @Override
   public boolean allocate(long sizeInByte) {
-    AtomicBoolean result = new AtomicBoolean(false);
-    usedMemoryInBytes.updateAndGet(
-        memCost -> {
-          if (memCost + sizeInByte > totalMemorySizeInBytes) {
-            return memCost;
-          }
-          result.set(true);
-          return memCost + sizeInByte;
-        });
-    return result.get();
+    long prev;
+    long next;
+    do {
+      prev = usedMemoryInBytes.get();
+      if (prev + sizeInByte > totalMemorySizeInBytes) {
+        return false;
+      }
+      next = prev + sizeInByte;
+    } while (!usedMemoryInBytes.compareAndSet(prev, next));
+    return true;
   }
 
   @Override
   public boolean allocateIfSufficient(final long sizeInByte, final double maxRatio) {
-    AtomicBoolean result = new AtomicBoolean(false);
-    usedMemoryInBytes.updateAndGet(
-        memCost -> {
-          if (memCost + sizeInByte > totalMemorySizeInBytes * maxRatio) {
-            return memCost;
-          }
-          result.set(true);
-          return memCost + sizeInByte;
-        });
-    return result.get();
+    long prev;
+    long next;
+    do {
+      prev = usedMemoryInBytes.get();
+      if (prev + sizeInByte > totalMemorySizeInBytes * maxRatio) {
+        return false;
+      }
+      next = prev + sizeInByte;
+    } while (!usedMemoryInBytes.compareAndSet(prev, next));
+    return true;
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java
index dee41447aad..9291858c4a6 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.commons.memory;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class MemoryBlockTest {
 
   @Test
@@ -64,4 +67,42 @@ public class MemoryBlockTest {
     memoryBlock1.close();
     memoryBlock2.close();
   }
+
+  @Test
+  public void testAllocateNoPhantomSuccessUnderContention() throws InterruptedException {
+    final int rounds = 2000;
+    for (int i = 0; i < rounds; i++) {
+      AtomicLongMemoryBlock memoryBlock = new AtomicLongMemoryBlock("ContentionBlock", null, 1);
+      AtomicInteger successCount = new AtomicInteger(0);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch doneLatch = new CountDownLatch(2);
+
+      Runnable allocateTask =
+          () -> {
+            try {
+              startLatch.await();
+              if (memoryBlock.allocate(1)) {
+                successCount.incrementAndGet();
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            } finally {
+              doneLatch.countDown();
+            }
+          };
+
+      Thread t1 = new Thread(allocateTask, "memory-block-test-1");
+      Thread t2 = new Thread(allocateTask, "memory-block-test-2");
+      t1.start();
+      t2.start();
+      startLatch.countDown();
+      doneLatch.await();
+
+      Assert.assertEquals(1, successCount.get());
+      Assert.assertEquals(1, memoryBlock.getUsedMemoryInBytes());
+      Assert.assertEquals(0, memoryBlock.release(1));
+      Assert.assertEquals(0, memoryBlock.getUsedMemoryInBytes());
+    }
+  }
 }

Reply via email to