This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7f61620fbbcc86e1c6cc80992ffa9643c8ab578
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 18 15:07:28 2025 +0800

    Pipe: Set WAL to uncompressed when using real-time sync (#15733) (#15757)
---
 .../dataregion/IoTDBDataRegionExtractor.java       | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b0ccea5cc43..808df12dfac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
-      checkWalEnable(parameters);
+      checkWalEnableAndSetUncompressed(parameters);
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       LOGGER.info(
           "Pipe: '{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
@@ -284,15 +285,15 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
       case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
       case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
       case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
       default:
-        checkWalEnable(parameters);
+        checkWalEnableAndSetUncompressed(parameters);
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
@@ -302,7 +303,8 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
   }
 
-  private void checkWalEnable(final PipeParameters parameters) throws 
IllegalPathException {
+  private void checkWalEnableAndSetUncompressed(final PipeParameters 
parameters)
+      throws IllegalPathException {
     if (Boolean.TRUE.equals(
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
                 .getLeft())
@@ -310,6 +312,17 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       throw new PipeException(
           "The pipe cannot transfer realtime insertion if data region disables 
wal. Please set 'realtime.mode'='batch' in source parameters when enabling 
realtime transmission.");
     }
+
+    if (!IoTDBDescriptor.getInstance()
+        .getConfig()
+        .getWALCompressionAlgorithm()
+        .equals(CompressionType.UNCOMPRESSED)) {
+      LOGGER.info(
+          "The pipe prefers uncompressed wal, and may introduce certain delay 
in realtime insert syncing without it. Hence, we change it to uncompressed.");
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+    }
   }
 
   @Override

Reply via email to