This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch lwh/fixMemoryBlock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 85ca0d1893d38b8543b28bc75338852d4867413c Author: Weihao Li <[email protected]> AuthorDate: Thu Apr 30 10:38:49 2026 +0800 fix allocate of MemoryBlock Signed-off-by: Weihao Li <[email protected]> --- .../commons/memory/AtomicLongMemoryBlock.java | 41 +++++++++++----------- .../iotdb/commons/memory/MemoryBlockTest.java | 41 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java index 1376d8ddb6d..9663eb0e689 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AtomicLongMemoryBlock.java @@ -23,7 +23,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class AtomicLongMemoryBlock extends IMemoryBlock { @@ -58,30 +57,30 @@ public class AtomicLongMemoryBlock extends IMemoryBlock { @Override public boolean allocate(long sizeInByte) { - AtomicBoolean result = new AtomicBoolean(false); - usedMemoryInBytes.updateAndGet( - memCost -> { - if (memCost + sizeInByte > totalMemorySizeInBytes) { - return memCost; - } - result.set(true); - return memCost + sizeInByte; - }); - return result.get(); + long prev; + long next; + do { + prev = usedMemoryInBytes.get(); + if (prev + sizeInByte > totalMemorySizeInBytes) { + return false; + } + next = prev + sizeInByte; + } while (!usedMemoryInBytes.compareAndSet(prev, next)); + return true; } @Override public boolean allocateIfSufficient(final long sizeInByte, final double maxRatio) { - AtomicBoolean result = new AtomicBoolean(false); - usedMemoryInBytes.updateAndGet( - memCost -> { - if (memCost + sizeInByte > totalMemorySizeInBytes * maxRatio) { - return memCost; - } - result.set(true); - return memCost + sizeInByte; - }); - return result.get(); + long prev; + long next; + do { + prev = usedMemoryInBytes.get(); + if (prev + sizeInByte > totalMemorySizeInBytes * maxRatio) { + return false; + } + next = prev + sizeInByte; + } while (!usedMemoryInBytes.compareAndSet(prev, next)); + return true; } @Override diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java index dee41447aad..9291858c4a6 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryBlockTest.java @@ -21,6 +21,9 @@ package org.apache.iotdb.commons.memory; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + public class MemoryBlockTest { @Test @@ -64,4 +67,42 @@ public class MemoryBlockTest { memoryBlock1.close(); memoryBlock2.close(); } + + @Test + public void testAllocateNoPhantomSuccessUnderContention() throws InterruptedException { + final int rounds = 2000; + for (int i = 0; i < rounds; i++) { + AtomicLongMemoryBlock memoryBlock = new AtomicLongMemoryBlock("ContentionBlock", null, 1); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(2); + + Runnable allocateTask = + () -> { + try { + startLatch.await(); + if (memoryBlock.allocate(1)) { + successCount.incrementAndGet(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + doneLatch.countDown(); + } + }; + + Thread t1 = new Thread(allocateTask, "memory-block-test-1"); + Thread t2 = new Thread(allocateTask, "memory-block-test-2"); + t1.start(); + t2.start(); + startLatch.countDown(); + doneLatch.await(); + + Assert.assertEquals(1, successCount.get()); + Assert.assertEquals(1, memoryBlock.getUsedMemoryInBytes()); + Assert.assertEquals(0, memoryBlock.release(1)); + Assert.assertEquals(0, memoryBlock.getUsedMemoryInBytes()); + } + } }
