This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new a13af31aca9 [To dev/1.3] Pipe: prevent historical extractor from
forwarding TsFiles generated by pipe when isForwardingPipeRequests is false
(#15845) (#15864)
a13af31aca9 is described below
commit a13af31aca9387c8b3cdf58ac4fdbd7ab3608d3e
Author: nanxiang xia <[email protected]>
AuthorDate: Thu Jul 3 15:28:12 2025 +0800
[To dev/1.3] Pipe: prevent historical extractor from forwarding TsFiles
generated by pipe when isForwardingPipeRequests is false (#15845) (#15864)
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index acec7573a2a..3ea98cf5bb3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -66,6 +66,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
@@ -79,6 +81,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
@@ -119,6 +122,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
private boolean isTerminateSignalSent = false;
+ private boolean isForwardingPipeRequests;
+
private volatile boolean hasBeenStarted = false;
private Queue<TsFileResource> pendingQueue;
@@ -330,9 +335,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
|| // Should extract deletion
listeningOptionPair.getRight());
+ isForwardingPipeRequests =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+ EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}",
+ "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, is forwarding pipe requests: {}",
pipeName,
dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
@@ -341,7 +352,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
historicalDataExtractionEndTime,
sloppyPattern,
sloppyTimeRange,
- shouldTransferModFile);
+ shouldTransferModFile,
+ isForwardingPipeRequests);
}
}
@@ -444,6 +456,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
resource ->
// Some resource is marked as deleted but not removed
from the list.
!resource.isDeleted()
+ // Some resource is generated by pipe. We ignore them if the pipe should
+ // not transfer pipe requests.
+ && (!resource.isGeneratedByPipe() || isForwardingPipeRequests)
&& (
// Some resource may not be closed due to the
control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
@@ -461,6 +476,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
resource ->
// Some resource is marked as deleted but not removed
from the list.
!resource.isDeleted()
+ // Some resource is generated by pipe. We ignore them if the pipe should
+ // not transfer pipe requests.
+ && (!resource.isGeneratedByPipe() || isForwardingPipeRequests)
&& (
// Some resource may not be closed due to the
control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.