This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new a13af31aca9 [To dev/1.3] Pipe: prevent historical extractor from 
forwarding TsFiles generated by pipe when isForwardingPipeRequests is false 
(#15845) (#15864)
a13af31aca9 is described below

commit a13af31aca9387c8b3cdf58ac4fdbd7ab3608d3e
Author: nanxiang xia <[email protected]>
AuthorDate: Thu Jul 3 15:28:12 2025 +0800

    [To dev/1.3] Pipe: prevent historical extractor from forwarding TsFiles 
generated by pipe when isForwardingPipeRequests is false (#15845) (#15864)
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index acec7573a2a..3ea98cf5bb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -66,6 +66,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
@@ -79,6 +81,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
@@ -119,6 +122,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   private boolean isTerminateSignalSent = false;
 
+  private boolean isForwardingPipeRequests;
+
   private volatile boolean hasBeenStarted = false;
 
   private Queue<TsFileResource> pendingQueue;
@@ -330,9 +335,15 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                 || // Should extract deletion
                 listeningOptionPair.getRight());
 
+    isForwardingPipeRequests =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, 
SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+            EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(
-          "Pipe {}@{}: historical data extraction time range, start time 
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should 
transfer mod file {}",
+          "Pipe {}@{}: historical data extraction time range, start time 
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should 
transfer mod file {}, is forwarding pipe requests: {}",
           pipeName,
           dataRegionId,
           DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
@@ -341,7 +352,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
           historicalDataExtractionEndTime,
           sloppyPattern,
           sloppyTimeRange,
-          shouldTransferModFile);
+          shouldTransferModFile,
+          isForwardingPipeRequests);
     }
   }
 
@@ -444,6 +456,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         // Some resource is marked as deleted but not removed 
from the list.
                         !resource.isDeleted()
+                            // Some resource is generated by pipe. We ignore 
them if the pipe should
+                            // not transfer pipe requests.
+                            && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
                             && (
                             // Some resource may not be closed due to the 
control of
                             // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
@@ -461,6 +476,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         // Some resource is marked as deleted but not removed 
from the list.
                         !resource.isDeleted()
+                            // Some resource is generated by pipe. We ignore 
them if the pipe should
+                            // not transfer pipe requests.
+                            && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
                             && (
                             // Some resource may not be closed due to the 
control of
                             // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.

Reply via email to