This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c7f61620fbbcc86e1c6cc80992ffa9643c8ab578 Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 18 15:07:28 2025 +0800 Pipe: Set WAL to uncompressed when using real-time sync (#15733) (#15757) --- .../dataregion/IoTDBDataRegionExtractor.java | 23 +++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index b0ccea5cc43..808df12dfac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -47,6 +47,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -269,7 +270,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { - checkWalEnable(parameters); + checkWalEnableAndSetUncompressed(parameters); realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); LOGGER.info( "Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY); @@ -284,15 +285,15 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE: case EXTRACTOR_REALTIME_MODE_LOG_VALUE: case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE: - checkWalEnable(parameters); + checkWalEnableAndSetUncompressed(parameters); realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); break; case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE: - checkWalEnable(parameters); + checkWalEnableAndSetUncompressed(parameters); realtimeExtractor = new PipeRealtimeDataRegionLogExtractor(); break; default: - checkWalEnable(parameters); + checkWalEnableAndSetUncompressed(parameters); realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); if (LOGGER.isWarnEnabled()) { LOGGER.warn( @@ -302,7 +303,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { } } - private void checkWalEnable(final PipeParameters parameters) throws IllegalPathException { + private void checkWalEnableAndSetUncompressed(final PipeParameters parameters) + throws IllegalPathException { if (Boolean.TRUE.equals( DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters) .getLeft()) @@ -310,6 +312,17 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { throw new PipeException( "The pipe cannot transfer realtime insertion if data region disables wal. Please set 'realtime.mode'='batch' in source parameters when enabling realtime transmission."); } + + if (!IoTDBDescriptor.getInstance() + .getConfig() + .getWALCompressionAlgorithm() + .equals(CompressionType.UNCOMPRESSED)) { + LOGGER.info( + "The pipe prefers uncompressed wal, and may introduce certain delay in realtime insert syncing without it. Hence, we change it to uncompressed."); + IoTDBDescriptor.getInstance() + .getConfig() + .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED); + } } @Override
