This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_iot_memory_2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27d8b27947521a9c9c0bbaacf444d2859781a324 Author: Tian Jiang <[email protected]> AuthorDate: Wed Aug 13 17:29:30 2025 +0800 Fix ref count of IoTConsensus request not decreased in allocation failure --- .../logdispatcher/IoTConsensusMemoryManager.java | 88 ++++++++++++------ .../IoTConsensusMemoryManagerTest.java | 103 +++++++++++++++++++++ 2 files changed, 163 insertions(+), 28 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index 9a9d38946fe..17074d5e66c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher; import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.slf4j.Logger; @@ -46,38 +47,43 @@ public class IoTConsensusMemoryManager { if (prevRef == 0) { boolean reserved = reserve(request.getMemorySize(), true); if (reserved) { - logger.info( - "Reserving {} bytes for request {} succeeds, current total usage {}", - request.getMemorySize(), - request.getSearchIndex(), - memoryBlock.getUsedMemoryInBytes()); + if (logger.isDebugEnabled()) { + logger.debug( + "Reserving {} bytes for request {} succeeds, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); + } } else { - logger.info( - "Reserving {} bytes for request {} fails, current total usage {}", - request.getMemorySize(), - request.getSearchIndex(), - memoryBlock.getUsedMemoryInBytes()); + request.decRef(); + if (logger.isDebugEnabled()) { + logger.debug( + "Reserving {} bytes for request {} fails, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); + } } return reserved; - } else { - logger.info( + } else if (logger.isDebugEnabled()) { + logger.debug( "Skip memory reservation for {} because its ref count is not 0", request.getSearchIndex()); - return true; } + return true; } public boolean reserve(Batch batch) { boolean reserved = reserve(batch.getMemorySize(), false); - if (reserved) { - logger.info( + if (reserved && logger.isDebugEnabled()) { + logger.debug( "Reserving {} bytes for batch {}-{} succeeds, current total usage {}", batch.getMemorySize(), batch.getStartIndex(), batch.getEndIndex(), memoryBlock.getUsedMemoryInBytes()); - } else { - logger.info( + } else if (logger.isDebugEnabled()) { + logger.debug( "Reserving {} bytes for batch {}-{} fails, current total usage {}", batch.getMemorySize(), batch.getStartIndex(), @@ -106,22 +112,26 @@ public class IoTConsensusMemoryManager { long prevRef = request.decRef(); if (prevRef == 1) { free(request.getMemorySize(), true); - logger.info( - "Freed {} bytes for request {}, current total usage {}", - request.getMemorySize(), - request.getSearchIndex(), - memoryBlock.getUsedMemoryInBytes()); + if (logger.isDebugEnabled()) { + logger.debug( + "Freed {} bytes for request {}, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); + } } } public void free(Batch batch) { free(batch.getMemorySize(), false); - logger.info( - "Freed {} bytes for batch {}-{}, current total usage {}", - batch.getMemorySize(), - batch.getStartIndex(), - batch.getEndIndex(), - getMemorySizeInByte()); + if (logger.isDebugEnabled()) { + logger.debug( + "Freed {} bytes for batch {}-{}, current total usage {}", + batch.getMemorySize(), + batch.getStartIndex(), + batch.getEndIndex(), + getMemorySizeInByte()); + } } private void free(long size, boolean fromQueue) { @@ -143,6 +153,28 @@ public class IoTConsensusMemoryManager { this.maxMemoryRatioForQueue = maxMemoryRatioForQueue; } + @TestOnly + public void reset() { + this.memoryBlock.release(this.memoryBlock.getUsedMemoryInBytes()); + this.queueMemorySizeInByte.set(0); + this.syncMemorySizeInByte.set(0); + } + + @TestOnly + public IMemoryBlock getMemoryBlock() { + return memoryBlock; + } + + @TestOnly + public void setMemoryBlock(IMemoryBlock memoryBlock) { + this.memoryBlock = memoryBlock; + } + + @TestOnly + public Double getMaxMemoryRatioForQueue() { + return maxMemoryRatioForQueue; + } + long getMemorySizeInByte() { return memoryBlock.getUsedMemoryInBytes(); } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java new file mode 100644 index 00000000000..3d89943772e --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.iot.logdispatcher; + +import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock; +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class IoTConsensusMemoryManagerTest { + + private IMemoryBlock previousMemoryBlock; + private long memoryBlockSize = 16 * 1024L; + + @Before + public void setUp() throws Exception { + previousMemoryBlock = IoTConsensusMemoryManager.getInstance().getMemoryBlock(); + IoTConsensusMemoryManager.getInstance() + .setMemoryBlock(new AtomicLongMemoryBlock("Test", null, memoryBlockSize)); + } + + @After + public void tearDown() throws Exception { + IoTConsensusMemoryManager.getInstance().setMemoryBlock(previousMemoryBlock); + } + + @Test + public void testSingleReserveAndRelease() { + testReserveAndRelease(1); + } + + @Test + public void testMultiReserveAndRelease() { + testReserveAndRelease(3); + } + + private void testReserveAndRelease(int numReservation) { + int allocationSize = 1; + long allocatedSize = 0; + List<IndexedConsensusRequest> requestList = new ArrayList<>(); + while (true) { + IndexedConsensusRequest request = + new IndexedConsensusRequest( + 0, + Collections.singletonList( + new ByteBufferConsensusRequest(ByteBuffer.allocate(allocationSize)))); + request.buildSerializedRequests(); + if (allocatedSize + allocationSize + <= memoryBlockSize + * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()) { + for (int i = 0; i < numReservation; i++) { + assertTrue(IoTConsensusMemoryManager.getInstance().reserve(request)); + requestList.add(request); + } + } else { + for (int i = 0; i < numReservation; i++) { + assertFalse(IoTConsensusMemoryManager.getInstance().reserve(request)); + } + break; + } + allocatedSize += allocationSize; + } + + assertTrue( + IoTConsensusMemoryManager.getInstance().getMemorySizeInByte() + <= memoryBlockSize + * IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()); + for (IndexedConsensusRequest indexedConsensusRequest : requestList) { + IoTConsensusMemoryManager.getInstance().free(indexedConsensusRequest); + } + assertEquals(0, IoTConsensusMemoryManager.getInstance().getMemorySizeInByte()); + } +}
