This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c0743d98c2c Enhance wal compression (#12830)
c0743d98c2c is described below
commit c0743d98c2c249958d6063dfacd86d5ca0c3fb43
Author: Potato <[email protected]>
AuthorDate: Mon Jul 1 21:24:42 2024 +0800
Enhance wal compression (#12830)
* use directbuffer to optimize performance
Signed-off-by: OneSizeFitQuorum <[email protected]>
* optimize log
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix review
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../confignode1conf/iotdb-system.properties | 2 --
.../dataregion/wal/io/WALInputStream.java | 34 +++++++++++++---------
.../java/org/apache/iotdb/db/utils/MmapUtil.java | 8 +++++
3 files changed, 29 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
index b396e373f86..5e97f60ba3c 100644
---
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
+++
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
@@ -33,8 +33,6 @@ cn_metric_prometheus_reporter_port=9091
timestamp_precision=ms
data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
-schema_replication_factor=3
-data_replication_factor=3
udf_lib_dir=target/confignode1/ext/udf
trigger_lib_dir=target/confignode1/ext/trigger
pipe_lib_dir=target/confignode1/ext/pipe
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 90bfd52e371..350f8f5e995 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -164,6 +165,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
@Override
public void close() throws IOException {
channel.close();
+ MmapUtil.clean(dataBuffer);
dataBuffer = null;
}
@@ -210,14 +212,17 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
if (Objects.isNull(dataBuffer)
|| dataBuffer.capacity() < segmentInfo.uncompressedSize
|| dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
- dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+ if (!Objects.isNull(dataBuffer)) {
+ MmapUtil.clean(dataBuffer);
+ }
+ dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
}
dataBuffer.clear();
if (Objects.isNull(compressedBuffer)
|| compressedBuffer.capacity() < segmentInfo.dataInDiskSize
|| compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
- compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+ compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
}
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
@@ -226,15 +231,18 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
throw new IOException("Unexpected end of file");
}
compressedBuffer.flip();
-
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
unCompressor.uncompress(compressedBuffer, dataBuffer);
+ MmapUtil.clean(compressedBuffer);
} else {
// An uncompressed segment
if (Objects.isNull(dataBuffer)
|| dataBuffer.capacity() < segmentInfo.dataInDiskSize
|| dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
- dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+ if (!Objects.isNull(dataBuffer)) {
+ MmapUtil.clean(dataBuffer);
+ }
+ dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
}
dataBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
@@ -250,16 +258,15 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
private void tryLoadSegment() throws IOException {
long originPosition = channel.position();
try {
- loadNextSegmentV2();
- version = WALFileVersion.V2;
- return;
+ loadNextSegmentV1();
+ version = WALFileVersion.V1;
} catch (Throwable e) {
// failed to load in V2 way, try in V1 way
- logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
channel.position(originPosition);
+ loadNextSegmentV2();
+ version = WALFileVersion.V2;
+ logger.info("Failed to load WAL segment in V1 way, try in V2 way
successfully.");
}
- loadNextSegmentV1();
- version = WALFileVersion.V1;
}
/**
@@ -286,14 +293,15 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
} while (posRemain >= 0);
if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
- compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+ compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
channel.read(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
- dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
+ dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
unCompressor.uncompress(compressedBuffer, dataBuffer);
+ MmapUtil.clean(compressedBuffer);
} else {
- dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
+ dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
channel.read(dataBuffer);
dataBuffer.flip();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
index 10b7bd6eb73..21f239af9f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils;
import io.netty.util.internal.PlatformDependent;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
public class MmapUtil {
@@ -32,4 +33,11 @@ public class MmapUtil {
}
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
}
+
+ /** we do not need to clean heapByteBuffer manually, so we just leave it
alone. */
+ public static void clean(ByteBuffer byteBuffer) {
+ if (byteBuffer instanceof MappedByteBuffer) {
+ clean((MappedByteBuffer) byteBuffer);
+ }
+ }
}