This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c0743d98c2c Enhance wal compression (#12830)
c0743d98c2c is described below

commit c0743d98c2c249958d6063dfacd86d5ca0c3fb43
Author: Potato <[email protected]>
AuthorDate: Mon Jul 1 21:24:42 2024 +0800

    Enhance wal compression (#12830)
    
    * use directbuffer to optimize performance
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * optimize log
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * fix
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    * fix review
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    
    ---------
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../confignode1conf/iotdb-system.properties        |  2 --
 .../dataregion/wal/io/WALInputStream.java          | 34 +++++++++++++---------
 .../java/org/apache/iotdb/db/utils/MmapUtil.java   |  8 +++++
 3 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
index b396e373f86..5e97f60ba3c 100644
--- a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
+++ b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
@@ -33,8 +33,6 @@ cn_metric_prometheus_reporter_port=9091
 timestamp_precision=ms
 data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
-schema_replication_factor=3
-data_replication_factor=3
 udf_lib_dir=target/confignode1/ext/udf
 trigger_lib_dir=target/confignode1/ext/trigger
 pipe_lib_dir=target/confignode1/ext/pipe
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 90bfd52e371..350f8f5e995 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.MmapUtil;
 
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -164,6 +165,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
   @Override
   public void close() throws IOException {
     channel.close();
+    MmapUtil.clean(dataBuffer);
     dataBuffer = null;
   }
 
@@ -210,14 +212,17 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.uncompressedSize
           || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean(dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
       }
       dataBuffer.clear();
 
       if (Objects.isNull(compressedBuffer)
           || compressedBuffer.capacity() < segmentInfo.dataInDiskSize
           || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
-        compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       compressedBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than expected
@@ -226,15 +231,18 @@ public class WALInputStream extends InputStream implements AutoCloseable {
         throw new IOException("Unexpected end of file");
       }
       compressedBuffer.flip();
-
       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
       unCompressor.uncompress(compressedBuffer, dataBuffer);
+      MmapUtil.clean(compressedBuffer);
     } else {
       // An uncompressed segment
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.dataInDiskSize
           || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean(dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       dataBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than expected
@@ -250,16 +258,15 @@ public class WALInputStream extends InputStream implements AutoCloseable {
   private void tryLoadSegment() throws IOException {
     long originPosition = channel.position();
     try {
-      loadNextSegmentV2();
-      version = WALFileVersion.V2;
-      return;
+      loadNextSegmentV1();
+      version = WALFileVersion.V1;
     } catch (Throwable e) {
       // failed to load in V2 way, try in V1 way
-      logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
       channel.position(originPosition);
+      loadNextSegmentV2();
+      version = WALFileVersion.V2;
+      logger.info("Failed to load WAL segment in V1 way, try in V2 way successfully.");
     }
-    loadNextSegmentV1();
-    version = WALFileVersion.V1;
   }
 
   /**
@@ -286,14 +293,15 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       } while (posRemain >= 0);
 
       if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
-        compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
         channel.read(compressedBuffer);
         compressedBuffer.flip();
         IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
-        dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
         unCompressor.uncompress(compressedBuffer, dataBuffer);
+        MmapUtil.clean(compressedBuffer);
       } else {
-        dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
         channel.read(dataBuffer);
         dataBuffer.flip();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
index 10b7bd6eb73..21f239af9f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils;
 
 import io.netty.util.internal.PlatformDependent;
 
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 
 public class MmapUtil {
@@ -32,4 +33,11 @@ public class MmapUtil {
     }
     PlatformDependent.freeDirectBuffer(mappedByteBuffer);
   }
+
+  /** we do not need to clean heapByteBuffer manually, so we just leave it alone. */
+  public static void clean(ByteBuffer byteBuffer) {
+    if (byteBuffer instanceof MappedByteBuffer) {
+      clean((MappedByteBuffer) byteBuffer);
+    }
+  }
 }

Reply via email to