This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_negative_iot_queue_size_and_delete_empty_file in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ef499339a842512d6c79f0cc1af775d19ae8fb82 Author: Tian Jiang <[email protected]> AuthorDate: Wed Jul 23 17:18:29 2025 +0800 Fix double memory free of iotconsensus queue request during region deletion --- .../common/request/IndexedConsensusRequest.java | 10 ++++++++++ .../logdispatcher/IoTConsensusMemoryManager.java | 21 +++++++++++++++++++++ .../consensus/iot/logdispatcher/LogDispatcher.java | 22 ++++++++++++++++------ 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 1147abc049e..2bf01d4ef86 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; /** only used for iot consensus. */ public class IndexedConsensusRequest implements IConsensusRequest { @@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements IConsensusRequest { private final List<IConsensusRequest> requests; private final List<ByteBuffer> serializedRequests; private long memorySize = 0; + private AtomicLong referenceCnt = new AtomicLong(); public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; @@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements IConsensusRequest { public int hashCode() { return Objects.hash(searchIndex, requests); } + + public long incRef() { + return referenceCnt.getAndIncrement(); + } + + public long decRef() { + return referenceCnt.getAndDecrement(); + } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index 7bee00588cd..03ef5f3a5f1 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,17 @@ public class IoTConsensusMemoryManager { MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this)); } + public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) { + synchronized (request) { + long prevRef = request.incRef(); + if (prevRef == 0) { + return reserve(request.getMemorySize(), fromQueue); + } else { + return true; + } + } + } + public boolean reserve(long size, boolean fromQueue) { boolean result = fromQueue @@ -55,6 +67,15 @@ public class IoTConsensusMemoryManager { return result; } + public void free(IndexedConsensusRequest request, boolean fromQueue) { + synchronized (request) { + long prevRef = request.decRef(); + if (prevRef == 0) { + free(request.getMemorySize(), fromQueue); + } + } + } + public void free(long size, boolean fromQueue) { long currentUsedMemory = memoryBlock.release(size); if (fromQueue) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index e196df43209..106c5b743a3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -280,7 +280,7 @@ public class LogDispatcher { /** try to offer a request into queue with memory control. */ public boolean offer(IndexedConsensusRequest indexedConsensusRequest) { - if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), true)) { + if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) { return false; } boolean success; @@ -288,19 +288,19 @@ public class LogDispatcher { success = pendingEntries.offer(indexedConsensusRequest); } catch (Throwable t) { // If exception occurs during request offer, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); throw t; } if (!success) { // If offer failed, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); } return success; } /** try to remove a request from queue with memory control. */ private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) { - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); } public void stop() { @@ -322,13 +322,23 @@ public class LogDispatcher { } long requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) { - requestSize += indexedConsensusRequest.getMemorySize(); + synchronized (indexedConsensusRequest) { + long prevRef = indexedConsensusRequest.decRef(); + if (prevRef == 1) { + requestSize += indexedConsensusRequest.getMemorySize(); + } + } } pendingEntries.clear(); iotConsensusMemoryManager.free(requestSize, true); requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) { - requestSize += indexedConsensusRequest.getMemorySize(); + synchronized (indexedConsensusRequest) { + long prevRef = indexedConsensusRequest.decRef(); + if (prevRef == 1) { + requestSize += indexedConsensusRequest.getMemorySize(); + } + } } iotConsensusMemoryManager.free(requestSize, true); syncStatus.free();
