This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch wal_compression
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 51bfb3d368ab50d4a27298cd47c2773f35db8c76
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Sat Jun 29 16:03:34 2024 +0800

    use directbuffer to optimize performance
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../dataregion/wal/io/WALInputStream.java          | 23 +++++++++++++++-------
 .../java/org/apache/iotdb/db/utils/MmapUtil.java   |  8 ++++++++
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 90bfd52e371..abaf9dce36d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.MmapUtil;
 
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -164,6 +165,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
   @Override
   public void close() throws IOException {
     channel.close();
+    MmapUtil.clean(dataBuffer); // release the buffer eagerly; clean() ignores null or heap buffers
     dataBuffer = null;
   }
 
@@ -210,14 +212,17 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.uncompressedSize
           || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean(dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
       }
       dataBuffer.clear();
 
       if (Objects.isNull(compressedBuffer)
           || compressedBuffer.capacity() < segmentInfo.dataInDiskSize
           || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
-        compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       compressedBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than 
expected
@@ -226,15 +231,18 @@ public class WALInputStream extends InputStream implements AutoCloseable {
         throw new IOException("Unexpected end of file");
       }
       compressedBuffer.flip();
-
       IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
       unCompressor.uncompress(compressedBuffer, dataBuffer);
+      MmapUtil.clean(compressedBuffer);
     } else {
       // An uncompressed segment
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.dataInDiskSize
           || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean(dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       dataBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than 
expected
@@ -286,14 +294,15 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       } while (posRemain >= 0);
 
       if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
-        compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
         channel.read(compressedBuffer);
         compressedBuffer.flip();
         IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
-        dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
         unCompressor.uncompress(compressedBuffer, dataBuffer);
+        MmapUtil.clean(compressedBuffer);
       } else {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
         channel.read(dataBuffer);
         dataBuffer.flip();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
index 10b7bd6eb73..326c6c0583d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils;
 
 import io.netty.util.internal.PlatformDependent;
 
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 
 public class MmapUtil {
@@ -32,4 +33,11 @@ public class MmapUtil {
     }
     PlatformDependent.freeDirectBuffer(mappedByteBuffer);
   }
+
+  /** Cleans the buffer only if it is a MappedByteBuffer; null and heap buffers are left alone. */
+  public static void clean(ByteBuffer byteBuffer) {
+    if (byteBuffer != null && byteBuffer instanceof MappedByteBuffer) {
+      clean((MappedByteBuffer) byteBuffer);
+    }
+  }
 }

Reply via email to