This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d5e0478bc1 Pipe: Set WAL to uncompressed when using real-time sync (#15733)
5d5e0478bc1 is described below
commit 5d5e0478bc1fc02309d4793007302fd0f80af5e7
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 17 15:22:11 2025 +0800
Pipe: Set WAL to uncompressed when using real-time sync (#15733)
---
.../dataregion/IoTDBDataRegionExtractor.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 8396cf2c581..7c85881ff93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -468,7 +469,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY)
&& !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
LOGGER.info(
"Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by
default.",
@@ -485,7 +486,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY),
EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE);
if (isStreamingMode) {
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
} else {
realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
@@ -501,15 +502,15 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
break;
case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
break;
default:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
@@ -519,7 +520,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
}
}
- private void checkWalEnable(final PipeParameters parameters) throws
IllegalPathException {
+ private void checkWalEnableAndSetUncompressed(final PipeParameters
parameters)
+ throws IllegalPathException {
if (Boolean.TRUE.equals(
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
.getLeft())
@@ -527,6 +529,17 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
throw new PipeException(
"The pipe cannot transfer realtime insertion if data region disables
wal. Please set 'realtime.mode'='batch' in source parameters when enabling
realtime transmission.");
}
+
+ if (!IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getWALCompressionAlgorithm()
+ .equals(CompressionType.UNCOMPRESSED)) {
+ LOGGER.info(
+ "The pipe prefers uncompressed wal, and may introduce certain delay
in realtime insert syncing without it. Hence, we change it to uncompressed.");
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+ }
}
@Override