This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb46190fa5 Support WAL Compression (#12476)
0bb46190fa5 is described below
commit 0bb46190fa5ba87d7efc2fc54827b586475c72a9
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 18 11:47:50 2024 +0800
Support WAL Compression (#12476)
* enable wal compression
remove metrics in mem table flush task, cache hash code in partial path,
use gzip to compress wal
batch update metrics
* fix bug
* fix compilation problem
* remove useless code
* recover some code
* support compression type in WAL Compress Header
* support multi version WAL
* edit configuration item
* add log for WAL size
* temp for debug
* fix bug
* remove useless log
* remove one configuration
* use compression rate to update wal disk usage
* fix ut
* fix test
* set default to uncompress
* fix wal ut
* optimize calculating of wal size
* close wal file when the origin size of wal buffer is larger than threshold
* add the size of magic string
* may be fix the bug
* fix with comment
* edit with review
* fix test
* add test for wal compression
* add hot reload
* clean the code to make it more readable
* reuse the byte buffer if possible
* Indicate the encoding of String
* Edit according to comment
* spotless
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +
.../allocation/AbstractNodeAllocationStrategy.java | 8 +-
.../dataregion/wal/buffer/AbstractWALBuffer.java | 3 +-
.../dataregion/wal/buffer/WALBuffer.java | 16 +-
.../wal/checkpoint/CheckpointManager.java | 8 +-
.../dataregion/wal/io/CheckpointReader.java | 5 +-
.../dataregion/wal/io/CheckpointWriter.java | 4 +-
.../dataregion/wal/io/ILogWriter.java | 3 +-
.../storageengine/dataregion/wal/io/LogWriter.java | 88 ++++-
.../dataregion/wal/io/WALByteBufReader.java | 12 +-
.../dataregion/wal/io/WALInputStream.java | 365 ++++++++++++++++++
.../dataregion/wal/io/WALMetaData.java | 25 +-
.../storageengine/dataregion/wal/io/WALReader.java | 12 +-
.../storageengine/dataregion/wal/io/WALWriter.java | 30 +-
.../storageengine/dataregion/wal/node/WALNode.java | 5 +-
.../dataregion/wal/recover/WALNodeRecoverTask.java | 1 +
.../dataregion/wal/recover/WALRecoverWriter.java | 4 +-
.../dataregion/wal/utils/WALEntryPosition.java | 31 +-
.../java/org/apache/iotdb/db/tools/WalChecker.java | 6 +-
.../storageengine/dataregion/wal/WALTestUtils.java | 90 +++++
.../wal/compression/WALCompressionTest.java | 409 +++++++++++++++++++++
.../dataregion/wal/node/WALNodeTest.java | 4 +
.../wal/recover/WALRecoverManagerTest.java | 4 +
.../wal/recover/WALRecoverWriterTest.java | 14 +-
.../resources/conf/iotdb-system.properties | 4 +
26 files changed, 1107 insertions(+), 66 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9c20f6c32ec..3e3fa348ed6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FSUtils;
@@ -1137,6 +1138,8 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";
+ private CompressionType WALCompressionAlgorithm =
CompressionType.UNCOMPRESSED;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -1881,7 +1884,7 @@ public class IoTDBConfig {
return walFileSizeThresholdInByte;
}
- void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
+ public void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
}
@@ -3984,4 +3987,12 @@ public class IoTDBConfig {
new TEndPoint(getInternalAddress(), getSchemaRegionConsensusPort()));
return result;
}
+
+ public CompressionType getWALCompressionAlgorithm() {
+ return WALCompressionAlgorithm;
+ }
+
+ public void setWALCompressionAlgorithm(CompressionType
WALCompressionAlgorithm) {
+ this.WALCompressionAlgorithm = WALCompressionAlgorithm;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 81c60b9b809..dcf8792a145 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FilePathUtils;
@@ -416,6 +417,10 @@ public class IoTDBDescriptor {
properties.getProperty(
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+ boolean enableWALCompression =
+ Boolean.parseBoolean(properties.getProperty("enable_wal_compression",
"false"));
+ conf.setWALCompressionAlgorithm(
+ enableWALCompression ? CompressionType.LZ4 :
CompressionType.UNCOMPRESSED);
conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
@@ -1793,6 +1798,10 @@ public class IoTDBDescriptor {
properties.getProperty(
"merge_threshold_of_explain_analyze",
String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
+ boolean enableWALCompression =
+
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
+ conf.setWALCompressionAlgorithm(
+ enableWALCompression ? CompressionType.LZ4 :
CompressionType.UNCOMPRESSED);
// update Consensus config
reloadConsensusProps(properties);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
index 62c438fcc69..e98086e5146 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.Arrays;
public abstract class AbstractNodeAllocationStrategy implements
NodeAllocationStrategy {
@@ -72,8 +72,8 @@ public abstract class AbstractNodeAllocationStrategy
implements NodeAllocationSt
protected IWALNode createWALNode(String identifier, String folder) {
try {
return new WALNode(identifier, folder);
- } catch (FileNotFoundException e) {
- logger.error("Fail to create wal node", e);
+ } catch (IOException e) {
+ logger.error("Meet exception when creating wal node", e);
return WALFakeNode.getFailureInstance(e);
}
}
@@ -82,7 +82,7 @@ public abstract class AbstractNodeAllocationStrategy
implements NodeAllocationSt
String identifier, String folder, long startFileVersion, long
startSearchIndex) {
try {
return new WALNode(identifier, folder, startFileVersion,
startSearchIndex);
- } catch (FileNotFoundException e) {
+ } catch (IOException e) {
logger.error("Fail to create wal node", e);
return WALFakeNode.getFailureInstance(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
index 08bca5f6649..c4c5bc13a3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/AbstractWALBuffer.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
@@ -58,7 +57,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer
{
protected AbstractWALBuffer(
String identifier, String logDirectory, long startFileVersion, long
startSearchIndex)
- throws FileNotFoundException {
+ throws IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 7bf33a93f40..20970aea34b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -119,7 +118,7 @@ public class WALBuffer extends AbstractWALBuffer {
// manage wal files which have MemTableIds
private final Map<Long, Set<Long>> memTableIdsOfWal = new
ConcurrentHashMap<>();
- public WALBuffer(String identifier, String logDirectory) throws
FileNotFoundException {
+ public WALBuffer(String identifier, String logDirectory) throws IOException {
this(identifier, logDirectory, new CheckpointManager(identifier,
logDirectory), 0, 0L);
}
@@ -129,7 +128,7 @@ public class WALBuffer extends AbstractWALBuffer {
CheckpointManager checkpointManager,
long startFileVersion,
long startSearchIndex)
- throws FileNotFoundException {
+ throws IOException {
super(identifier, logDirectory, startFileVersion, startSearchIndex);
this.checkpointManager = checkpointManager;
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
@@ -521,8 +520,9 @@ public class WALBuffer extends AbstractWALBuffer {
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(),
usedRatio * 100);
// flush buffer to os
+ double compressionRatio = 1.0;
try {
- currentWALFileWriter.write(syncingBuffer, info.metaData);
+ compressionRatio = currentWALFileWriter.write(syncingBuffer,
info.metaData);
} catch (Throwable e) {
logger.error(
"Fail to sync wal node-{}'s buffer, change system mode to error.",
identifier, e);
@@ -535,12 +535,14 @@ public class WALBuffer extends AbstractWALBuffer {
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new
HashSet<>())
.addAll(info.metaData.getMemTablesId());
-
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
+
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage,
compressionRatio);
boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
- || (forceFlag && currentWALFileWriter.size() >=
config.getWalFileSizeThresholdInByte())) {
+ // TODO: Control the wal file by the number of WALEntry
+ || (forceFlag
+ && currentWALFileWriter.originalSize() >=
config.getWalFileSizeThresholdInByte())) {
try {
rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
forceSuccess = true;
@@ -582,7 +584,7 @@ public class WALBuffer extends AbstractWALBuffer {
position += fsyncListener.getWalEntryHandler().getSize();
}
}
- lastFsyncPosition = currentWALFileWriter.size();
+ lastFsyncPosition = currentWALFileWriter.originalSize();
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime,
forceFlag);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 8359330754d..412dfcfc0a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -78,7 +77,7 @@ public class CheckpointManager implements AutoCloseable {
// endregion
- public CheckpointManager(String identifier, String logDirectory) throws
FileNotFoundException {
+ public CheckpointManager(String identifier, String logDirectory) throws
IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
@@ -345,12 +344,13 @@ public class CheckpointManager implements AutoCloseable {
}
/** Update wal disk cost of active memTables. */
- public void updateCostOfActiveMemTables(Map<Long, Long>
memTableId2WalDiskUsage) {
+ public void updateCostOfActiveMemTables(
+ Map<Long, Long> memTableId2WalDiskUsage, double compressionRate) {
for (Map.Entry<Long, Long> memTableWalUsage :
memTableId2WalDiskUsage.entrySet()) {
memTableId2Info.computeIfPresent(
memTableWalUsage.getKey(),
(k, v) -> {
- v.addWalDiskUsage(memTableWalUsage.getValue());
+ v.addWalDiskUsage((long) (memTableWalUsage.getValue() *
compressionRate));
return v;
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..578ab21ae8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -24,10 +24,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -47,8 +45,7 @@ public class CheckpointReader {
private void init() {
checkpoints = new ArrayList<>();
- try (DataInputStream logStream =
- new DataInputStream(new BufferedInputStream(new
FileInputStream(logFile)))) {
+ try (DataInputStream logStream = new DataInputStream(new
WALInputStream(logFile))) {
maxMemTableId = logStream.readLong();
while (logStream.available() > 0) {
Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
index cd6af496377..322aa3c9f5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
import java.io.File;
-import java.io.FileNotFoundException;
+import java.io.IOException;
/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint
file. */
public class CheckpointWriter extends LogWriter {
- public CheckpointWriter(File logFile) throws FileNotFoundException {
+ public CheckpointWriter(File logFile) throws IOException {
super(logFile);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
index d89563a9328..f4d65612c47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
@@ -33,8 +33,9 @@ public interface ILogWriter extends Closeable {
*
* @param buffer content that have been converted to bytes
* @throws IOException if an I/O error occurs
+ * @return the compression ratio of the written buffer (size after compression / original size)
*/
- void write(ByteBuffer buffer) throws IOException;
+ double write(ByteBuffer buffer) throws IOException;
/**
* Forces any updates to this file to be written to the storage device that
contains it.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..29335efde5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,19 +19,23 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
/**
* LogWriter writes the binary logs into a file, including writing {@link
WALEntry} into .wal file
@@ -43,23 +47,89 @@ public abstract class LogWriter implements ILogWriter {
protected final File logFile;
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
- protected long size;
+ protected long size = 0;
+ protected long originalSize = 0;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES *
2 + 1);
+ private ICompressor compressor =
+ ICompressor.getCompressor(
+
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
+ private ByteBuffer compressedByteBuffer;
+ // Minimum size to compress, default is 32 KB
+ private static long minCompressionSize = 32 * 1024L;
- protected LogWriter(File logFile) throws FileNotFoundException {
+ protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
+ if (!logFile.exists() || logFile.length() == 0) {
+ this.logChannel.write(
+
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
+ size += logChannel.position();
+ }
+ if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
+ != CompressionType.UNCOMPRESSED) {
+ // TODO: Use a dynamic strategy to enlarge the buffer size
+ compressedByteBuffer =
+ ByteBuffer.allocate(
+ compressor.getMaxBytesForCompression(
+
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+ } else {
+ compressedByteBuffer = null;
+ }
}
@Override
- public void write(ByteBuffer buffer) throws IOException {
- size += buffer.position();
+ public double write(ByteBuffer buffer) throws IOException {
+ CompressionType compressionType =
+ IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+ int bufferSize = buffer.position();
+ if (bufferSize == 0) {
+ return 1.0;
+ }
+ originalSize += bufferSize;
buffer.flip();
+ boolean compressed = false;
+ int uncompressedSize = bufferSize;
+ if (compressionType != CompressionType.UNCOMPRESSED
+ /* Do not compress buffer that is less than min size */
+ && bufferSize > minCompressionSize) {
+ if (Objects.isNull(compressedByteBuffer)) {
+ compressedByteBuffer =
+ ByteBuffer.allocate(
+ compressor.getMaxBytesForCompression(
+
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+ }
+ compressedByteBuffer.clear();
+ if (compressor.getType() != compressionType) {
+ compressor = ICompressor.getCompressor(compressionType);
+ }
+ compressor.compress(buffer, compressedByteBuffer);
+ buffer = compressedByteBuffer;
+ bufferSize = buffer.position();
+ buffer.flip();
+ compressed = true;
+ }
+ size += bufferSize;
+ /*
+ Header structure (uncompressedSize is only written when the segment is compressed):
+ [CompressionType(1 byte)][dataBufferSize(4 bytes)][uncompressedSize(4
bytes)]
+ */
+ headerBuffer.clear();
+ headerBuffer.put(
+ compressed ? compressionType.serialize() :
CompressionType.UNCOMPRESSED.serialize());
+ headerBuffer.putInt(bufferSize);
+ if (compressed) {
+ headerBuffer.putInt(uncompressedSize);
+ }
+ size += headerBuffer.position();
try {
+ headerBuffer.flip();
+ logChannel.write(headerBuffer);
logChannel.write(buffer);
} catch (ClosedChannelException e) {
logger.warn("Cannot write to {}", logFile, e);
}
+ return ((double) bufferSize / uncompressedSize);
}
@Override
@@ -79,6 +149,10 @@ public abstract class LogWriter implements ILogWriter {
return size;
}
+ public long originalSize() {
+ return originalSize;
+ }
+
@Override
public File getLogFile() {
return logFile;
@@ -97,4 +171,8 @@ public abstract class LogWriter implements ILogWriter {
}
}
}
+
+ public long getOffset() throws IOException {
+ return logChannel.position();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf3647..882b5ea468c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import java.io.Closeable;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable {
private final File logFile;
private final FileChannel channel;
private final WALMetaData metaData;
+ private final DataInputStream logStream;
private final Iterator<Integer> sizeIterator;
public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable {
public WALByteBufReader(File logFile, FileChannel channel) throws
IOException {
this.logFile = logFile;
this.channel = channel;
+ this.logStream = new DataInputStream(new WALInputStream(logFile));
this.metaData = WALMetaData.readFromWALFile(logFile, channel);
this.sizeIterator = metaData.getBuffersSize().iterator();
channel.position(0);
@@ -63,9 +66,14 @@ public class WALByteBufReader implements Closeable {
*/
public ByteBuffer next() throws IOException {
int size = sizeIterator.next();
+ // TODO: Reuse this buffer
ByteBuffer buffer = ByteBuffer.allocate(size);
- channel.read(buffer);
- buffer.clear();
+ /*
+ Notice, we don't need to flip the buffer after calling
+ logStream.readFully, since this function does not change
+ the position of the buffer.
+ */
+ logStream.readFully(buffer.array(), 0, size);
return buffer;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..844c06436b7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.utils.MmapUtil;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
+ private final FileChannel channel;
+ private final ByteBuffer segmentHeaderBuffer =
ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+ private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+ private ByteBuffer dataBuffer = null;
+ private ByteBuffer compressedBuffer = null;
+ private long fileSize;
+ File logFile;
+ /*
+ The WAL file consists of the following parts:
+ [MagicString] [Segment 1] [Segment 2] ... [Segment N] [Metadata]
[MagicString]
+ The endOffset indicates the maximum offset that a segment can reach,
+ i.e., the offset of the last byte of the last segment.
+ */
+ private long endOffset = -1;
+
+ enum FileVersion {
+ V1,
+ V2,
+ UNKNOWN
+ };
+
+ FileVersion version;
+
+ public WALInputStream(File logFile) throws IOException {
+ channel = FileChannel.open(logFile.toPath());
+ fileSize = channel.size();
+ analyzeFileVersion();
+ getEndOffset();
+ this.logFile = logFile;
+ }
+
+ private void getEndOffset() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ // A broken file
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
+ try {
+ if (version == FileVersion.V2) {
+ // New Version
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBuffer.flip();
+ if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
+ .equals(WALWriter.MAGIC_STRING)) {
+ // This is a broken wal file
+ endOffset = channel.size();
+ return;
+ } else {
+ // This is a normal wal file
+ position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ }
+ } else {
+ // Old version
+ ByteBuffer magicStringBuffer =
+
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
+ channel.read(
+ magicStringBuffer,
+ channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
+ magicStringBuffer.flip();
+ if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
+ .equals(WALWriter.MAGIC_STRING_V1)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ } else {
+ position =
+ channel.size()
+ -
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length
+ - Integer.BYTES;
+ }
+ }
+ // Read the meta data size
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ int metadataSize = metadataSizeBuf.getInt();
+ endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ } finally {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ }
+ }
+
+ private void analyzeFileVersion() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+ version = FileVersion.UNKNOWN;
+ return;
+ }
+ if (isCurrentVersion()) {
+ this.version = FileVersion.V2;
+ return;
+ }
+ this.version = FileVersion.V1;
+ }
+
+ private boolean isCurrentVersion() throws IOException {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(buffer);
+ return new String(buffer.array(),
StandardCharsets.UTF_8).equals(WALWriter.MAGIC_STRING);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ return dataBuffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ if (dataBuffer.remaining() >= len) {
+ dataBuffer.get(b, off, len);
+ return len;
+ }
+ int toBeRead = len;
+ while (toBeRead > 0) {
+ int remaining = dataBuffer.remaining();
+ int bytesRead = Math.min(remaining, toBeRead);
+ dataBuffer.get(b, off, bytesRead);
+ off += bytesRead;
+ toBeRead -= bytesRead;
+ if (toBeRead > 0) {
+ loadNextSegment();
+ }
+ }
+ return len;
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ dataBuffer = null;
+ }
+
+ @Override
+ public int available() throws IOException {
+ long size = (endOffset - channel.position());
+ if (!Objects.isNull(dataBuffer)) {
+ size += dataBuffer.limit() - dataBuffer.position();
+ }
+ return (int) size;
+ }
+
+ private void loadNextSegment() throws IOException {
+ if (channel.position() >= endOffset) {
+ throw new IOException("End of file");
+ }
+ if (version == FileVersion.V2) {
+ loadNextSegmentV2();
+ } else if (version == FileVersion.V1) {
+ loadNextSegmentV1();
+ } else {
+ tryLoadSegment();
+ }
+ }
+
+ private void loadNextSegmentV1() throws IOException {
+ // just read raw data as input
+ if (channel.position() >= fileSize) {
+ throw new IOException("Unexpected end of file");
+ }
+ if (Objects.isNull(dataBuffer)) {
+ // read 128 KB
+ dataBuffer = ByteBuffer.allocate(128 * 1024);
+ }
+ dataBuffer.clear();
+ channel.read(dataBuffer);
+ dataBuffer.flip();
+ }
+
+  /**
+   * Loads the next segment of a V2 file into {@code dataBuffer}.
+   *
+   * <p>The segment header is read first. A compressed segment is read into {@code
+   * compressedBuffer} and uncompressed into {@code dataBuffer}; an uncompressed segment is read
+   * into {@code dataBuffer} directly. In both cases {@code dataBuffer} is left flipped, ready to
+   * be consumed.
+   *
+   * @throws IOException if the header cannot be read or the segment is shorter than announced
+   */
+  private void loadNextSegmentV2() throws IOException {
+    SegmentInfo segmentInfo = getNextSegmentInfo();
+    if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
+      // A compressed segment
+      dataBuffer = prepareSegmentBuffer(dataBuffer, segmentInfo.uncompressedSize);
+      compressedBuffer = prepareSegmentBuffer(compressedBuffer, segmentInfo.dataInDiskSize);
+      // limit the buffer to prevent it from reading more bytes than expected
+      compressedBuffer.limit(segmentInfo.dataInDiskSize);
+      if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedBuffer.flip();
+
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
+      unCompressor.uncompress(compressedBuffer, dataBuffer);
+    } else {
+      // An uncompressed segment
+      dataBuffer = prepareSegmentBuffer(dataBuffer, segmentInfo.dataInDiskSize);
+      // limit the buffer to prevent it from reading more bytes than expected
+      dataBuffer.limit(segmentInfo.dataInDiskSize);
+      if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+    }
+    dataBuffer.flip();
+  }
+
+  /**
+   * Returns a cleared buffer able to hold {@code requiredSize} bytes. The given buffer is reused
+   * unless it is missing, too small, or more than twice as large as required; otherwise a new
+   * direct buffer is allocated and the old one is released.
+   */
+  private static ByteBuffer prepareSegmentBuffer(ByteBuffer current, int requiredSize) {
+    ByteBuffer result = current;
+    if (Objects.isNull(current)
+        || current.capacity() < requiredSize
+        || current.capacity() > 2L * requiredSize) {
+      // Allocate before releasing, so a failed allocation never leaves a freed buffer in use.
+      result = ByteBuffer.allocateDirect(requiredSize);
+      // The previous buffer may be a heap buffer (installed by other read paths); only off-heap
+      // buffers can be cast and cleaned.
+      if (current instanceof MappedByteBuffer) {
+        MmapUtil.clean((MappedByteBuffer) current);
+      }
+    }
+    result.clear();
+    return result;
+  }
+
+ private void tryLoadSegment() throws IOException {
+ long originPosition = channel.position();
+ try {
+ loadNextSegmentV2();
+ version = FileVersion.V2;
+ } catch (Throwable e) {
+ // failed to load in V2 way, try in V1 way
+ logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
+ channel.position(originPosition);
+ }
+
+ if (version == FileVersion.UNKNOWN) {
+ loadNextSegmentV1();
+ version = FileVersion.V1;
+ }
+ }
+
+  /**
+   * Repositions this stream at a logical (uncompressed) offset of the WAL content.
+   *
+   * <p>Since the current WAL file may be compressed while some parts of the system address
+   * entries by their offset in an uncompressed WAL file, this method translates such an offset.
+   * For a V2 file the segments are walked from the head of the file until the one containing
+   * {@code pos} is found; that segment is loaded and the read cursor is placed inside it. For any
+   * other version the channel is positioned at {@code pos} directly and the loaded data is
+   * discarded.
+   *
+   * @param pos The logical offset to skip to
+   * @throws IOException If the file is broken or the given position is invalid
+   */
+  public void skipToGivenLogicalPosition(long pos) throws IOException {
+    if (pos < 0) {
+      throw new IOException("Invalid logical position " + pos);
+    }
+    if (version != FileVersion.V2) {
+      dataBuffer = null;
+      channel.position(pos);
+      return;
+    }
+
+    // Walk the segments from the first one, skipping those that end before the target offset.
+    channel.position(WALWriter.MAGIC_STRING_BYTES);
+    long posRemain = pos;
+    SegmentInfo segmentInfo = getNextSegmentInfo();
+    while (posRemain >= segmentInfo.uncompressedSize) {
+      posRemain -= segmentInfo.uncompressedSize;
+      channel.position(channel.position() + segmentInfo.dataInDiskSize);
+      segmentInfo = getNextSegmentInfo();
+    }
+
+    // Load the segment that contains the target offset.
+    if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
+      compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+      if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedBuffer.flip();
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
+      dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+      unCompressor.uncompress(compressedBuffer, dataBuffer);
+    } else {
+      dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+      if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      dataBuffer.flip();
+    }
+
+    // posRemain is now the offset of the target inside the loaded segment.
+    dataBuffer.position((int) posRemain);
+  }
+
+  /**
+   * Fills the remaining space of {@code buffer} with the next logical bytes of the WAL, loading
+   * further segments as needed.
+   *
+   * <p>The bytes are copied into the backing array starting at the buffer's current position. The
+   * position and limit of {@code buffer} are left unchanged when this method returns, so calling
+   * {@code flip()} on a freshly allocated buffer afterwards would yield an empty view.
+   *
+   * @param buffer a buffer exposing a backing array, whose remaining space is to be filled
+   * @throws IOException if the end of the WAL content is reached before the buffer is filled, or
+   *     a segment cannot be loaded
+   */
+  public void read(ByteBuffer buffer) throws IOException {
+    final byte[] target = buffer.array();
+    final int base = buffer.arrayOffset() + buffer.position();
+    final int totalBytesToBeRead = buffer.remaining();
+    // Bytes copied so far; every chunk is appended behind the previous one instead of
+    // restarting at the buffer position, which would overwrite data from earlier segments.
+    int copied = 0;
+    while (copied < totalBytesToBeRead) {
+      // dataBuffer is null after a raw (non-V2) skip and empty once a segment is consumed.
+      if (Objects.isNull(dataBuffer) || !dataBuffer.hasRemaining()) {
+        loadNextSegment();
+      }
+      int currReadBytes = Math.min(dataBuffer.remaining(), totalBytesToBeRead - copied);
+      dataBuffer.get(target, base + copied, currReadBytes);
+      copied += currReadBytes;
+    }
+  }
+
+  /**
+   * Reports the current position of the underlying file channel, i.e. a physical offset in the
+   * file rather than a logical offset in the uncompressed content.
+   *
+   * @return the channel position in bytes
+   * @throws IOException if the position cannot be queried
+   */
+  public long getFileCurrentPos() throws IOException {
+    final long physicalPosition = channel.position();
+    return physicalPosition;
+  }
+
+  /**
+   * Reads the header of the segment at the current channel position.
+   *
+   * <p>The header consists of one byte for the compression type and an int for the on-disk size.
+   * A compressed segment additionally stores an int holding its uncompressed size; for an
+   * uncompressed segment both sizes are equal. The channel is left at the first byte of the
+   * segment payload.
+   *
+   * @return the parsed segment description
+   * @throws IOException if the file ends before the complete header could be read
+   */
+  private SegmentInfo getNextSegmentInfo() throws IOException {
+    segmentHeaderBuffer.clear();
+    channel.read(segmentHeaderBuffer);
+    segmentHeaderBuffer.flip();
+    // Report a truncated header as an I/O problem rather than a BufferUnderflowException.
+    if (segmentHeaderBuffer.remaining() < Byte.BYTES + Integer.BYTES) {
+      throw new IOException("Unexpected end of file");
+    }
+    SegmentInfo info = new SegmentInfo();
+    info.compressionType = CompressionType.deserialize(segmentHeaderBuffer.get());
+    info.dataInDiskSize = segmentHeaderBuffer.getInt();
+    if (info.compressionType != CompressionType.UNCOMPRESSED) {
+      compressedHeader.clear();
+      channel.read(compressedHeader);
+      compressedHeader.flip();
+      if (compressedHeader.remaining() < Integer.BYTES) {
+        throw new IOException("Unexpected end of file");
+      }
+      info.uncompressedSize = compressedHeader.getInt();
+    } else {
+      info.uncompressedSize = info.dataInDiskSize;
+    }
+    return info;
+  }
+
+  /**
+   * Plain holder for the header fields of one WAL segment. It is a static nested class because
+   * it never touches the state of the enclosing stream.
+   */
+  private static class SegmentInfo {
+    /** Compression applied to the segment payload. */
+    public CompressionType compressionType;
+
+    /** Number of payload bytes stored in the file. */
+    public int dataInDiskSize;
+
+    /** Number of payload bytes after uncompressing; equals the on-disk size if uncompressed. */
+    public int uncompressedSize;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index 9ca700a62f7..7fa634ffbdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -22,10 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.utils.SerializedSize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -36,6 +40,7 @@ import java.util.Set;
* entry and the number of entries.
*/
public class WALMetaData implements SerializedSize {
+ private static final Logger logger =
LoggerFactory.getLogger(WALMetaData.class);
// search index 8 byte, wal entries' number 4 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
@@ -45,6 +50,7 @@ public class WALMetaData implements SerializedSize {
private final List<Integer> buffersSize;
// memTable ids of this wal file
private final Set<Long> memTablesId;
+ private long truncateOffSet = 0;
public WALMetaData() {
this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>(), new
HashSet<>());
@@ -79,7 +85,7 @@ public class WALMetaData implements SerializedSize {
+ (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() *
Long.BYTES);
}
- public void serialize(ByteBuffer buffer) {
+ public void serialize(File file, ByteBuffer buffer) {
buffer.putLong(firstSearchIndex);
buffer.putInt(buffersSize.size());
for (int size : buffersSize) {
@@ -123,8 +129,7 @@ public class WALMetaData implements SerializedSize {
}
public static WALMetaData readFromWALFile(File logFile, FileChannel channel)
throws IOException {
- if (channel.size() < WALWriter.MAGIC_STRING_BYTES
- || !readTailMagic(channel).equals(WALWriter.MAGIC_STRING)) {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES ||
!isValidMagicString(channel)) {
throw new IOException(String.format("Broken wal file %s", logFile));
}
// load metadata size
@@ -153,10 +158,20 @@ public class WALMetaData implements SerializedSize {
return metaData;
}
- private static String readTailMagic(FileChannel channel) throws IOException {
+ private static boolean isValidMagicString(FileChannel channel) throws
IOException {
ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
magicStringBytes.flip();
- return new String(magicStringBytes.array());
+ String magicString = new String(magicStringBytes.array(),
StandardCharsets.UTF_8);
+ return magicString.equals(WALWriter.MAGIC_STRING)
+ || magicString.contains(WALWriter.MAGIC_STRING_V1);
+ }
+
+ public void setTruncateOffSet(long offset) {
+ this.truncateOffSet = offset;
+ }
+
+ public long getTruncateOffSet() {
+ return truncateOffSet;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..1310bb36b46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -46,6 +44,7 @@ public class WALReader implements Closeable {
private final File logFile;
private final boolean fileMayCorrupt;
+ private final WALInputStream walInputStream;
private final DataInputStream logStream;
private WALEntry nextEntry;
private boolean fileCorrupted = false;
@@ -57,9 +56,8 @@ public class WALReader implements Closeable {
public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
this.logFile = logFile;
this.fileMayCorrupt = fileMayCorrupt;
- this.logStream =
- new DataInputStream(
- new BufferedInputStream(Files.newInputStream(logFile.toPath()),
STREAM_BUFFER_SIZE));
+ this.walInputStream = new WALInputStream(logFile);
+ this.logStream = new DataInputStream(walInputStream);
}
/** Like {@link Iterator#hasNext()}. */
@@ -88,6 +86,10 @@ public class WALReader implements Closeable {
return nextEntry != null;
}
+ public long getWALCurrentReadOffset() throws IOException {
+ return walInputStream.getFileCurrentPos();
+ }
+
/**
* Like {@link Iterator#next()}.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc676fad..7017b4be6cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -24,21 +24,26 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
/** WALWriter writes the binary {@link WALEntry} into .wal file. */
public class WALWriter extends LogWriter {
- public static final String MAGIC_STRING = "WAL";
- public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes().length;
+ private static final Logger logger =
LoggerFactory.getLogger(WALWriter.class);
+ public static final String MAGIC_STRING_V1 = "WAL";
+ public static final String MAGIC_STRING = "V2-WAL";
+ public static final int MAGIC_STRING_BYTES =
MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length;
private WALFileStatus walFileStatus =
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
// wal files' metadata
protected final WALMetaData metaData = new WALMetaData();
- public WALWriter(File logFile) throws FileNotFoundException {
+ public WALWriter(File logFile) throws IOException {
super(logFile);
}
@@ -47,11 +52,11 @@ public class WALWriter extends LogWriter {
*
* @throws IOException when failing to write
*/
- public void write(ByteBuffer buffer, WALMetaData metaData) throws
IOException {
+ public double write(ByteBuffer buffer, WALMetaData metaData) throws
IOException {
// update metadata
updateMetaData(metaData);
// flush buffer
- write(buffer);
+ return write(buffer);
}
public void updateMetaData(WALMetaData metaData) {
@@ -67,11 +72,18 @@ public class WALWriter extends LogWriter {
// mark info part ends
endMarker.serialize(buffer);
// flush meta data
- metaData.serialize(buffer);
+ metaData.serialize(logFile, buffer);
buffer.putInt(metaDataSize);
// add magic string
- buffer.put(MAGIC_STRING.getBytes());
- write(buffer);
+ buffer.put(MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
+ size += buffer.position();
+ writeMetadata(buffer);
+ }
+
+  /**
+   * Writes the already serialized file tail (end marker, metadata and magic string) to the log
+   * channel without compressing it.
+   *
+   * <p>The caller accounts for the written bytes in {@code size} before invoking this method, so
+   * the size is intentionally not increased again here.
+   *
+   * @param buffer buffer in write mode, i.e. its position marks the end of the content
+   * @throws IOException when failing to write
+   */
+  private void writeMetadata(ByteBuffer buffer) throws IOException {
+    buffer.flip();
+    // a single write call is not guaranteed to drain the buffer
+    while (buffer.hasRemaining()) {
+      logChannel.write(buffer);
+    }
   }
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index e4a497de4d9..9a4b99f9ec0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -107,13 +108,13 @@ public class WALNode implements IWALNode {
// insert nodes whose search index are before this value can be deleted
safely
private volatile long safelyDeletedSearchIndex =
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
- public WALNode(String identifier, String logDirectory) throws
FileNotFoundException {
+ public WALNode(String identifier, String logDirectory) throws IOException {
this(identifier, logDirectory, 0, 0L);
}
public WALNode(
String identifier, String logDirectory, long startFileVersion, long
startSearchIndex)
- throws FileNotFoundException {
+ throws IOException {
this.identifier = identifier;
this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 237b353c9f9..07a06448359 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -172,6 +172,7 @@ public class WALNodeRecoverTask implements Runnable {
}
}
}
+ metaData.setTruncateOffSet(walReader.getWALCurrentReadOffset());
metaData.add(walEntry.serializedSize(), searchIndex,
walEntry.getMemTableId());
}
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
index 7015d45c58b..5db0c17c987 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
@@ -41,14 +41,14 @@ public class WALRecoverWriter {
public void recover(WALMetaData metaData) throws IOException {
// locate broken data
- int truncateSize;
+ long truncateSize;
if (logFile.length() < MAGIC_STRING_BYTES) { // file without magic string
truncateSize = 0;
} else {
if (readTailMagic().equals(MAGIC_STRING)) { // complete file
return;
} else { // file with broken magic string
- truncateSize =
metaData.getBuffersSize().stream().mapToInt(Integer::intValue).sum();
+ truncateSize = metaData.getTruncateOffSet();
}
}
// truncate broken data
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 13b2f4c9704..c794745c6f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.wal.utils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.tsfile.utils.Pair;
@@ -100,11 +101,12 @@ public class WALEntryPosition {
if (!canRead()) {
throw new IOException("Target file hasn't been specified.");
}
- try (FileChannel channel = openReadFileChannel()) {
+ // TODO: Reuse the file stream
+ try (WALInputStream is = openReadFileStream()) {
+ is.skipToGivenLogicalPosition(position);
ByteBuffer buffer = ByteBuffer.allocate(size);
- channel.position(position);
- channel.read(buffer);
- buffer.clear();
+ is.read(buffer);
+ buffer.flip();
return buffer;
}
}
@@ -135,6 +137,27 @@ public class WALEntryPosition {
}
}
+  /**
+   * Opens a {@link WALInputStream} on the WAL file this position points to and remembers the
+   * resolved file in {@code walFile}.
+   *
+   * <p>If the entry was not in a sealed file when the attempt started, the file may get renamed
+   * while being sealed; in that case opening is retried once after the file became sealed.
+   *
+   * @return a stream on the WAL file
+   * @throws IOException if the file cannot be opened
+   */
+  public WALInputStream openReadFileStream() throws IOException {
+    // TODO: Refactor this part of code
+    final boolean sealedAtStart = isInSealedFile();
+    try {
+      walFile = walNode.getWALFile(walFileVersionId);
+      return new WALInputStream(walFile);
+    } catch (IOException e) {
+      // unsealed file may be renamed after sealed, so we should try again
+      if (sealedAtStart || !isInSealedFile()) {
+        throw e;
+      }
+      walFile = walNode.getWALFile(walFileVersionId);
+      return new WALInputStream(walFile);
+    }
+  }
+
public File getWalFile() {
return walFile;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index cbf2ead7464..5e53e9f664a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -22,15 +22,14 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -86,8 +85,7 @@ public class WalChecker {
}
private boolean checkFile(File walFile) {
- try (DataInputStream logStream =
- new DataInputStream(new BufferedInputStream(new
FileInputStream(walFile)))) {
+ try (DataInputStream logStream = new DataInputStream(new
WALInputStream(walFile))) {
while (logStream.available() > 0) {
WALEntry walEntry = WALEntry.deserialize(logStream);
if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
new file mode 100644
index 00000000000..8f671eb3f03
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.lang.reflect.Field;
+
+/** Static helpers shared by the WAL tests. */
+public class WALTestUtils {
+  private static final String LOG_WRITER_CLASS_NAME =
+      "org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter";
+  private static final String MIN_COMPRESSION_SIZE_FIELD_NAME = "minCompressionSize";
+
+  private WALTestUtils() {
+    // utility class, not meant to be instantiated
+  }
+
+  /**
+   * Overwrites the static {@code minCompressionSize} field of {@code LogWriter} via reflection.
+   *
+   * @param size the value to store in the field
+   */
+  public static void setMinCompressionSize(long size)
+      throws NoSuchFieldException, ClassNotFoundException, IllegalAccessException {
+    minCompressionSizeField().setLong(null, size);
+  }
+
+  /**
+   * Reads the static {@code minCompressionSize} field of {@code LogWriter} via reflection.
+   *
+   * @return the current value of the field
+   */
+  public static long getMinCompressionSize()
+      throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
+    return minCompressionSizeField().getLong(null);
+  }
+
+  /** Looks up the {@code minCompressionSize} field and makes it accessible. */
+  private static Field minCompressionSizeField()
+      throws ClassNotFoundException, NoSuchFieldException {
+    Class<?> logWriterClass = Class.forName(LOG_WRITER_CLASS_NAME);
+    Field field = logWriterClass.getDeclaredField(MIN_COMPRESSION_SIZE_FIELD_NAME);
+    field.setAccessible(true);
+    return field;
+  }
+
+  /**
+   * Builds an {@link InsertRowNode} with six measurements {@code s1}..{@code s6}, one for each of
+   * the types DOUBLE, FLOAT, INT64, INT32, BOOLEAN and TEXT, holding fixed values.
+   *
+   * @param devicePath path of the device the row belongs to
+   * @param time timestamp of the row
+   * @return the populated node, with measurement schemas set
+   */
+  public static InsertRowNode getInsertRowNode(String devicePath, long time)
+      throws IllegalPathException, QueryProcessException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns =
+        new Object[] {
+          1.0d, 2f, 10000L, 100, false, new Binary("hh0", TSFileConfig.STRING_CHARSET)
+        };
+
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    MeasurementSchema[] schemas = new MeasurementSchema[dataTypes.length];
+    for (int i = 0; i < schemas.length; i++) {
+      schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+    }
+    node.setMeasurementSchemas(schemas);
+    return node;
+  }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
new file mode 100644
index 00000000000..d187f6107b6
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.compression;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALTestUtils;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+public class WALCompressionTest {
+ private final File walFile =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ WALFileUtils.getLogFileName(0, 0,
WALFileStatus.CONTAINS_SEARCH_INDEX)));
+
+ private final String compressionDir =
+ TestConstant.OUTPUT_DATA_DIR.concat(File.separator + "wal-compression");
+
+ private final String devicePath = "root.sg.d1";
+ long originalMinCompressionSize;
+ CompressionType originCompressionType =
+ IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+
+  /** Removes leftovers of earlier runs and remembers the minimum compression size. */
+  @Before
+  public void setUp()
+      throws IOException, NoSuchFieldException, ClassNotFoundException, IllegalAccessException {
+    if (walFile.exists()) {
+      FileUtils.delete(walFile);
+    }
+    originalMinCompressionSize = WALTestUtils.getMinCompressionSize();
+    final File compressionDirectory = new File(compressionDir);
+    if (compressionDirectory.exists()) {
+      FileUtils.forceDelete(compressionDirectory);
+    }
+  }
+
+  /** Deletes the files written by the test and restores the compression settings. */
+  @After
+  public void tearDown()
+      throws IOException, NoSuchFieldException, ClassNotFoundException, IllegalAccessException {
+    if (walFile.exists()) {
+      FileUtils.delete(walFile);
+    }
+    final File compressionDirectory = new File(compressionDir);
+    if (compressionDirectory.exists()) {
+      FileUtils.forceDelete(compressionDirectory);
+    }
+    WALTestUtils.setMinCompressionSize(originalMinCompressionSize);
+    IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(originCompressionType);
+  }
+
+  /** Runs the skip scenario with LZ4 enabled and a minimum compression size of zero. */
+  @Test
+  public void testSkipToGivenPositionWithCompression()
+      throws NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException,
+          QueryProcessException,
+          IllegalPathException,
+          IOException {
+    // a threshold of zero is passed so that no buffer is below the minimum compression size
+    WALTestUtils.setMinCompressionSize(0L);
+    IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+    testSkipToGivenPosition();
+  }
+
+  /**
+   * Runs the skip scenario with a minimum compression size of 32 KB, which is larger than the
+   * 4 KB buffers the scenario writes.
+   */
+  @Test
+  public void testSkipToGivenPositionWithoutCompression()
+      throws NoSuchFieldException,
+          ClassNotFoundException,
+          IllegalAccessException,
+          QueryProcessException,
+          IllegalPathException,
+          IOException {
+    WALTestUtils.setMinCompressionSize(1024 * 32);
+    testSkipToGivenPosition();
+  }
+
+  /**
+   * Writes 100 insert nodes through 4 KB buffers while recording the logical offset of each one,
+   * then skips to every recorded offset and checks that the bytes read there equal the
+   * serialized node.
+   */
+  public void testSkipToGivenPosition()
+      throws QueryProcessException, IllegalPathException, IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+    List<Pair<Long, InsertRowNode>> positionAndEntryPairList = new ArrayList<>();
+    int memTableId = 0;
+    long fileOffset = 0;
+    // try-with-resources closes the writer even if serializing or writing fails
+    try (LogWriter writer = new WALWriter(walFile)) {
+      int i = 0;
+      while (i < 100) {
+        InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i);
+        long serializedSize = insertRowNode.serializedSize();
+        if (buffer.remaining() >= serializedSize) {
+          int pos = buffer.position();
+          insertRowNode.serialize(buffer);
+          positionAndEntryPairList.add(new Pair<>(fileOffset, insertRowNode));
+          fileOffset += buffer.position() - pos;
+          i++;
+        } else {
+          // the buffer is full: flush it and retry the same node with an empty buffer
+          writer.write(buffer);
+          buffer.clear();
+        }
+      }
+      if (buffer.position() != 0) {
+        writer.write(buffer);
+      }
+    }
+    try (WALInputStream stream = new WALInputStream(walFile)) {
+      for (int i = 0; i < 100; ++i) {
+        Pair<Long, InsertRowNode> positionAndNodePair = positionAndEntryPairList.get(i);
+        stream.skipToGivenLogicalPosition(positionAndNodePair.left);
+        /*
+         The buffers are allocated 2 bytes larger than serializedSize(), because the number of
+         bytes actually produced by InsertRowNode.serialize is larger than that estimate.
+         TODO: find out whether the estimate returned by serializedSize() is a bug.
+        */
+        int bufferSize = positionAndNodePair.right.serializedSize() + 2;
+        ByteBuffer actual = ByteBuffer.allocate(bufferSize);
+        stream.read(actual);
+        ByteBuffer expected = ByteBuffer.allocate(bufferSize);
+        positionAndNodePair.right.serialize(expected);
+        expected.flip();
+        Assert.assertArrayEquals(expected.array(), actual.array());
+      }
+    }
+  }
+
+  /**
+   * Writes one uncompressed buffer and verifies the raw file layout: head magic string, segment
+   * header (compression type and size), payload, end marker, metadata and tail magic string.
+   */
+  @Test
+  public void testUncompressedWALStructure()
+      throws QueryProcessException, IllegalPathException, IOException {
+    PublicBAOS baos = new PublicBAOS();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(baos)) {
+      for (int i = 0; i < 100; ++i) {
+        WALTestUtils.getInsertRowNode(devicePath, i).serialize(dataOutputStream);
+      }
+    }
+    ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+    // Do not compress it
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    try (WALWriter writer = new WALWriter(walFile)) {
+      // the writer expects a buffer in write mode, i.e. positioned behind its content
+      buf.position(buf.limit());
+      writer.write(buf);
+    }
+
+    try (DataInputStream dataInputStream =
+        new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
+      byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+      // head magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(
+          WALWriter.MAGIC_STRING, new String(magicStringBytes, StandardCharsets.UTF_8));
+      Assert.assertEquals(
+          CompressionType.UNCOMPRESSED, CompressionType.deserialize(dataInputStream.readByte()));
+      Assert.assertEquals(buf.array().length, dataInputStream.readInt());
+      ByteBuffer dataBuf = ByteBuffer.allocate(buf.array().length);
+      dataInputStream.readFully(dataBuf.array());
+      Assert.assertArrayEquals(buf.array(), dataBuf.array());
+      Assert.assertEquals(
+          new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
+          WALEntry.deserialize(dataInputStream));
+      // metadata without entries (search index + entry count) followed by the metadata size
+      ByteBuffer metadataBuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Integer.BYTES);
+      dataInputStream.readFully(metadataBuf.array());
+      // Tail magic string
+      dataInputStream.readFully(magicStringBytes);
+      Assert.assertEquals(
+          WALWriter.MAGIC_STRING, new String(magicStringBytes, StandardCharsets.UTF_8));
+    }
+  }
+
+ @Test // Checks, byte by byte, the file WALWriter produces for one buffer when the algorithm is LZ4.
+ public void testCompressedWALStructure()
+ throws IOException,
+ QueryProcessException,
+ IllegalPathException,
+ NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException {
+ PublicBAOS baos = new PublicBAOS();
+ DataOutputStream dataOutputStream = new DataOutputStream(baos);
+ List<InsertRowNode> insertRowNodes = new ArrayList<>(); // NOTE(review): filled in the loop below but never read in this test
+ for (int i = 0; i < 100; ++i) { // serialize 100 insert-row nodes back to back into baos
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ insertRowNodes.add(node);
+ node.serialize(dataOutputStream);
+ }
+ dataOutputStream.close();
+ ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); // uncompressed payload given to the writer
+ // Compress it
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+ WALTestUtils.setMinCompressionSize(0); // presumably lowers the size threshold so this buffer is always compressed - TODO confirm in WALTestUtils
+ try (WALWriter writer = new WALWriter(walFile)) {
+ buf.position(buf.limit()); // presumably write() flips the buffer, so position has to mark the end of the data - TODO confirm in WALWriter
+ writer.write(buf);
+ }
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+ byte[] compressed = compressor.compress(buf.array()); // reference bytes: the same payload compressed locally with LZ4
+
+ try (DataInputStream dataInputStream =
+ new DataInputStream(new
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
+ byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+ // head magic string
+ dataInputStream.readFully(magicStringBytes);
+ Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes)); // NOTE(review): no charset is passed, so the JVM default charset is used for decoding
+ Assert.assertEquals(
+ CompressionType.LZ4,
CompressionType.deserialize(dataInputStream.readByte())); // next in the file: one byte holding the compression type
+ Assert.assertEquals(compressed.length, dataInputStream.readInt()); // next: an int expected to equal the compressed length
+ Assert.assertEquals(buf.array().length, dataInputStream.readInt()); // next: an int expected to equal the uncompressed length
+ ByteBuffer dataBuf = ByteBuffer.allocate(compressed.length);
+ dataInputStream.readFully(dataBuf.array()); // next: the compressed bytes, expected identical to the local reference
+ Assert.assertArrayEquals(compressed, dataBuf.array());
+ IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+ Assert.assertArrayEquals(unCompressor.uncompress(compressed),
buf.array()); // round-trip check on the local reference bytes; NOTE(review): arguments are passed as (actual, expected)
+ Assert.assertEquals( // next: an entry expected to equal the WAL_FILE_INFO_END_MARKER signal entry
+ new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
+ WALEntry.deserialize(dataInputStream));
+ ByteBuffer metadataBuf = ByteBuffer.allocate(12 + Integer.BYTES); // 16 bytes read and discarded; presumably metadata plus its int size - TODO confirm
+ dataInputStream.readFully(metadataBuf.array());
+ // Tail magic string
+ dataInputStream.readFully(magicStringBytes);
+ Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ }
+ }
+
+ @Test // Runs the shared write-then-read check in testWALReader() with the algorithm set to UNCOMPRESSED.
+ public void testWALReaderWithoutCompression()
+ throws QueryProcessException, IllegalPathException, IOException,
InterruptedException {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED); // changes the shared config; it is not restored inside this method
+ testWALReader();
+ }
+
+ @Test // Runs the shared write-then-read check in testWALReader() with the algorithm set to LZ4.
+ public void testWALReaderWithCompression()
+ throws QueryProcessException,
+ IllegalPathException,
+ IOException,
+ InterruptedException,
+ NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException {
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+ WALTestUtils.setMinCompressionSize(0); // presumably lowers the size threshold so small buffers are compressed too - TODO confirm in WALTestUtils
+ testWALReader();
+ }
+
+ public void testWALReader() // shared body (no @Test): called by testWALReaderWithoutCompression and testWALReaderWithCompression
+ throws IOException, QueryProcessException, IllegalPathException,
InterruptedException {
+ File dir = new File(compressionDir);
+ if (!dir.exists()) {
+ dir.mkdirs(); // NOTE(review): the boolean result of mkdirs() is ignored
+ }
+ WALBuffer walBuffer = new WALBuffer("", compressionDir);
+ List<WALEntry> entryList = new ArrayList<>(); // entries in write order, used as the expected values below
+ for (int i = 0; i < 100; ++i) { // write 100 info entries through the buffer
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ WALEntry entry = new WALInfoEntry(0, node);
+ walBuffer.write(entry);
+ entryList.add(entry);
+ }
+ long sleepTime = 0;
+ while (!walBuffer.isAllWALEntriesConsumed()) { // poll every 100 ms until the buffer reports all entries consumed
+ Thread.sleep(100);
+ sleepTime += 100;
+ if (sleepTime > 10_000) { // give up once more than 10 s of sleeping has accumulated
+ Assert.fail("It has been too long for all entries to be consumed");
+ }
+ }
+ walBuffer.close();
+
+ File[] walFiles = WALFileUtils.listAllWALFiles(new File(compressionDir));
+ Assert.assertNotNull(walFiles);
+ Assert.assertEquals(1, walFiles.length); // all 100 entries are expected to land in a single WAL file
+ List<WALEntry> readWALEntryList = new ArrayList<>();
+ try (WALReader reader = new WALReader(walFiles[0])) { // first pass: read back every entry as a WALEntry
+ while (reader.hasNext()) {
+ readWALEntryList.add(reader.next());
+ }
+ }
+ Assert.assertEquals(entryList, readWALEntryList); // same entries, same order (List.equals)
+
+ try (WALByteBufReader reader = new WALByteBufReader(walFiles[0])) { // second pass: read the same file as raw buffers
+ for (int i = 0; i < 100; ++i) {
+ Assert.assertTrue(reader.hasNext());
+ ByteBuffer buffer = reader.next();
+ Assert.assertEquals(entryList.get(i).serializedSize(),
buffer.array().length); // the i-th buffer's backing array must be exactly as long as the i-th entry's serialized size
+ }
+ }
+ }
+
+ @Test // Switches the algorithm from UNCOMPRESSED to LZ4 while one WALBuffer stays open, then reads all entries back.
+ public void testHotLoad()
+ throws IOException,
+ QueryProcessException,
+ IllegalPathException,
+ InterruptedException,
+ NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException {
+ File dir = new File(compressionDir);
+ if (!dir.exists()) {
+ dir.mkdirs(); // NOTE(review): the boolean result of mkdirs() is ignored
+ }
+ WALTestUtils.setMinCompressionSize(0); // presumably lowers the size threshold so small buffers are compressed too - TODO confirm in WALTestUtils
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+ // Do not compress wal for these entries
+ WALBuffer walBuffer = new WALBuffer("", compressionDir);
+ List<WALEntry> entryList = new ArrayList<>(); // entries in write order, used as the expected values below
+ for (int i = 0; i < 50; ++i) { // first half: entries 0..49
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ WALEntry entry = new WALInfoEntry(0, node);
+ walBuffer.write(entry);
+ entryList.add(entry);
+ }
+ long sleepTime = 0;
+ while (!walBuffer.isAllWALEntriesConsumed()) { // wait for the first half before changing the setting; presumably so it is written under the old one
+ Thread.sleep(100);
+ sleepTime += 100;
+ if (sleepTime > 10_000) { // give up once more than 10 s of sleeping has accumulated
+ Assert.fail("It has been too long for all entries to be consumed");
+ }
+ }
+
+ // compress wal for these entries
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+ for (int i = 50; i < 100; ++i) { // second half: entries 50..99, written through the same walBuffer
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ WALEntry entry = new WALInfoEntry(0, node);
+ walBuffer.write(entry);
+ entryList.add(entry);
+ }
+ sleepTime = 0; // restart the timeout budget for the second wait
+ while (!walBuffer.isAllWALEntriesConsumed()) {
+ Thread.sleep(100);
+ sleepTime += 100;
+ if (sleepTime > 10_000) {
+ Assert.fail("It has been too long for all entries to be consumed");
+ }
+ }
+ walBuffer.close();
+
+ File[] walFiles = WALFileUtils.listAllWALFiles(new File(compressionDir));
+ Assert.assertNotNull(walFiles);
+ Assert.assertEquals(1, walFiles.length); // both halves are expected to end up in the same single WAL file
+ List<WALEntry> readWALEntryList = new ArrayList<>();
+ try (WALReader reader = new WALReader(walFiles[0])) { // first pass: read back every entry as a WALEntry
+ while (reader.hasNext()) {
+ readWALEntryList.add(reader.next());
+ }
+ }
+ Assert.assertEquals(entryList, readWALEntryList); // all 100 entries, in write order, across the algorithm switch
+
+ try (WALByteBufReader reader = new WALByteBufReader(walFiles[0])) { // second pass: read the same file as raw buffers
+ for (int i = 0; i < 100; ++i) {
+ Assert.assertTrue(reader.hasNext());
+ ByteBuffer buffer = reader.next();
+ Assert.assertEquals(entryList.get(i).serializedSize(),
buffer.array().length); // the i-th buffer's backing array must be exactly as long as the i-th entry's serialized size
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
index 61f5e4150e5..a3c2e71e4d2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java
@@ -80,9 +80,12 @@ public class WALNodeTest {
private WALMode prevMode;
private String prevConsensus;
private WALNode walNode;
+ private long originWALThreshold =
+
IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte();
@Before
public void setUp() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(2
* 1024 * 1024);
EnvironmentUtils.cleanDir(logDirectory);
prevMode = config.getWalMode();
prevConsensus = config.getDataRegionConsensusProtocolClass();
@@ -93,6 +96,7 @@ public class WALNodeTest {
@After
public void tearDown() throws Exception {
+
IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(originWALThreshold);
walNode.close();
config.setWalMode(prevMode);
config.setDataRegionConsensusProtocolClass(prevConsensus);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
index 7193d7722a5..0ba1ebf2571 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverManagerTest.java
@@ -109,9 +109,12 @@ public class WALRecoverManagerTest {
private CheckpointManager checkpointManager;
private TsFileResource tsFileWithWALResource;
private TsFileResource tsFileWithoutWALResource;
+ private long originWALThreshold =
+
IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte();
@Before
public void setUp() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(1
* 1024 * 1024);
EnvironmentUtils.cleanDir(new File(FILE_WITH_WAL_NAME).getParent());
EnvironmentUtils.envSetUp();
prevMode = config.getWalMode();
@@ -122,6 +125,7 @@ public class WALRecoverManagerTest {
@After
public void tearDown() throws Exception {
+
IoTDBDescriptor.getInstance().getConfig().setWalFileSizeThresholdInByte(originWALThreshold);
if (tsFileWithWALResource != null) {
tsFileWithWALResource.close();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
index c22505ad1a3..97b00f379f2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
@@ -71,9 +71,13 @@ public class WALRecoverWriterTest {
// recover
WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
walRecoverWriter.recover(walMetaData);
- // verify file, marker + metadata(search index + size number) + metadata
size + magic string
+ // verify file, marker + metadata(search index + size number) + metadata
size + head magic
+ // string + tail magic string
Assert.assertEquals(
- Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES +
WALWriter.MAGIC_STRING_BYTES,
+ Byte.BYTES
+ + (Long.BYTES + Integer.BYTES)
+ + Integer.BYTES
+ + WALWriter.MAGIC_STRING_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
@@ -95,7 +99,10 @@ public class WALRecoverWriterTest {
walRecoverWriter.recover(walMetaData);
// verify file, marker + metadata(search index + size number) + metadata
size + head magic string + tail magic string
Assert.assertEquals(
- Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES +
WALWriter.MAGIC_STRING_BYTES,
+ Byte.BYTES
+ + (Long.BYTES + Integer.BYTES)
+ + Integer.BYTES
+ + WALWriter.MAGIC_STRING_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
@@ -162,6 +169,7 @@ public class WALRecoverWriterTest {
walMetaData.add(size, 1, walEntry.getMemTableId());
try (WALWriter walWriter = new WALWriter(logFile)) {
walWriter.write(buffer.getBuffer(), walMetaData);
+ walMetaData.setTruncateOffSet(walWriter.getOffset());
}
long len = logFile.length();
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.APPEND)) {
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index 38f3f7c00f0..7dbf5f5b716 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1431,6 +1431,10 @@ data_replication_factor=1
# Datatype: long
# iot_consensus_cache_window_time_in_ms=-1
+# Enable Write Ahead Log compression.
+# Option: true, false
+# enable_wal_compression=false
+
####################
### IoTConsensus Configuration
####################