This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb46190fa5 Support WAL Compression (#12476)
0bb46190fa5 is described below

commit 0bb46190fa5ba87d7efc2fc54827b586475c72a9
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 18 11:47:50 2024 +0800

    Support WAL Compression (#12476)
    
    * enable wal compression
    
    remove metrics in mem table flush task, cache hash code in partial path, 
use gzip to compress wal
    
    batch update metrics
    
    * fix bug
    
    * fix compilation problem
    
    * remove useless code
    
    * recover some code
    
    * support compression type in WAL Compress Header
    
    * support multi version WAL
    
    * edit configuration item
    
    * add log for WAL size
    
    * temp for debug
    
    * fix bug
    
    * remove useless log
    
    * remove one configuration
    
    * use compression rate to update wal disk usage
    
    * fix ut
    
    * fix test
    
    * set default to uncompress
    
    * fix wal ut
    
    * optimize calculating of wal size
    
    * close wal file when the origin size of wal buffer is larger than threshold
    
    * add the size of magic string
    
    * may be fix the bug
    
    * fix with comment
    
    * edit with review
    
    * fix test
    
    * add test for wal compression
    
    * add hot reload
    
    * clean the code to make it more readable
    
    * reuse the byte buffer if possible
    
    * Indicate the encoding of String
    
    * Edit according to comment
    
    * spotless
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 +
 .../allocation/AbstractNodeAllocationStrategy.java |   8 +-
 .../dataregion/wal/buffer/AbstractWALBuffer.java   |   3 +-
 .../dataregion/wal/buffer/WALBuffer.java           |  16 +-
 .../wal/checkpoint/CheckpointManager.java          |   8 +-
 .../dataregion/wal/io/CheckpointReader.java        |   5 +-
 .../dataregion/wal/io/CheckpointWriter.java        |   4 +-
 .../dataregion/wal/io/ILogWriter.java              |   3 +-
 .../storageengine/dataregion/wal/io/LogWriter.java |  88 ++++-
 .../dataregion/wal/io/WALByteBufReader.java        |  12 +-
 .../dataregion/wal/io/WALInputStream.java          | 365 ++++++++++++++++++
 .../dataregion/wal/io/WALMetaData.java             |  25 +-
 .../storageengine/dataregion/wal/io/WALReader.java |  12 +-
 .../storageengine/dataregion/wal/io/WALWriter.java |  30 +-
 .../storageengine/dataregion/wal/node/WALNode.java |   5 +-
 .../dataregion/wal/recover/WALNodeRecoverTask.java |   1 +
 .../dataregion/wal/recover/WALRecoverWriter.java   |   4 +-
 .../dataregion/wal/utils/WALEntryPosition.java     |  31 +-
 .../java/org/apache/iotdb/db/tools/WalChecker.java |   6 +-
 .../storageengine/dataregion/wal/WALTestUtils.java |  90 +++++
 .../wal/compression/WALCompressionTest.java        | 409 +++++++++++++++++++++
 .../dataregion/wal/node/WALNodeTest.java           |   4 +
 .../wal/recover/WALRecoverManagerTest.java         |   4 +
 .../wal/recover/WALRecoverWriterTest.java          |  14 +-
 .../resources/conf/iotdb-system.properties         |   4 +
 26 files changed, 1107 insertions(+), 66 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9c20f6c32ec..3e3fa348ed6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.fileSystem.FSType;
 import org.apache.tsfile.utils.FSUtils;
@@ -1137,6 +1138,8 @@ public class IoTDBConfig {
    */
   private String RateLimiterType = "FixedIntervalRateLimiter";
 
+  private CompressionType WALCompressionAlgorithm = 
CompressionType.UNCOMPRESSED;
+
   IoTDBConfig() {}
 
   public int getMaxLogEntriesNumPerBatch() {
@@ -1881,7 +1884,7 @@ public class IoTDBConfig {
     return walFileSizeThresholdInByte;
   }
 
-  void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
+  public void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
     this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
   }
 
@@ -3984,4 +3987,12 @@ public class IoTDBConfig {
         new TEndPoint(getInternalAddress(), getSchemaRegionConsensusPort()));
     return result;
   }
+
+  public CompressionType getWALCompressionAlgorithm() {
+    return WALCompressionAlgorithm;
+  }
+
+  public void setWALCompressionAlgorithm(CompressionType 
WALCompressionAlgorithm) {
+    this.WALCompressionAlgorithm = WALCompressionAlgorithm;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 81c60b9b809..dcf8792a145 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.fileSystem.FSType;
 import org.apache.tsfile.utils.FilePathUtils;
@@ -416,6 +417,10 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "io_task_queue_size_for_flushing",
                 Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+    boolean enableWALCompression =
+        Boolean.parseBoolean(properties.getProperty("enable_wal_compression", 
"false"));
+    conf.setWALCompressionAlgorithm(
+        enableWALCompression ? CompressionType.LZ4 : 
CompressionType.UNCOMPRESSED);
 
     conf.setCompactionScheduleIntervalInMs(
         Long.parseLong(
@@ -1793,6 +1798,10 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "merge_threshold_of_explain_analyze",
                   String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
+      boolean enableWALCompression =
+          
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
+      conf.setWALCompressionAlgorithm(
+          enableWALCompression ? CompressionType.LZ4 : 
CompressionType.UNCOMPRESSED);
 
       // update Consensus config
       reloadConsensusProps(properties);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
index 62c438fcc69..e98086e5146 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
 
 public abstract class AbstractNodeAllocationStrategy implements 
NodeAllocationStrategy {
@@ -72,8 +72,8 @@ public abstract class AbstractNodeAllocationStrategy 
implements NodeAllocationSt
   protected IWALNode createWALNode(String identifier, String folder) {
     try {
       return new WALNode(identifier, folder);
-    } catch (FileNotFoundException e) {
-      logger.error("Fail to create wal node", e);
+    } catch (IOException e) {
+      logger.error("Meet exception when creating wal node", e);
       return WALFakeNode.getFailureInstance(e);
     }
   }
@@ -82,7 +82,7 @@ public abstract class AbstractNodeAllocationStrategy 
implements NodeAllocationSt
       String identifier, String folder, long startFileVersion, long 
startSearchIndex) {
     try {
       return new WALNode(identifier, folder, startFileVersion, 
startSearchIndex);
-    } catch (FileNotFoundException e) {
+    } catch (IOException e) {
       logger.error("Fail to create wal node", e);
       return WALFakeNode.getFailureInstance(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
index 08bca5f6649..c4c5bc13a3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
@@ -58,7 +57,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer 
{
 
   protected AbstractWALBuffer(
       String identifier, String logDirectory, long startFileVersion, long 
startSearchIndex)
-      throws FileNotFoundException {
+      throws IOException {
     this.identifier = identifier;
     this.logDirectory = logDirectory;
     File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 7bf33a93f40..20970aea34b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
@@ -119,7 +118,7 @@ public class WALBuffer extends AbstractWALBuffer {
   // manage wal files which have MemTableIds
   private final Map<Long, Set<Long>> memTableIdsOfWal = new 
ConcurrentHashMap<>();
 
-  public WALBuffer(String identifier, String logDirectory) throws 
FileNotFoundException {
+  public WALBuffer(String identifier, String logDirectory) throws IOException {
     this(identifier, logDirectory, new CheckpointManager(identifier, 
logDirectory), 0, 0L);
   }
 
@@ -129,7 +128,7 @@ public class WALBuffer extends AbstractWALBuffer {
       CheckpointManager checkpointManager,
       long startFileVersion,
       long startSearchIndex)
-      throws FileNotFoundException {
+      throws IOException {
     super(identifier, logDirectory, startFileVersion, startSearchIndex);
     this.checkpointManager = checkpointManager;
     currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
@@ -521,8 +520,9 @@ public class WALBuffer extends AbstractWALBuffer {
           forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), 
usedRatio * 100);
 
       // flush buffer to os
+      double compressionRatio = 1.0;
       try {
-        currentWALFileWriter.write(syncingBuffer, info.metaData);
+        compressionRatio = currentWALFileWriter.write(syncingBuffer, 
info.metaData);
       } catch (Throwable e) {
         logger.error(
             "Fail to sync wal node-{}'s buffer, change system mode to error.", 
identifier, e);
@@ -535,12 +535,14 @@ public class WALBuffer extends AbstractWALBuffer {
       memTableIdsOfWal
           .computeIfAbsent(currentWALFileVersion, memTableIds -> new 
HashSet<>())
           .addAll(info.metaData.getMemTablesId());
-      
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
+      
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage, 
compressionRatio);
 
       boolean forceSuccess = false;
       // try to roll log writer
       if (info.rollWALFileWriterListener != null
-          || (forceFlag && currentWALFileWriter.size() >= 
config.getWalFileSizeThresholdInByte())) {
+          // TODO: Control the wal file by the number of WALEntry
+          || (forceFlag
+              && currentWALFileWriter.originalSize() >= 
config.getWalFileSizeThresholdInByte())) {
         try {
           rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
           forceSuccess = true;
@@ -582,7 +584,7 @@ public class WALBuffer extends AbstractWALBuffer {
             position += fsyncListener.getWalEntryHandler().getSize();
           }
         }
-        lastFsyncPosition = currentWALFileWriter.size();
+        lastFsyncPosition = currentWALFileWriter.originalSize();
       }
       WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
       WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, 
forceFlag);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 8359330754d..412dfcfc0a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -78,7 +77,7 @@ public class CheckpointManager implements AutoCloseable {
 
   // endregion
 
-  public CheckpointManager(String identifier, String logDirectory) throws 
FileNotFoundException {
+  public CheckpointManager(String identifier, String logDirectory) throws 
IOException {
     this.identifier = identifier;
     this.logDirectory = logDirectory;
     File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
@@ -345,12 +344,13 @@ public class CheckpointManager implements AutoCloseable {
   }
 
   /** Update wal disk cost of active memTables. */
-  public void updateCostOfActiveMemTables(Map<Long, Long> 
memTableId2WalDiskUsage) {
+  public void updateCostOfActiveMemTables(
+      Map<Long, Long> memTableId2WalDiskUsage, double compressionRate) {
     for (Map.Entry<Long, Long> memTableWalUsage : 
memTableId2WalDiskUsage.entrySet()) {
       memTableId2Info.computeIfPresent(
           memTableWalUsage.getKey(),
           (k, v) -> {
-            v.addWalDiskUsage(memTableWalUsage.getValue());
+            v.addWalDiskUsage((long) (memTableWalUsage.getValue() * 
compressionRate));
             return v;
           });
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..578ab21ae8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -24,10 +24,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -47,8 +45,7 @@ public class CheckpointReader {
 
   private void init() {
     checkpoints = new ArrayList<>();
-    try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new 
FileInputStream(logFile)))) {
+    try (DataInputStream logStream = new DataInputStream(new 
WALInputStream(logFile))) {
       maxMemTableId = logStream.readLong();
       while (logStream.available() > 0) {
         Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
index cd6af496377..322aa3c9f5d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
 
 import java.io.File;
-import java.io.FileNotFoundException;
+import java.io.IOException;
 
 /** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint 
file. */
 public class CheckpointWriter extends LogWriter {
-  public CheckpointWriter(File logFile) throws FileNotFoundException {
+  public CheckpointWriter(File logFile) throws IOException {
     super(logFile);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
index d89563a9328..f4d65612c47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
@@ -33,8 +33,9 @@ public interface ILogWriter extends Closeable {
    *
    * @param buffer content that have been converted to bytes
    * @throws IOException if an I/O error occurs
+   * @return Compression rate of the buffer after compression
    */
-  void write(ByteBuffer buffer) throws IOException;
+  double write(ByteBuffer buffer) throws IOException;
 
   /**
    * Forces any updates to this file to be written to the storage device that 
contains it.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..29335efde5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,19 +19,23 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
 
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
 
 /**
  * LogWriter writes the binary logs into a file, including writing {@link 
WALEntry} into .wal file
@@ -43,23 +47,89 @@ public abstract class LogWriter implements ILogWriter {
   protected final File logFile;
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
-  protected long size;
+  protected long size = 0;
+  protected long originalSize = 0;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 
2 + 1);
+  private ICompressor compressor =
+      ICompressor.getCompressor(
+          
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
+  private ByteBuffer compressedByteBuffer;
+  // Minimum size to compress, default is 32 KB
+  private static long minCompressionSize = 32 * 1024L;
 
-  protected LogWriter(File logFile) throws FileNotFoundException {
+  protected LogWriter(File logFile) throws IOException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (!logFile.exists() || logFile.length() == 0) {
+      this.logChannel.write(
+          
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
+      size += logChannel.position();
+    }
+    if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
+        != CompressionType.UNCOMPRESSED) {
+      // TODO: Use a dynamic strategy to enlarge the buffer size
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }
 
   @Override
-  public void write(ByteBuffer buffer) throws IOException {
-    size += buffer.position();
+  public double write(ByteBuffer buffer) throws IOException {
+    CompressionType compressionType =
+        IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+    int bufferSize = buffer.position();
+    if (bufferSize == 0) {
+      return 1.0;
+    }
+    originalSize += bufferSize;
     buffer.flip();
+    boolean compressed = false;
+    int uncompressedSize = bufferSize;
+    if (compressionType != CompressionType.UNCOMPRESSED
+        /* Do not compress buffer that is less than min size */
+        && bufferSize > minCompressionSize) {
+      if (Objects.isNull(compressedByteBuffer)) {
+        compressedByteBuffer =
+            ByteBuffer.allocate(
+                compressor.getMaxBytesForCompression(
+                    
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+      }
+      compressedByteBuffer.clear();
+      if (compressor.getType() != compressionType) {
+        compressor = ICompressor.getCompressor(compressionType);
+      }
+      compressor.compress(buffer, compressedByteBuffer);
+      buffer = compressedByteBuffer;
+      bufferSize = buffer.position();
+      buffer.flip();
+      compressed = true;
+    }
+    size += bufferSize;
+    /*
+     Header structure:
+     [CompressionType(1 byte)][dataBufferSize(4 bytes)][uncompressedSize(4 
bytes)]
+    */
+    headerBuffer.clear();
+    headerBuffer.put(
+        compressed ? compressionType.serialize() : 
CompressionType.UNCOMPRESSED.serialize());
+    headerBuffer.putInt(bufferSize);
+    if (compressed) {
+      headerBuffer.putInt(uncompressedSize);
+    }
+    size += headerBuffer.position();
     try {
+      headerBuffer.flip();
+      logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);
     }
+    return ((double) bufferSize / uncompressedSize);
   }
 
   @Override
@@ -79,6 +149,10 @@ public abstract class LogWriter implements ILogWriter {
     return size;
   }
 
+  public long originalSize() {
+    return originalSize;
+  }
+
   @Override
   public File getLogFile() {
     return logFile;
@@ -97,4 +171,8 @@ public abstract class LogWriter implements ILogWriter {
       }
     }
   }
+
+  public long getOffset() throws IOException {
+    return logChannel.position();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf3647..882b5ea468c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable {
   private final File logFile;
   private final FileChannel channel;
   private final WALMetaData metaData;
+  private final DataInputStream logStream;
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable {
   public WALByteBufReader(File logFile, FileChannel channel) throws 
IOException {
     this.logFile = logFile;
     this.channel = channel;
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
     this.metaData = WALMetaData.readFromWALFile(logFile, channel);
     this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
@@ -63,9 +66,14 @@ public class WALByteBufReader implements Closeable {
    */
   public ByteBuffer next() throws IOException {
     int size = sizeIterator.next();
+    // TODO: Reuse this buffer
     ByteBuffer buffer = ByteBuffer.allocate(size);
-    channel.read(buffer);
-    buffer.clear();
+    /*
+     Notice, we don't need to flip the buffer after calling
+     logStream.readFully, since this function does not change
+     the position of the buffer.
+    */
+    logStream.readFully(buffer.array(), 0, size);
     return buffer;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..844c06436b7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.utils.MmapUtil;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(WALInputStream.class);
+  private final FileChannel channel;
+  private final ByteBuffer segmentHeaderBuffer = 
ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+  private final ByteBuffer compressedHeader = 
ByteBuffer.allocate(Integer.BYTES);
+  private ByteBuffer dataBuffer = null;
+  private ByteBuffer compressedBuffer = null;
+  private long fileSize;
+  File logFile;
+  /*
+   The WAL file consist of following parts:
+   [MagicString] [Segment 1] [Segment 2] ... [Segment N] [Metadata] 
[MagicString]
+   The endOffset indicates the maximum offset that a segment can reach.
+   Aka, the last byte of the last segment.
+  */
+  private long endOffset = -1;
+
+  enum FileVersion {
+    V1,
+    V2,
+    UNKNOWN
+  };
+
+  FileVersion version;
+
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+    fileSize = channel.size();
+    analyzeFileVersion();
+    getEndOffset();
+    this.logFile = logFile;
+  }
+
+  private void getEndOffset() throws IOException {
+    if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+      // An broken file
+      endOffset = channel.size();
+      return;
+    }
+    ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+    long position;
+    try {
+      if (version == FileVersion.V2) {
+        // New Version
+        ByteBuffer magicStringBuffer = 
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+        channel.read(magicStringBuffer, channel.size() - 
WALWriter.MAGIC_STRING_BYTES);
+        magicStringBuffer.flip();
+        if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
+            .equals(WALWriter.MAGIC_STRING)) {
+          // This is a broken wal file
+          endOffset = channel.size();
+          return;
+        } else {
+          // This is a normal wal file
+          position = channel.size() - WALWriter.MAGIC_STRING_BYTES - 
Integer.BYTES;
+        }
+      } else {
+        // Old version
+        ByteBuffer magicStringBuffer =
+            
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
+        channel.read(
+            magicStringBuffer,
+            channel.size() - 
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
+        magicStringBuffer.flip();
+        if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
+            .equals(WALWriter.MAGIC_STRING_V1)) {
+          // this is a broken wal file
+          endOffset = channel.size();
+          return;
+        } else {
+          position =
+              channel.size()
+                  - 
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length
+                  - Integer.BYTES;
+        }
+      }
+      // Read the meta data size
+      channel.read(metadataSizeBuf, position);
+      metadataSizeBuf.flip();
+      int metadataSize = metadataSizeBuf.getInt();
+      endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES - 
Integer.BYTES - metadataSize - 1;
+    } finally {
+      channel.position(WALWriter.MAGIC_STRING_BYTES);
+    }
+  }
+
+  private void analyzeFileVersion() throws IOException {
+    if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+      version = FileVersion.UNKNOWN;
+      return;
+    }
+    if (isCurrentVersion()) {
+      this.version = FileVersion.V2;
+      return;
+    }
+    this.version = FileVersion.V1;
+  }
+
+  private boolean isCurrentVersion() throws IOException {
+    channel.position(0);
+    ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+    channel.read(buffer);
+    return new String(buffer.array(), 
StandardCharsets.UTF_8).equals(WALWriter.MAGIC_STRING);
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (Objects.isNull(dataBuffer) || dataBuffer.position() >= 
dataBuffer.limit()) {
+      loadNextSegment();
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (Objects.isNull(dataBuffer) || dataBuffer.position() >= 
dataBuffer.limit()) {
+      loadNextSegment();
+    }
+    if (dataBuffer.remaining() >= len) {
+      dataBuffer.get(b, off, len);
+      return len;
+    }
+    int toBeRead = len;
+    while (toBeRead > 0) {
+      int remaining = dataBuffer.remaining();
+      int bytesRead = Math.min(remaining, toBeRead);
+      dataBuffer.get(b, off, bytesRead);
+      off += bytesRead;
+      toBeRead -= bytesRead;
+      if (toBeRead > 0) {
+        loadNextSegment();
+      }
+    }
+    return len;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  @Override
+  public int available() throws IOException {
+    long size = (endOffset - channel.position());
+    if (!Objects.isNull(dataBuffer)) {
+      size += dataBuffer.limit() - dataBuffer.position();
+    }
+    return (int) size;
+  }
+
+  private void loadNextSegment() throws IOException {
+    if (channel.position() >= endOffset) {
+      throw new IOException("End of file");
+    }
+    if (version == FileVersion.V2) {
+      loadNextSegmentV2();
+    } else if (version == FileVersion.V1) {
+      loadNextSegmentV1();
+    } else {
+      tryLoadSegment();
+    }
+  }
+
+  private void loadNextSegmentV1() throws IOException {
+    // just read raw data as input
+    if (channel.position() >= fileSize) {
+      throw new IOException("Unexpected end of file");
+    }
+    if (Objects.isNull(dataBuffer)) {
+      // read 128 KB
+      dataBuffer = ByteBuffer.allocate(128 * 1024);
+    }
+    dataBuffer.clear();
+    channel.read(dataBuffer);
+    dataBuffer.flip();
+  }
+
+  private void loadNextSegmentV2() throws IOException {
+    SegmentInfo segmentInfo = getNextSegmentInfo();
+    if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
+      // A compressed segment
+      if (Objects.isNull(dataBuffer)
+          || dataBuffer.capacity() < segmentInfo.uncompressedSize
+          || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
+      }
+      dataBuffer.clear();
+
+      if (Objects.isNull(compressedBuffer)
+          || compressedBuffer.capacity() < segmentInfo.dataInDiskSize
+          || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
+        if (!Objects.isNull(compressedBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) compressedBuffer);
+        }
+        compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
+      }
+      compressedBuffer.clear();
+      // limit the buffer to prevent it from reading too much byte than 
expected
+      compressedBuffer.limit(segmentInfo.dataInDiskSize);
+      if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedBuffer.flip();
+
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
+      unCompressor.uncompress(compressedBuffer, dataBuffer);
+    } else {
+      // An uncompressed segment
+      if (Objects.isNull(dataBuffer)
+          || dataBuffer.capacity() < segmentInfo.dataInDiskSize
+          || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) dataBuffer);
+        }
+        dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
+      }
+      dataBuffer.clear();
+      // limit the buffer to prevent it from reading too much byte than 
expected
+      dataBuffer.limit(segmentInfo.dataInDiskSize);
+
+      if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+    }
+    dataBuffer.flip();
+  }
+
+  private void tryLoadSegment() throws IOException {
+    long originPosition = channel.position();
+    try {
+      loadNextSegmentV2();
+      version = FileVersion.V2;
+    } catch (Throwable e) {
+      // failed to load in V2 way, try in V1 way
+      logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
+      channel.position(originPosition);
+    }
+
+    if (version == FileVersion.UNKNOWN) {
+      loadNextSegmentV1();
+      version = FileVersion.V1;
+    }
+  }
+
+  /**
+   * Since current WAL file is compressed, but some part of the system need to 
skip the offset of an
+   * uncompressed wal file, this method is used to skip to the given logical 
position.
+   *
+   * @param pos The logical offset to skip to
+   * @throws IOException If the file is broken or the given position is invalid
+   */
+  public void skipToGivenLogicalPosition(long pos) throws IOException {
+    if (version == FileVersion.V2) {
+      channel.position(WALWriter.MAGIC_STRING_BYTES);
+      long posRemain = pos;
+      SegmentInfo segmentInfo = null;
+      do {
+        segmentInfo = getNextSegmentInfo();
+        if (posRemain >= segmentInfo.uncompressedSize) {
+          posRemain -= segmentInfo.uncompressedSize;
+          channel.position(channel.position() + segmentInfo.dataInDiskSize);
+        } else {
+          break;
+        }
+      } while (posRemain >= 0);
+
+      if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
+        compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        channel.read(compressedBuffer);
+        compressedBuffer.flip();
+        IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
+        dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+        unCompressor.uncompress(compressedBuffer, dataBuffer);
+      } else {
+        dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+        channel.read(dataBuffer);
+        dataBuffer.flip();
+      }
+
+      dataBuffer.position((int) posRemain);
+    } else {
+      dataBuffer = null;
+      channel.position(pos);
+    }
+  }
+
+  public void read(ByteBuffer buffer) throws IOException {
+    int totalBytesToBeRead = buffer.remaining();
+    int currReadBytes = Math.min(dataBuffer.remaining(), buffer.remaining());
+    dataBuffer.get(buffer.array(), buffer.position(), currReadBytes);
+    if (totalBytesToBeRead - currReadBytes > 0) {
+      loadNextSegment();
+      read(buffer);
+    }
+  }
+
+  public long getFileCurrentPos() throws IOException {
+    return channel.position();
+  }
+
+  private SegmentInfo getNextSegmentInfo() throws IOException {
+    segmentHeaderBuffer.clear();
+    channel.read(segmentHeaderBuffer);
+    segmentHeaderBuffer.flip();
+    SegmentInfo info = new SegmentInfo();
+    info.compressionType = 
CompressionType.deserialize(segmentHeaderBuffer.get());
+    info.dataInDiskSize = segmentHeaderBuffer.getInt();
+    if (info.compressionType != CompressionType.UNCOMPRESSED) {
+      compressedHeader.clear();
+      channel.read(compressedHeader);
+      compressedHeader.flip();
+      info.uncompressedSize = compressedHeader.getInt();
+    } else {
+      info.uncompressedSize = info.dataInDiskSize;
+    }
+    return info;
+  }
+
+  private class SegmentInfo {
+    public CompressionType compressionType;
+    public int dataInDiskSize;
+    public int uncompressedSize;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index 9ca700a62f7..7fa634ffbdc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -22,10 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.utils.SerializedSize;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -36,6 +40,7 @@ import java.util.Set;
  * entry and the number of entries.
  */
 public class WALMetaData implements SerializedSize {
+  private static final Logger logger = 
LoggerFactory.getLogger(WALMetaData.class);
   // search index 8 byte, wal entries' number 4 bytes
   private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
 
@@ -45,6 +50,7 @@ public class WALMetaData implements SerializedSize {
   private final List<Integer> buffersSize;
   // memTable ids of this wal file
   private final Set<Long> memTablesId;
+  private long truncateOffSet = 0;
 
   public WALMetaData() {
     this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>(), new 
HashSet<>());
@@ -79,7 +85,7 @@ public class WALMetaData implements SerializedSize {
         + (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() * 
Long.BYTES);
   }
 
-  public void serialize(ByteBuffer buffer) {
+  public void serialize(File file, ByteBuffer buffer) {
     buffer.putLong(firstSearchIndex);
     buffer.putInt(buffersSize.size());
     for (int size : buffersSize) {
@@ -123,8 +129,7 @@ public class WALMetaData implements SerializedSize {
   }
 
   public static WALMetaData readFromWALFile(File logFile, FileChannel channel) 
throws IOException {
-    if (channel.size() < WALWriter.MAGIC_STRING_BYTES
-        || !readTailMagic(channel).equals(WALWriter.MAGIC_STRING)) {
+    if (channel.size() < WALWriter.MAGIC_STRING_BYTES || 
!isValidMagicString(channel)) {
       throw new IOException(String.format("Broken wal file %s", logFile));
     }
     // load metadata size
@@ -153,10 +158,20 @@ public class WALMetaData implements SerializedSize {
     return metaData;
   }
 
-  private static String readTailMagic(FileChannel channel) throws IOException {
+  private static boolean isValidMagicString(FileChannel channel) throws 
IOException {
     ByteBuffer magicStringBytes = 
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
     channel.read(magicStringBytes, channel.size() - 
WALWriter.MAGIC_STRING_BYTES);
     magicStringBytes.flip();
-    return new String(magicStringBytes.array());
+    String magicString = new String(magicStringBytes.array(), 
StandardCharsets.UTF_8);
+    return magicString.equals(WALWriter.MAGIC_STRING)
+        || magicString.contains(WALWriter.MAGIC_STRING_V1);
+  }
+
+  public void setTruncateOffSet(long offset) {
+    this.truncateOffSet = offset;
+  }
+
+  public long getTruncateOffSet() {
+    return truncateOffSet;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..1310bb36b46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -46,6 +44,7 @@ public class WALReader implements Closeable {
 
   private final File logFile;
   private final boolean fileMayCorrupt;
+  private final WALInputStream walInputStream;
   private final DataInputStream logStream;
   private WALEntry nextEntry;
   private boolean fileCorrupted = false;
@@ -57,9 +56,8 @@ public class WALReader implements Closeable {
   public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
     this.logFile = logFile;
     this.fileMayCorrupt = fileMayCorrupt;
-    this.logStream =
-        new DataInputStream(
-            new BufferedInputStream(Files.newInputStream(logFile.toPath()), 
STREAM_BUFFER_SIZE));
+    this.walInputStream = new WALInputStream(logFile);
+    this.logStream = new DataInputStream(walInputStream);
   }
 
   /** Like {@link Iterator#hasNext()}. */
@@ -88,6 +86,10 @@ public class WALReader implements Closeable {
     return nextEntry != null;
   }
 
+  public long getWALCurrentReadOffset() throws IOException {
+    return walInputStream.getFileCurrentPos();
+  }
+
   /**
    * Like {@link Iterator#next()}.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc676fad..7017b4be6cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -24,21 +24,26 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 /** WALWriter writes the binary {@link WALEntry} into .wal file. */
 public class WALWriter extends LogWriter {
-  public static final String MAGIC_STRING = "WAL";
-  public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes().length;
+  private static final Logger logger = 
LoggerFactory.getLogger(WALWriter.class);
+  public static final String MAGIC_STRING_V1 = "WAL";
+  public static final String MAGIC_STRING = "V2-WAL";
+  public static final int MAGIC_STRING_BYTES = 
MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length;
 
   private WALFileStatus walFileStatus = 
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
   // wal files' metadata
   protected final WALMetaData metaData = new WALMetaData();
 
-  public WALWriter(File logFile) throws FileNotFoundException {
+  public WALWriter(File logFile) throws IOException {
     super(logFile);
   }
 
@@ -47,11 +52,11 @@ public class WALWriter extends LogWriter {
    *
    * @throws IOException when failing to write
    */
-  public void write(ByteBuffer buffer, WALMetaData metaData) throws 
IOException {
+  public double write(ByteBuffer buffer, WALMetaData metaData) throws 
IOException {
     // update metadata
     updateMetaData(metaData);
     // flush buffer
-    write(buffer);
+    return write(buffer);
   }
 
   public void updateMetaData(WALMetaData metaData) {
@@ -67,11 +72,18 @@ public class WALWriter extends LogWriter {
     // mark info part ends
     endMarker.serialize(buffer);
     // flush meta data
-    metaData.serialize(buffer);
+    metaData.serialize(logFile, buffer);
     buffer.putInt(metaDataSize);
     // add magic string
-    buffer.put(MAGIC_STRING.getBytes());
-    write(buffer);
+    buffer.put(MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
+    size += buffer.position();
+    writeMetadata(buffer);
+  }
+
+  private void writeMetadata(ByteBuffer buffer) throws IOException {
+    size += buffer.position();
+    buffer.flip();
+    logChannel.write(buffer);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index e4a497de4d9..9a4b99f9ec0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -107,13 +108,13 @@ public class WALNode implements IWALNode {
   // insert nodes whose search index are before this value can be deleted 
safely
   private volatile long safelyDeletedSearchIndex = 
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
-  public WALNode(String identifier, String logDirectory) throws 
FileNotFoundException {
+  public WALNode(String identifier, String logDirectory) throws IOException {
     this(identifier, logDirectory, 0, 0L);
   }
 
   public WALNode(
       String identifier, String logDirectory, long startFileVersion, long 
startSearchIndex)
-      throws FileNotFoundException {
+      throws IOException {
     this.identifier = identifier;
     this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
     if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 237b353c9f9..07a06448359 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -172,6 +172,7 @@ public class WALNodeRecoverTask implements Runnable {
             }
           }
         }
+        metaData.setTruncateOffSet(walReader.getWALCurrentReadOffset());
         metaData.add(walEntry.serializedSize(), searchIndex, 
walEntry.getMemTableId());
       }
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
index 7015d45c58b..5db0c17c987 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
@@ -41,14 +41,14 @@ public class WALRecoverWriter {
 
   public void recover(WALMetaData metaData) throws IOException {
     // locate broken data
-    int truncateSize;
+    long truncateSize;
     if (logFile.length() < MAGIC_STRING_BYTES) { // file without magic string
       truncateSize = 0;
     } else {
       if (readTailMagic().equals(MAGIC_STRING)) { // complete file
         return;
       } else { // file with broken magic string
-        truncateSize = 
metaData.getBuffersSize().stream().mapToInt(Integer::intValue).sum();
+        truncateSize = metaData.getTruncateOffSet();
       }
     }
     // truncate broken data
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 13b2f4c9704..c794745c6f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
 
 import org.apache.tsfile.utils.Pair;
@@ -100,11 +101,12 @@ public class WALEntryPosition {
     if (!canRead()) {
       throw new IOException("Target file hasn't been specified.");
     }
-    try (FileChannel channel = openReadFileChannel()) {
+    // TODO: Reuse the file stream
+    try (WALInputStream is = openReadFileStream()) {
+      is.skipToGivenLogicalPosition(position);
       ByteBuffer buffer = ByteBuffer.allocate(size);
-      channel.position(position);
-      channel.read(buffer);
-      buffer.clear();
+      is.read(buffer);
+      buffer.flip();
       return buffer;
     }
   }
@@ -135,6 +137,27 @@ public class WALEntryPosition {
     }
   }
 
+  public WALInputStream openReadFileStream() throws IOException {
+    // TODO: Refactor this part of code
+    if (isInSealedFile()) {
+      walFile = walNode.getWALFile(walFileVersionId);
+      return new WALInputStream(walFile);
+    } else {
+      try {
+        walFile = walNode.getWALFile(walFileVersionId);
+        return new WALInputStream(walFile);
+      } catch (IOException e) {
+        // unsealed file may be renamed after sealed, so we should try again
+        if (isInSealedFile()) {
+          walFile = walNode.getWALFile(walFileVersionId);
+          return new WALInputStream(walFile);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
   public File getWalFile() {
     return walFile;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index cbf2ead7464..5e53e9f664a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -22,15 +22,14 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -86,8 +85,7 @@ public class WalChecker {
   }
 
   private boolean checkFile(File walFile) {
-    try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new 
FileInputStream(walFile)))) {
+    try (DataInputStream logStream = new DataInputStream(new 
WALInputStream(walFile))) {
       while (logStream.available() > 0) {
         WALEntry walEntry = WALEntry.deserialize(logStream);
         if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
new file mode 100644
index 00000000000..8f671eb3f03
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.lang.reflect.Field;
+
+public class WALTestUtils {
+  public static void setMinCompressionSize(long size)
+      throws NoSuchFieldException, ClassNotFoundException, 
IllegalAccessException {
+    Class<?> logWriterClass =
+        
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
+    Field minCompressionSizeField = 
logWriterClass.getDeclaredField("minCompressionSize");
+    minCompressionSizeField.setAccessible(true);
+    minCompressionSizeField.setLong(null, size);
+  }
+
+  public static long getMinCompressionSize()
+      throws ClassNotFoundException, NoSuchFieldException, 
IllegalAccessException {
+    Class<?> logWriterClass =
+        
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
+    Field minCompressionSizeField = 
logWriterClass.getDeclaredField("minCompressionSize");
+    minCompressionSizeField.setAccessible(true);
+    return minCompressionSizeField.getLong(null);
+  }
+
+  public static InsertRowNode getInsertRowNode(String devicePath, long time)
+      throws IllegalPathException, QueryProcessException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = 1.0d;
+    columns[1] = 2f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+    columns[5] = new Binary("hh" + 0, TSFileConfig.STRING_CHARSET);
+
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    MeasurementSchema[] schemas = new MeasurementSchema[6];
+    for (int i = 0; i < 6; i++) {
+      schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+    }
+    node.setMeasurementSchemas(schemas);
+    return node;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
new file mode 100644
index 00000000000..d187f6107b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.compression;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALTestUtils;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class WALCompressionTest {
+  private final File walFile =
+      new File(
+          TestConstant.BASE_OUTPUT_PATH.concat(
+              WALFileUtils.getLogFileName(0, 0, 
WALFileStatus.CONTAINS_SEARCH_INDEX)));
+
+  private final String compressionDir =
+      TestConstant.OUTPUT_DATA_DIR.concat(File.separator + "wal-compression");
+
+  private final String devicePath = "root.sg.d1";
+  long originalMinCompressionSize;
+  CompressionType originCompressionType =
+      IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+
+  @Before
+  public void setUp()
+      throws IOException, NoSuchFieldException, ClassNotFoundException, 
IllegalAccessException {
+    if (walFile.exists()) {
+      FileUtils.delete(walFile);
+    }
+    originalMinCompressionSize = WALTestUtils.getMinCompressionSize();
+    if (new File(compressionDir).exists()) {
+      FileUtils.forceDelete(new File(compressionDir));
+    }
+  }
+
+  @After
+  public void tearDown()
+      throws IOException, NoSuchFieldException, ClassNotFoundException, 
IllegalAccessException {
+    if (walFile.exists()) {
+      FileUtils.delete(walFile);
+    }
+    if (new File(compressionDir).exists()) {
+      FileUtils.forceDelete(new File(compressionDir));
+    }
+    WALTestUtils.setMinCompressionSize(originalMinCompressionSize);
+    
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(originCompressionType);
+  }
+
+  @Test
+  public void testSkipToGivenPositionWithCompression()
+      throws NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException,
+          QueryProcessException,
+          IllegalPathException,
+          IOException {
+    WALTestUtils.setMinCompressionSize(0L);
+    
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+    testSkipToGivenPosition();
+  }
+
+  @Test
+  public void testSkipToGivenPositionWithoutCompression()
+      throws NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException,
+          QueryProcessException,
+          IllegalPathException,
+          IOException {
+    WALTestUtils.setMinCompressionSize(1024 * 32);
+    testSkipToGivenPosition();
+  }
+
+  public void testSkipToGivenPosition()
+      throws QueryProcessException, IllegalPathException, IOException {
+    LogWriter writer = new WALWriter(walFile);
+    ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+    List<Pair<Long, InsertRowNode>> positionAndEntryPairList = new 
ArrayList<>();
+    int memTableId = 0;
+    long fileOffset = 0;
+    for (int i = 0; i < 100; ) {
+      InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + 
memTableId, i);
+      long serializedSize = insertRowNode.serializedSize();
+      if (buffer.remaining() >= serializedSize) {
+        int pos = buffer.position();
+        insertRowNode.serialize(buffer);
+        positionAndEntryPairList.add(new Pair<>(fileOffset, insertRowNode));
+        fileOffset += buffer.position() - pos;
+        i++;
+      } else {
+        writer.write(buffer);
+        buffer.clear();
+      }
+    }
+    if (buffer.position() != 0) {
+      writer.write(buffer);
+    }
+    writer.close();
+    try (WALInputStream stream = new WALInputStream(walFile)) {
+      for (int i = 0; i < 100; ++i) {
+        Pair<Long, InsertRowNode> positionAndNodePair = 
positionAndEntryPairList.get(i);
+        stream.skipToGivenLogicalPosition(positionAndNodePair.left);
+        /*
+          Add the allocated buffer size by 2, because the actual serialized 
size
+          of InsertRowNode is larger than the estimated value got by 
serializedSize.
+          I don't know if this is a bug or not.
+        */
+        ByteBuffer nodeBuffer1 =
+            ByteBuffer.allocate(positionAndNodePair.right.serializedSize() + 
2);
+        stream.read(nodeBuffer1);
+        ByteBuffer nodeBuffer2 =
+            ByteBuffer.allocate(positionAndNodePair.right.serializedSize() + 
2);
+        positionAndNodePair.right.serialize(nodeBuffer2);
+        nodeBuffer2.flip();
+        Assert.assertArrayEquals(nodeBuffer1.array(), nodeBuffer2.array());
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedWALStructure()
+      throws QueryProcessException, IllegalPathException, IOException {
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+    List<InsertRowNode> insertRowNodes = new ArrayList<>();
+    for (int i = 0; i < 100; ++i) {
+      InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+      insertRowNodes.add(node);
+      node.serialize(dataOutputStream);
+    }
+    dataOutputStream.close();
+    ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+    // Do not compress it
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    try (WALWriter writer = new WALWriter(walFile)) {
+      buf.position(buf.limit());
+      writer.write(buf);
+    }
+
+    try (DataInputStream dataInputStream =
+        new DataInputStream(new 
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
+      byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+      // head magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(WALWriter.MAGIC_STRING, new 
String(magicStringBytes));
+      Assert.assertEquals(
+          CompressionType.UNCOMPRESSED, 
CompressionType.deserialize(dataInputStream.readByte()));
+      Assert.assertEquals(buf.array().length, dataInputStream.readInt());
+      ByteBuffer dataBuf = ByteBuffer.allocate(buf.array().length);
+      dataInputStream.readFully(dataBuf.array());
+      Assert.assertArrayEquals(buf.array(), dataBuf.array());
+      Assert.assertEquals(
+          new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
+          WALEntry.deserialize(dataInputStream));
+      ByteBuffer metadataBuf = ByteBuffer.allocate(12 + Integer.BYTES);
+      dataInputStream.readFully(metadataBuf.array());
+      // Tail magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(WALWriter.MAGIC_STRING, new 
String(magicStringBytes));
+    }
+  }
+
+  @Test
+  public void testCompressedWALStructure()
+      throws IOException,
+          QueryProcessException,
+          IllegalPathException,
+          NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException {
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+    List<InsertRowNode> insertRowNodes = new ArrayList<>();
+    for (int i = 0; i < 100; ++i) {
+      InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+      insertRowNodes.add(node);
+      node.serialize(dataOutputStream);
+    }
+    dataOutputStream.close();
+    ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+    // Compress it
+    
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+    WALTestUtils.setMinCompressionSize(0);
+    try (WALWriter writer = new WALWriter(walFile)) {
+      buf.position(buf.limit());
+      writer.write(buf);
+    }
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+    byte[] compressed = compressor.compress(buf.array());
+
+    try (DataInputStream dataInputStream =
+        new DataInputStream(new 
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
+      byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+      // head magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(WALWriter.MAGIC_STRING, new 
String(magicStringBytes));
+      Assert.assertEquals(
+          CompressionType.LZ4, 
CompressionType.deserialize(dataInputStream.readByte()));
+      Assert.assertEquals(compressed.length, dataInputStream.readInt());
+      Assert.assertEquals(buf.array().length, dataInputStream.readInt());
+      ByteBuffer dataBuf = ByteBuffer.allocate(compressed.length);
+      dataInputStream.readFully(dataBuf.array());
+      Assert.assertArrayEquals(compressed, dataBuf.array());
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      Assert.assertArrayEquals(unCompressor.uncompress(compressed), 
buf.array());
+      Assert.assertEquals(
+          new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
+          WALEntry.deserialize(dataInputStream));
+      ByteBuffer metadataBuf = ByteBuffer.allocate(12 + Integer.BYTES);
+      dataInputStream.readFully(metadataBuf.array());
+      // Tail magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(WALWriter.MAGIC_STRING, new 
String(magicStringBytes));
+    }
+  }
+
+  @Test
+  public void testWALReaderWithoutCompression()
+      throws QueryProcessException, IllegalPathException, IOException, 
InterruptedException {
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    testWALReader();
+  }
+
+  @Test
+  public void testWALReaderWithCompression()
+      throws QueryProcessException,
+          IllegalPathException,
+          IOException,
+          InterruptedException,
+          NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException {
+    
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+    WALTestUtils.setMinCompressionSize(0);
+    testWALReader();
+  }
+
+  public void testWALReader()
+      throws IOException, QueryProcessException, IllegalPathException, 
InterruptedException {
+    File dir = new File(compressionDir);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    WALBuffer walBuffer = new WALBuffer("", compressionDir);
+    List<WALEntry> entryList = new ArrayList<>();
+    for (int i = 0; i < 100; ++i) {
+      InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+      WALEntry entry = new WALInfoEntry(0, node);
+      walBuffer.write(entry);
+      entryList.add(entry);
+    }
+    long sleepTime = 0;
+    while (!walBuffer.isAllWALEntriesConsumed()) {
+      Thread.sleep(100);
+      sleepTime += 100;
+      if (sleepTime > 10_000) {
+        Assert.fail("It has been too long for all entries to be consumed");
+      }
+    }
+    walBuffer.close();
+
+    File[] walFiles = WALFileUtils.listAllWALFiles(new File(compressionDir));
+    Assert.assertNotNull(walFiles);
+    Assert.assertEquals(1, walFiles.length);
+    List<WALEntry> readWALEntryList = new ArrayList<>();
+    try (WALReader reader = new WALReader(walFiles[0])) {
+      while (reader.hasNext()) {
+        readWALEntryList.add(reader.next());
+      }
+    }
+    Assert.assertEquals(entryList, readWALEntryList);
+
+    try (WALByteBufReader reader = new WALByteBufReader(walFiles[0])) {
+      for (int i = 0; i < 100; ++i) {
+        Assert.assertTrue(reader.hasNext());
+        ByteBuffer buffer = reader.next();
+        Assert.assertEquals(entryList.get(i).serializedSize(), 
buffer.array().length);
+      }
+    }
+  }
+
+  @Test
+  public void testHotLoad()
+      throws IOException,
+          QueryProcessException,
+          IllegalPathException,
+          InterruptedException,
+          NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException {
+    File dir = new File(compressionDir);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    WALTestUtils.setMinCompressionSize(0);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    // Do not compress wal for these entries
+    WALBuffer walBuffer = new WALBuffer("", compressionDir);
+    List<WALEntry> entryList = new ArrayList<>();
+    for (int i = 0; i < 50; ++i) {
+      InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+      WALEntry entry = new WALInfoEntry(0, node);
+      walBuffer.write(entry);
+      entryList.add(entry);
+    }
+    long sleepTime = 0;
+    while (!walBuffer.isAllWALEntriesConsumed()) {
+      Thread.sleep(100);
+      sleepTime += 100;
+      if (sleepTime > 10_000) {
+        Assert.fail("It has been too long for all entries to be consumed");
+      }
+    }
+
+    // compress wal for these entries
+    
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+    for (int i = 50; i < 100; ++i) {
+      InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+      WALEntry entry = new WALInfoEntry(0, node);
+      walBuffer.write(entry);
+      entryList.add(entry);
+    }
+    sleepTime = 0;
+    while (!walBuffer.isAllWALEntriesConsumed()) {
+      Thread.sleep(100);
+      sleepTime += 100;
+      if (sleepTime > 10_000) {
+        Assert.fail("It has been too long for all entries to be consumed");
+      }
+    }
+    walBuffer.close();
+
+    File[] walFiles = WALFileUtils.listAllWALFiles(new File(compressionDir));
+    Assert.assertNotNull(walFiles);
+    Assert.assertEquals(1, walFiles.length);
+    List<WALEntry> readWALEntryList = new ArrayList<>();
+    try (WALReader reader = new WALReader(walFiles[0])) {
+      while (reader.hasNext()) {
+        readWALEntryList.add(reader.next());
+      }
+    }
+    Assert.assertEquals(entryList, readWALEntryList);
+
+    try (WALByteBufReader reader = new WALByteBufReader(walFiles[0])) {
+      for (int i = 0; i < 100; ++i) {
+        Assert.assertTrue(reader.hasNext());
+        ByteBuffer buffer = reader.next();
+        Assert.assertEquals(entryList.get(i).serializedSize(), 
buffer.array().length);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
index 61f5e4150e5..a3c2e71e4d2 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
@@ -80,9 +80,12 @@ public class WALNodeTest {
   private WALMode prevMode;
   private String prevConsensus;
   private WALNode walNode;
+  private long originWALThreshold =
+      
IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte();
 
   @Before
   public void setUp() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(2 
* 1024 * 1024);
     EnvironmentUtils.cleanDir(logDirectory);
     prevMode = config.getWalMode();
     prevConsensus = config.getDataRegionConsensusProtocolClass();
@@ -93,6 +96,7 @@ public class WALNodeTest {
 
   @After
   public void tearDown() throws Exception {
+    
IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(originWALThreshold);
     walNode.close();
     config.setWalMode(prevMode);
     config.setDataRegionConsensusProtocolClass(prevConsensus);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
index 7193d7722a5..0ba1ebf2571 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
@@ -109,9 +109,12 @@ public class WALRecoverManagerTest {
   private CheckpointManager checkpointManager;
   private TsFileResource tsFileWithWALResource;
   private TsFileResource tsFileWithoutWALResource;
+  private long originWALThreshold =
+      
IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte();
 
   @Before
   public void setUp() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(1 
* 1024 * 1024);
     EnvironmentUtils.cleanDir(new File(FILE_WITH_WAL_NAME).getParent());
     EnvironmentUtils.envSetUp();
     prevMode = config.getWalMode();
@@ -122,6 +125,7 @@ public class WALRecoverManagerTest {
 
   @After
   public void tearDown() throws Exception {
+    
IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(originWALThreshold);
     if (tsFileWithWALResource != null) {
       tsFileWithWALResource.close();
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
index c22505ad1a3..97b00f379f2 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
@@ -71,9 +71,13 @@ public class WALRecoverWriterTest {
     // recover
     WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
     walRecoverWriter.recover(walMetaData);
-    // verify file, marker + metadata(search index + size number) + metadata 
size + magic string
+    // verify file, marker + metadata(search index + size number) + metadata 
size + head magic
+    // string + tail magic string
     Assert.assertEquals(
-        Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES + 
WALWriter.MAGIC_STRING_BYTES,
+        Byte.BYTES
+            + (Long.BYTES + Integer.BYTES)
+            + Integer.BYTES
+            + WALWriter.MAGIC_STRING_BYTES * 2,
         logFile.length());
     try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
       Assert.assertFalse(reader.hasNext());
@@ -95,7 +99,10 @@ public class WALRecoverWriterTest {
     walRecoverWriter.recover(walMetaData);
     // verify file, marker + metadata(search index + size number) + metadata 
size + magic string
     Assert.assertEquals(
-        Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES + 
WALWriter.MAGIC_STRING_BYTES,
+        Byte.BYTES
+            + (Long.BYTES + Integer.BYTES)
+            + Integer.BYTES
+            + WALWriter.MAGIC_STRING_BYTES * 2,
         logFile.length());
     try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
       Assert.assertFalse(reader.hasNext());
@@ -162,6 +169,7 @@ public class WALRecoverWriterTest {
     walMetaData.add(size, 1, walEntry.getMemTableId());
     try (WALWriter walWriter = new WALWriter(logFile)) {
       walWriter.write(buffer.getBuffer(), walMetaData);
+      walMetaData.setTruncateOffSet(walWriter.getOffset());
     }
     long len = logFile.length();
     try (FileChannel channel = FileChannel.open(logFile.toPath(), 
StandardOpenOption.APPEND)) {
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index 38f3f7c00f0..7dbf5f5b716 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1431,6 +1431,10 @@ data_replication_factor=1
 # Datatype: long
 # iot_consensus_cache_window_time_in_ms=-1
 
+# Enable Write Ahead Log compression.
+# Option: true, false
+# enable_wal_compression=false
+
 ####################
 ### IoTConsensus Configuration
 ####################

Reply via email to