This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_memory_leak_in_wal_compression
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a525f355e5a7bf54b940c731b81f2b12eb1e1e31
Author: Tian Jiang <[email protected]>
AuthorDate: Sun Apr 27 17:29:29 2025 +0800

    Fix memory leak of wal compressed buffer
---
 .../deletion/persist/PageCacheDeletionBuffer.java  |  1 +
 .../dataregion/wal/buffer/WALBuffer.java           |  9 ++++-
 .../dataregion/wal/io/WALInputStream.java          |  2 +
 .../wal/compression/WALCompressionTest.java        | 45 ++++++++++++----------
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index 7cb1600c6a5..bf334b0ed8f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -273,6 +273,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
     }
     // clean buffer
     MmapUtil.clean(serializeBuffer);
+    serializeBuffer = null;
   }
 
   private void waitUntilFlushAllDeletionsOrTimeOut() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 3a53dbf18ae..16be3f8ad08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -178,11 +178,12 @@ public class WALBuffer extends AbstractWALBuffer {
     buffersLock.lock();
     try {
       MmapUtil.clean(workingBuffer);
-      MmapUtil.clean(workingBuffer);
+      MmapUtil.clean(idleBuffer);
       MmapUtil.clean(syncingBuffer);
       MmapUtil.clean(compressedByteBuffer);
       workingBuffer = ByteBuffer.allocateDirect(capacity);
       idleBuffer = ByteBuffer.allocateDirect(capacity);
+      syncingBuffer = null;
       compressedByteBuffer = ByteBuffer.allocateDirect(getCompressedByteBufferSize(capacity));
       currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
     } catch (OutOfMemoryError e) {
@@ -719,9 +720,13 @@ public class WALBuffer extends AbstractWALBuffer {
     checkpointManager.close();
 
     MmapUtil.clean(workingBuffer);
-    MmapUtil.clean(workingBuffer);
+    MmapUtil.clean(idleBuffer);
     MmapUtil.clean(syncingBuffer);
     MmapUtil.clean(compressedByteBuffer);
+    workingBuffer = null;
+    idleBuffer = null;
+    syncingBuffer = null;
+    compressedByteBuffer = null;
   }
 
   private void shutdownThread(ExecutorService thread, ThreadName threadName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index e3f544a8894..1827bfc9365 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -173,6 +173,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
     MmapUtil.clean(dataBuffer);
     MmapUtil.clean(compressedBuffer);
     dataBuffer = null;
+    compressedBuffer = null;
   }
 
   @Override
@@ -306,6 +307,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
         uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
         MmapUtil.clean(compressedBuffer);
+        compressedBuffer = null;
       } else {
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
         readWALBufferFromChannel(dataBuffer);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index 28b95625594..b0a7abd52a3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -80,6 +80,7 @@ public class WALCompressionTest {
       FileUtils.delete(walFile);
     }
     originalMinCompressionSize = WALTestUtils.getMinCompressionSize();
+    WALTestUtils.setMinCompressionSize(0);
     if (new File(compressionDir).exists()) {
       FileUtils.forceDelete(new File(compressionDir));
     }
@@ -125,29 +126,33 @@ public class WALCompressionTest {
 
   public void testSkipToGivenPosition()
       throws QueryProcessException, IllegalPathException, IOException {
-    LogWriter writer = new WALWriter(walFile);
-    ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
-    List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>();
-    int memTableId = 0;
-    long fileOffset = 0;
-    for (int i = 0; i < 100; ) {
-      InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i);
-      if (buffer.remaining() >= buffer.capacity() / 4) {
-        int pos = buffer.position();
-        insertRowNode.serialize(buffer);
-        int size = buffer.position() - pos;
-        positionAndEntryPairList.add(new Pair<>(fileOffset, size));
-        fileOffset += size;
-        i++;
-      } else {
+    List<Pair<Long, Integer>> positionAndEntryPairList;
+    int memTableId;
+    try (LogWriter writer = new WALWriter(walFile)) {
+      writer.setCompressedByteBuffer(
+          ByteBuffer.allocateDirect(WALBuffer.ONE_THIRD_WAL_BUFFER_SIZE));
+      ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+      positionAndEntryPairList = new ArrayList<>();
+      memTableId = 0;
+      long fileOffset = 0;
+      for (int i = 0; i < 100; ) {
+        InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i);
+        if (buffer.remaining() >= buffer.capacity() / 4) {
+          int pos = buffer.position();
+          insertRowNode.serialize(buffer);
+          int size = buffer.position() - pos;
+          positionAndEntryPairList.add(new Pair<>(fileOffset, size));
+          fileOffset += size;
+          i++;
+        } else {
+          writer.write(buffer);
+          buffer.clear();
+        }
+      }
+      if (buffer.position() != 0) {
         writer.write(buffer);
-        buffer.clear();
       }
     }
-    if (buffer.position() != 0) {
-      writer.write(buffer);
-    }
-    writer.close();
     try (WALInputStream stream = new WALInputStream(walFile)) {
       for (int i = 0; i < 100; ++i) {
         Pair<Long, Integer> positionAndNodePair = positionAndEntryPairList.get(i);

Reply via email to