This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d5e0478bc1 Pipe: Set WAL to uncompressed when using real-time sync (#15733)
5d5e0478bc1 is described below

commit 5d5e0478bc1fc02309d4793007302fd0f80af5e7
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 17 15:22:11 2025 +0800

    Pipe: Set WAL to uncompressed when using real-time sync (#15733)
---
 .../dataregion/IoTDBDataRegionExtractor.java       | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 8396cf2c581..7c85881ff93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -468,7 +469,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)
         && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
-      checkWalEnable(parameters);
+      checkWalEnableAndSetUncompressed(parameters);
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       LOGGER.info(
           "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by default.",
@@ -485,7 +486,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
               Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY),
               EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE);
       if (isStreamingMode) {
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       } else {
         realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
@@ -501,15 +502,15 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
       case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
       case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
       case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
       case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
       default:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
@@ -519,7 +520,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
     }
   }
 
-  private void checkWalEnable(final PipeParameters parameters) throws IllegalPathException {
+  private void checkWalEnableAndSetUncompressed(final PipeParameters parameters)
+      throws IllegalPathException {
     if (Boolean.TRUE.equals(
             DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
                 .getLeft())
@@ -527,6 +529,17 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
       throw new PipeException(
           "The pipe cannot transfer realtime insertion if data region disables wal. Please set 'realtime.mode'='batch' in source parameters when enabling realtime transmission.");
     }
+
+    if (!IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getWALCompressionAlgorithm()
+        .equals(CompressionType.UNCOMPRESSED)) {
+      LOGGER.info(
+          "The pipe prefers uncompressed wal, and may introduce certain delay in realtime insert syncing without it. Hence, we change it to uncompressed.");
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    }
   }
 
   @Override

Reply via email to