This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_memory_leak_in_wal_compression in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a525f355e5a7bf54b940c731b81f2b12eb1e1e31 Author: Tian Jiang <[email protected]> AuthorDate: Sun Apr 27 17:29:29 2025 +0800 Fix memory leak of wal compressed buffer --- .../deletion/persist/PageCacheDeletionBuffer.java | 1 + .../dataregion/wal/buffer/WALBuffer.java | 9 ++++- .../dataregion/wal/io/WALInputStream.java | 2 + .../wal/compression/WALCompressionTest.java | 45 ++++++++++++---------- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java index 7cb1600c6a5..bf334b0ed8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java @@ -273,6 +273,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { } // clean buffer MmapUtil.clean(serializeBuffer); + serializeBuffer = null; } private void waitUntilFlushAllDeletionsOrTimeOut() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java index 3a53dbf18ae..16be3f8ad08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java @@ -178,11 +178,12 @@ public class WALBuffer extends AbstractWALBuffer { buffersLock.lock(); try { MmapUtil.clean(workingBuffer); - MmapUtil.clean(workingBuffer); + MmapUtil.clean(idleBuffer); MmapUtil.clean(syncingBuffer); MmapUtil.clean(compressedByteBuffer); workingBuffer = ByteBuffer.allocateDirect(capacity); 
idleBuffer = ByteBuffer.allocateDirect(capacity); + syncingBuffer = null; compressedByteBuffer = ByteBuffer.allocateDirect(getCompressedByteBufferSize(capacity)); currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer); } catch (OutOfMemoryError e) { @@ -719,9 +720,13 @@ public class WALBuffer extends AbstractWALBuffer { checkpointManager.close(); MmapUtil.clean(workingBuffer); - MmapUtil.clean(workingBuffer); + MmapUtil.clean(idleBuffer); MmapUtil.clean(syncingBuffer); MmapUtil.clean(compressedByteBuffer); + workingBuffer = null; + idleBuffer = null; + syncingBuffer = null; + compressedByteBuffer = null; } private void shutdownThread(ExecutorService thread, ThreadName threadName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index e3f544a8894..1827bfc9365 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -173,6 +173,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { MmapUtil.clean(dataBuffer); MmapUtil.clean(compressedBuffer); dataBuffer = null; + compressedBuffer = null; } @Override @@ -306,6 +307,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); MmapUtil.clean(compressedBuffer); + compressedBuffer = null; } else { dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); readWALBufferFromChannel(dataBuffer); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java index 28b95625594..b0a7abd52a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java @@ -80,6 +80,7 @@ public class WALCompressionTest { FileUtils.delete(walFile); } originalMinCompressionSize = WALTestUtils.getMinCompressionSize(); + WALTestUtils.setMinCompressionSize(0); if (new File(compressionDir).exists()) { FileUtils.forceDelete(new File(compressionDir)); } @@ -125,29 +126,33 @@ public class WALCompressionTest { public void testSkipToGivenPosition() throws QueryProcessException, IllegalPathException, IOException { - LogWriter writer = new WALWriter(walFile); - ByteBuffer buffer = ByteBuffer.allocate(1024 * 4); - List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>(); - int memTableId = 0; - long fileOffset = 0; - for (int i = 0; i < 100; ) { - InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i); - if (buffer.remaining() >= buffer.capacity() / 4) { - int pos = buffer.position(); - insertRowNode.serialize(buffer); - int size = buffer.position() - pos; - positionAndEntryPairList.add(new Pair<>(fileOffset, size)); - fileOffset += size; - i++; - } else { + List<Pair<Long, Integer>> positionAndEntryPairList; + int memTableId; + try (LogWriter writer = new WALWriter(walFile)) { + writer.setCompressedByteBuffer( + ByteBuffer.allocateDirect(WALBuffer.ONE_THIRD_WAL_BUFFER_SIZE)); + ByteBuffer buffer = ByteBuffer.allocate(1024 * 4); + positionAndEntryPairList = new ArrayList<>(); + memTableId = 0; + long fileOffset = 0; + for (int i = 0; i < 100; ) { + InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i); + if (buffer.remaining() >= buffer.capacity() / 4) { + int pos 
= buffer.position(); + insertRowNode.serialize(buffer); + int size = buffer.position() - pos; + positionAndEntryPairList.add(new Pair<>(fileOffset, size)); + fileOffset += size; + i++; + } else { + writer.write(buffer); + buffer.clear(); + } + } + if (buffer.position() != 0) { writer.write(buffer); - buffer.clear(); } } - if (buffer.position() != 0) { - writer.write(buffer); - } - writer.close(); try (WALInputStream stream = new WALInputStream(walFile)) { for (int i = 0; i < 100; ++i) { Pair<Long, Integer> positionAndNodePair = positionAndEntryPairList.get(i);
