This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 563e42ac868 Pipe: fix the issue where the pipe still forwards insert
events when the parameter source.mode.double-living is true. (#14839)
563e42ac868 is described below
commit 563e42ac8685a946c9861bbb3745a44baa2f53a3
Author: nanxiang xia <[email protected]>
AuthorDate: Fri Feb 14 18:03:32 2025 +0800
Pipe: fix the issue where the pipe still forwards insert events when the
parameter source.mode.double-living is true. (#14839)
---
.../realtime/PipeRealtimeDataRegionExtractor.java | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 6957ccfdbc3..163f8ea7805 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -234,12 +234,22 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
?
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
:
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
- isForwardingPipeRequests =
+ final boolean isDoubleLiving =
parameters.getBooleanOrDefault(
Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ if (isDoubleLiving) {
+ isForwardingPipeRequests = false;
+ } else {
+ isForwardingPipeRequests =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ }
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =