This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch feature/memory_collect in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5b5fe775f65a34a21f6d5d237c92dd29ea42ffbd Author: spricoder <[email protected]> AuthorDate: Thu Jan 16 16:06:21 2025 +0800 add version2 memory --- .../{IIoTDBMemoryBlock.java => IMemoryBlock.java} | 38 ++++-- .../{IoTDBMemoryBlock.java => MemoryBlock.java} | 52 ++------- ...DBMemoryBlockType.java => MemoryBlockType.java} | 2 +- ...DBMemoryException.java => MemoryException.java} | 14 +-- ...{IoTDBMemoryManager.java => MemoryManager.java} | 130 ++++++++++++--------- 5 files changed, 124 insertions(+), 112 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IIoTDBMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java similarity index 61% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IIoTDBMemoryBlock.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java index cbccd7c0fca..92ccd40a772 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IIoTDBMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java @@ -22,33 +22,55 @@ package org.apache.iotdb.commons.memory; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -public abstract class IIoTDBMemoryBlock implements AutoCloseable { - protected IoTDBMemoryManager memoryManager; - protected IoTDBMemoryBlockType memoryBlockType; +public abstract class IMemoryBlock implements AutoCloseable { + /** The memory manager that manages this memory block */ + protected MemoryManager memoryManager; + + /** The reentrant lock of memory block */ protected final ReentrantLock lock = new ReentrantLock(); + + /** The type of this memory block */ + protected MemoryBlockType memoryBlockType; + + /** The maximum memory size in byte of this memory block */ protected long maxMemorySizeInByte = 0; + + /** The memory usage in byte of this memory block */ protected final AtomicLong memoryUsageInBytes = new AtomicLong(0); + + /** The flag that indicates whether this memory block is released */ protected volatile boolean isReleased = false; - public abstract boolean useMemory(final long size); + /** Try to record memory managed by this memory block */ + public abstract void recordMemory(final long size); + /** Update maximum memory size in byte of this memory block */ + public void setMaxMemorySizeInByte(final long maxMemorySizeInByte) { + this.maxMemorySizeInByte = maxMemorySizeInByte; + } + + /** Get the maximum memory size in byte of this memory block */ public long getMaxMemorySizeInByte() { return maxMemorySizeInByte; } + /** Get the memory usage in byte of this memory block */ public long getMemoryUsageInBytes() { return memoryUsageInBytes.get(); } - public void setMaxMemorySizeInByte(final long maxMemorySizeInByte) { - this.maxMemorySizeInByte = maxMemorySizeInByte; - } - + /** Get whether this memory block is released */ public boolean isReleased() { return isReleased; } + /** Mark this memory block as released */ public void markAsReleased() { isReleased = true; } + + /** Get the type of this memory block */ + public MemoryBlockType getMemoryBlockType() { + return memoryBlockType; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlock.java similarity index 60% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlock.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlock.java index 76d2754ed31..98fe531db5e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlock.java @@ -23,56 +23,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -public class IoTDBMemoryBlock extends IIoTDBMemoryBlock { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBMemoryBlock.class); +public class MemoryBlock extends IMemoryBlock { + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryBlock.class); - public IoTDBMemoryBlock(final IoTDBMemoryManager memoryManager, final long maxMemorySizeInByte) { + public MemoryBlock(final MemoryManager memoryManager, final long maxMemorySizeInByte) { this.memoryManager = memoryManager; this.maxMemorySizeInByte = maxMemorySizeInByte; - this.memoryBlockType = IoTDBMemoryBlockType.NONE; + this.memoryBlockType = MemoryBlockType.NONE; } - public IoTDBMemoryBlock( - final IoTDBMemoryManager memoryManager, + public MemoryBlock( + final MemoryManager memoryManager, final long maxMemorySizeInByte, - final IoTDBMemoryBlockType memoryBlockType) { + final MemoryBlockType memoryBlockType) { this.memoryManager = memoryManager; this.maxMemorySizeInByte = maxMemorySizeInByte; this.memoryBlockType = memoryBlockType; } @Override - public boolean useMemory(final long size) { - if (size <= 0) { - memoryUsageInBytes.addAndGet(-size); - return true; - } else { - - AtomicBoolean result = new AtomicBoolean(false); - memoryUsageInBytes.updateAndGet( - memorySize -> { - if (size > maxMemorySizeInByte - memorySize) { - LOGGER.debug( - "consensus memory limited. required: {}, used: {}, total: {}", - size, - memorySize, - maxMemorySizeInByte); - result.set(false); - return memorySize; - } else { - LOGGER.debug( - "{} add {} bytes, total memory size: {} bytes.", - Thread.currentThread().getName(), - size, - memorySize + size); - result.set(true); - return memorySize + size; - } - }); - return result.get(); - } + public void recordMemory(final long size) { + memoryUsageInBytes.addAndGet(size); } @Override @@ -80,9 +52,9 @@ public class IoTDBMemoryBlock extends IIoTDBMemoryBlock { return "IoTDBMemoryBlock{" + "memoryBlockType=" + memoryBlockType - + "maxMemorySizeInByte=" + + ", maxMemorySizeInByte=" + maxMemorySizeInByte - + "memoryUsageInBytes=" + + ", memoryUsageInBytes=" + memoryUsageInBytes + ", isReleased=" + isReleased @@ -95,7 +67,7 @@ public class IoTDBMemoryBlock extends IIoTDBMemoryBlock { while (true) { try { - if (lock.tryLock(50, TimeUnit.MICROSECONDS)) { + if (lock.tryLock(100, TimeUnit.MICROSECONDS)) { try { memoryManager.release(this); if (isInterrupted) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlockType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java similarity index 96% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlockType.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java index 0b12b77c400..38e78659b1b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryBlockType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java @@ -19,7 +19,7 @@ package org.apache.iotdb.commons.memory; -public enum IoTDBMemoryBlockType { +public enum MemoryBlockType { NONE, FUNCTION, PERFORMANCE, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryException.java similarity index 81% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryException.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryException.java index 0a4a1f89ea4..915915518fb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryException.java @@ -26,22 +26,22 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Objects; -public class IoTDBMemoryException extends RuntimeException { +public class MemoryException extends RuntimeException { private final long timestamp; private static final long VERSION = 1L; - public IoTDBMemoryException(final String message) { + public MemoryException(final String message) { super(message); this.timestamp = System.currentTimeMillis(); } - public IoTDBMemoryException(final String message, final long timeStamp) { + public MemoryException(final String message, final long timeStamp) { super(message); this.timestamp = timeStamp; } - public IoTDBMemoryException(final String message, final Throwable cause) { + public MemoryException(final String message, final Throwable cause) { super(message, cause); this.timestamp = System.currentTimeMillis(); } @@ -52,9 +52,9 @@ public class IoTDBMemoryException extends RuntimeException { @Override public boolean equals(Object obj) { - return obj instanceof IoTDBMemoryException - && Objects.equals(getMessage(), ((IoTDBMemoryException) obj).getMessage()) - && Objects.equals(getTimestamp(), ((IoTDBMemoryException) obj).getTimestamp()); + return obj instanceof MemoryException + && Objects.equals(getMessage(), ((MemoryException) obj).getMessage()) + && Objects.equals(getTimestamp(), ((MemoryException) obj).getTimestamp()); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java similarity index 53% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryManager.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java index 45c453ca571..86074ef1f70 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTDBMemoryManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java @@ -26,43 +26,55 @@ import java.util.HashSet; import java.util.Set; import java.util.function.LongUnaryOperator; -public class IoTDBMemoryManager { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBMemoryManager.class); - // TODO spricoder: make it configurable +public class MemoryManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class); + + // TODO @spricoder: make it configurable + /** Whether memory management is enabled */ private static final boolean ENABLED = false; - /** max retry times for memory allocation */ + /** Max retry times for memory allocation */ private static final int MEMORY_ALLOCATE_MAX_RETRIES = 3; - /** retry interval for memory allocation */ + /** Retry interval for memory allocation */ private static final long MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = 1000; - /** min memory size to allocate */ + /** Min memory size to allocate */ private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES = 32; + /** Total memory size in byte of memory manager */ private long totalMemorySizeInBytes = 0L; - private long usedMemorySizeInBytes = 0L; - private IoTDBMemoryManager parentMemoryManager; - private final Set<IoTDBMemoryManager> childrens = new HashSet<>(); - private final Set<IoTDBMemoryBlock> allocatedMemoryBlocks = new HashSet<>(); - public IoTDBMemoryManager(IoTDBMemoryManager parentMemoryManager, long totalMemorySizeInBytes) { + /** Allocated memory size to allocate */ + private long allocatedMemorySizeInBytes = 0L; + + /** Parent memory manager, used to apply for memory */ + private final MemoryManager parentMemoryManager; + + /** Child memory manager, used to statistic memory */ + private final Set<MemoryManager> childrens = new HashSet<>(); + + /** Allocated memory blocks of this memory manager */ + private final Set<MemoryBlock> allocatedMemoryBlocks = new HashSet<>(); + + public MemoryManager(MemoryManager parentMemoryManager, long totalMemorySizeInBytes) { this.parentMemoryManager = parentMemoryManager; this.parentMemoryManager.addChildMemoryManager(this); this.totalMemorySizeInBytes = totalMemorySizeInBytes; } - private IoTDBMemoryBlock forceAllocate(long sizeInBytes, IoTDBMemoryBlockType type) { + /** Try to force allocate memory block with specified size in bytes. */ + private MemoryBlock forceAllocate(long sizeInBytes, MemoryBlockType type) { if (!ENABLED) { - return new IoTDBMemoryBlock(this, sizeInBytes, type); + return new MemoryBlock(this, sizeInBytes, type); } for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) { - if (totalMemorySizeInBytes - usedMemorySizeInBytes >= sizeInBytes) { - return registerMemoryBlock(sizeInBytes); + if (totalMemorySizeInBytes - allocatedMemorySizeInBytes >= sizeInBytes) { + return registerMemoryBlock(sizeInBytes, type); } try { - // TODO spricoder: try to find more memory + // TODO @spricoder: consider to find more memory in active way Thread.sleep(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -70,37 +82,36 @@ public class IoTDBMemoryManager { } } - throw new IoTDBMemoryException( + throw new MemoryException( String.format( "forceAllocate: failed to allocate memory after %d retries, " + "total memory size %d bytes, used memory size %d bytes, " + "requested memory size %d bytes", MEMORY_ALLOCATE_MAX_RETRIES, totalMemorySizeInBytes, - usedMemorySizeInBytes, + allocatedMemorySizeInBytes, sizeInBytes)); } - public synchronized IoTDBMemoryBlock forceAllocateIfSufficient( - long sizeInBytes, float usedThreshold) { + /** Try to force allocate memory block with specified size in bytes when memory is sufficient. */ + public synchronized MemoryBlock forceAllocateIfSufficient(long sizeInBytes, float usedThreshold) { if (usedThreshold < 0.0f || usedThreshold > 1.0f) { return null; } - if (!ENABLED) { - return new IoTDBMemoryBlock(this, sizeInBytes); + return new MemoryBlock(this, sizeInBytes); } - if (totalMemorySizeInBytes - usedMemorySizeInBytes >= sizeInBytes - && (float) usedMemorySizeInBytes / totalMemorySizeInBytes < usedThreshold) { - return forceAllocate(sizeInBytes, IoTDBMemoryBlockType.NONE); + if (totalMemorySizeInBytes - allocatedMemorySizeInBytes >= sizeInBytes + && (float) allocatedMemorySizeInBytes / totalMemorySizeInBytes < usedThreshold) { + return forceAllocate(sizeInBytes, MemoryBlockType.NONE); } else { - // TODO spricoder: try to find more memory + // TODO @spricoder: consider to find more memory in active way LOGGER.debug( "forceAllocateIfSufficient: failed to allocate memory, " + "total memory size {} bytes, used memory size {} bytes, " + "requested memory size {} bytes, used threshold {}", totalMemorySizeInBytes, - usedMemorySizeInBytes, + allocatedMemorySizeInBytes, sizeInBytes, usedThreshold); } @@ -108,40 +119,30 @@ public class IoTDBMemoryManager { return null; } - private IoTDBMemoryBlock registerMemoryBlock(long sizeInBytes) { - return registerMemoryBlock(sizeInBytes, IoTDBMemoryBlockType.NONE); - } - - private IoTDBMemoryBlock registerMemoryBlock(long sizeInBytes, IoTDBMemoryBlockType type) { - usedMemorySizeInBytes += sizeInBytes; - final IoTDBMemoryBlock memoryBlock = new IoTDBMemoryBlock(this, sizeInBytes, type); - allocatedMemoryBlocks.add(memoryBlock); - return memoryBlock; - } - - public synchronized IoTDBMemoryBlock tryAllocate( - long sizeInBytes, LongUnaryOperator customAllocateStrategy) { + /** Try to allocate memory block with customAllocateStrategy */ + public synchronized MemoryBlock tryAllocate( + long sizeInBytes, LongUnaryOperator customAllocateStrategy, MemoryBlockType type) { if (!ENABLED) { - return new IoTDBMemoryBlock(this, sizeInBytes); + return new MemoryBlock(this, sizeInBytes); } - if (totalMemorySizeInBytes - usedMemorySizeInBytes >= sizeInBytes) { - return registerMemoryBlock(sizeInBytes); + if (totalMemorySizeInBytes - allocatedMemorySizeInBytes >= sizeInBytes) { + return registerMemoryBlock(sizeInBytes, type); } long sizeToAllocateInBytes = sizeInBytes; while (sizeToAllocateInBytes > MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES) { - if (totalMemorySizeInBytes - usedMemorySizeInBytes >= sizeToAllocateInBytes) { + if (totalMemorySizeInBytes - allocatedMemorySizeInBytes >= sizeToAllocateInBytes) { LOGGER.info( "tryAllocate: allocated memory, " + "total memory size {} bytes, used memory size {} bytes, " + "original requested memory size {} bytes, " + "actual requested memory size {} bytes", totalMemorySizeInBytes, - usedMemorySizeInBytes, + allocatedMemorySizeInBytes, sizeInBytes, sizeToAllocateInBytes); - return registerMemoryBlock(sizeToAllocateInBytes); + return registerMemoryBlock(sizeToAllocateInBytes, type); } sizeToAllocateInBytes = @@ -150,41 +151,58 @@ public class IoTDBMemoryManager { MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES); } - // TODO spricoder: try to shrink first + // TODO @spricoder: consider to find more memory in active way LOGGER.warn( "tryAllocate: failed to allocate memory, " + "total memory size {} bytes, used memory size {} bytes, " + "requested memory size {} bytes", totalMemorySizeInBytes, - usedMemorySizeInBytes, + allocatedMemorySizeInBytes, sizeInBytes); - return registerMemoryBlock(0); + return registerMemoryBlock(0, type); + } + + /** Try to register memory block with specified size in bytes. */ + private MemoryBlock registerMemoryBlock(long sizeInBytes, MemoryBlockType type) { + allocatedMemorySizeInBytes += sizeInBytes; + final MemoryBlock memoryBlock = new MemoryBlock(this, sizeInBytes, type); + allocatedMemoryBlocks.add(memoryBlock); + return memoryBlock; } - public synchronized void release(IoTDBMemoryBlock block) { + /** Release memory block. */ + public synchronized void release(MemoryBlock block) { if (!ENABLED || block == null || block.isReleased()) { return; } - allocatedMemoryBlocks.remove(block); - usedMemorySizeInBytes -= block.getMemoryUsageInBytes(); block.markAsReleased(); + allocatedMemorySizeInBytes -= block.getMemoryUsageInBytes(); + allocatedMemoryBlocks.remove(block); this.notifyAll(); } - public synchronized void addChildMemoryManager(IoTDBMemoryManager childMemoryManager) { + public synchronized void addChildMemoryManager(MemoryManager childMemoryManager) { if (childMemoryManager != null) { childrens.add(childMemoryManager); } } + public long getFreeMemorySizeInBytes() { + return totalMemorySizeInBytes - allocatedMemorySizeInBytes; + } + public long getUsedMemorySizeInBytes() { - return usedMemorySizeInBytes; + long memoryBlockSize = + allocatedMemoryBlocks.stream().mapToLong(MemoryBlock::getMemoryUsageInBytes).sum(); + long childrenMemorySize = + childrens.stream().mapToLong(MemoryManager::getUsedMemorySizeInBytes).sum(); + return memoryBlockSize + childrenMemorySize; } - public long getFreeMemorySizeInBytes() { - return totalMemorySizeInBytes - usedMemorySizeInBytes; + public long getAllocatedMemorySizeInBytes() { + return allocatedMemorySizeInBytes; } public long getTotalMemorySizeInBytes() {
