This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 587e52f177645f895de924a82da3d1d8b5bcebeb Author: nanxiang xia <[email protected]> AuthorDate: Thu Jul 3 15:28:12 2025 +0800 [To dev/1.3] Pipe: prevent historical extractor from forwarding TsFiles generated by pipe when isForwardingPipeRequests is false (#15845) (#15864) --- .../PipeHistoricalDataRegionTsFileExtractor.java | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index acec7573a2a..3ea98cf5bb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -66,6 +66,8 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; @@ -79,6 +81,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; @@ -119,6 +122,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean isTerminateSignalSent = false; + private boolean isForwardingPipeRequests; + private volatile boolean hasBeenStarted = false; private Queue<TsFileResource> pendingQueue; @@ -330,9 +335,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa || // Should extract deletion listeningOptionPair.getRight()); + isForwardingPipeRequests = + parameters.getBooleanOrDefault( + Arrays.asList( + EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, SOURCE_FORWARDING_PIPE_REQUESTS_KEY), + EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + if (LOGGER.isInfoEnabled()) { LOGGER.info( - "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}", + "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, is forwarding pipe requests: {}", pipeName, dataRegionId, DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime), @@ -341,7 +352,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa historicalDataExtractionEndTime, sloppyPattern, sloppyTimeRange, - shouldTransferModFile); + shouldTransferModFile, + isForwardingPipeRequests); } } @@ -444,6 +456,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa resource -> // Some resource is marked as deleted but not removed from the list. !resource.isDeleted() + // Some resource is generated by pipe. We ignore them if the pipe should + // not transfer pipe requests. + && (!resource.isGeneratedByPipe() || isForwardingPipeRequests) && ( // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. @@ -461,6 +476,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa resource -> // Some resource is marked as deleted but not removed from the list. !resource.isDeleted() + // Some resource is generated by pipe. We ignore them if the pipe should + // not transfer pipe requests. + && (!resource.isGeneratedByPipe() || isForwardingPipeRequests) && ( // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
