This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a67b0c02b6e Pipe: fixed potential lose point bug caused by cancelled 
flush of historical extractor (#12056)
a67b0c02b6e is described below

commit a67b0c02b6edae0916498869ed33dc04cb1458b3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 21 14:16:50 2024 +0800

    Pipe: fixed potential lose point bug caused by cancelled flush of 
historical extractor (#12056)
    
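    Data loss can be caused by the following sequence of operations:
     1. PipeA: start historical data extraction with flush
     2. Data insertion
     3. PipeB: start realtime data extraction
     4. PipeB: start historical data extraction without flush
     5. The data inserted in step 2 is not captured by PipeB: it was inserted
        before PipeB's realtime extraction started, and PipeB's historical
        extraction does not flush it. If the state of its TsFile epoch is
        USING_TABLET, the TsFile event is ignored, which causes data loss in
        that TsFile epoch.

    To fix this, each TsFile epoch now records the minimum time of the insert
    nodes bound to it. A TsFile event whose file start time is earlier than
    that minimum is no longer discarded: the hybrid extractor switches the
    epoch to USING_BOTH, and the log extractor forwards the TsFile event.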
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 ++++
 .../PipeRealtimeDataRegionHybridExtractor.java     | 22 +++++++++++++++++++++-
 .../PipeRealtimeDataRegionLogExtractor.java        | 12 +++++++++---
 .../pipe/extractor/realtime/epoch/TsFileEpoch.java | 14 ++++++++++++++
 .../realtime/epoch/TsFileEpochManager.java         |  5 ++++-
 5 files changed, 52 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 532ca5a1a7e..412e4143a03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -116,6 +116,10 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
     return isLoaded;
   }
 
+  public long getFileStartTime() {
+    return resource.getFileStartTime();
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 568c96b5263..63a4098cff9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -138,7 +139,26 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                 case USING_TSFILE:
                   return TsFileEpoch.State.USING_TSFILE;
                 case USING_TABLET:
-                  return TsFileEpoch.State.USING_TABLET;
+                  if (((PipeTsFileInsertionEvent) 
event.getEvent()).getFileStartTime()
+                      < event.getTsFileEpoch().getInsertNodeMinTime()) {
+                    // Some insert nodes in the tsfile epoch are not captured 
by pipe, so we should
+                    // capture the tsfile event to make sure all data in the 
tsfile epoch can be
+                    // extracted.
+                    //
+                    // The situation can be caused by the following operations:
+                    //  1. PipeA: start historical data extraction with flush
+                    //  2. Data insertion
+                    //  3. PipeB: start realtime data extraction
+                    //  4. PipeB: start historical data extraction without 
flush
+                    //  5. Data inserted in the step2 is not captured by 
PipeB, and if its tsfile
+                    //     epoch's state is USING_TABLET, the tsfile event 
will be ignored, which
+                    //     will cause the data loss in the tsfile epoch.
+                    return TsFileEpoch.State.USING_BOTH;
+                  } else {
+                    // All data in the tsfile epoch has been extracted in 
tablet mode, so we should
+                    // simply keep the state of the tsfile epoch and discard 
the tsfile event.
+                    return TsFileEpoch.State.USING_TABLET;
+                  }
                 case USING_BOTH:
                 default:
                   return TsFileEpoch.State.USING_BOTH;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 2d857bcad9b..68f21ea283a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,8 +75,14 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
   }
 
   private void extractTsFileInsertion(PipeRealtimeEvent event) {
-    if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) {
-      // only loaded tsfile can be extracted by this extractor. Ignore this 
event.
+    final PipeTsFileInsertionEvent tsFileInsertionEvent =
+        (PipeTsFileInsertionEvent) event.getEvent();
+    if (!(tsFileInsertionEvent.getIsLoaded()
+        // some insert nodes in the tsfile epoch are not captured by pipe
+        || tsFileInsertionEvent.getFileStartTime()
+            < event.getTsFileEpoch().getInsertNodeMinTime())) {
+      // All data in the tsfile epoch has been extracted in tablet mode, so we 
should
+      // simply ignore this event.
       
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
 false);
       return;
     }
@@ -134,7 +140,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
   @Override
   public boolean isNeedListenToTsFile() {
-    // Only listen to loaded tsFiles
+    // Only listen to tsFiles that can't be represented by insertNodes
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index b0ec157f016..1ccc4e979dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TsFileEpoch {
@@ -31,10 +32,12 @@ public class TsFileEpoch {
   private final String filePath;
   private final ConcurrentMap<PipeRealtimeDataRegionExtractor, 
AtomicReference<State>>
       dataRegionExtractor2State;
+  private final AtomicLong insertNodeMinTime;
 
   public TsFileEpoch(String filePath) {
     this.filePath = filePath;
     this.dataRegionExtractor2State = new ConcurrentHashMap<>();
+    this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
   }
 
   public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) 
{
@@ -57,6 +60,14 @@ public class TsFileEpoch {
                 .setRecentProcessedTsFileEpochState(extractor.getTaskID(), 
state.get()));
   }
 
+  public void updateInsertNodeMinTime(long newComingMinTime) {
+    insertNodeMinTime.updateAndGet(recordedMinTime -> 
Math.min(recordedMinTime, newComingMinTime));
+  }
+
+  public long getInsertNodeMinTime() {
+    return insertNodeMinTime.get();
+  }
+
   @Override
   public String toString() {
     return "TsFileEpoch{"
@@ -65,6 +76,9 @@ public class TsFileEpoch {
         + '\''
         + ", dataRegionExtractor2State="
         + dataRegionExtractor2State
+        + '\''
+        + ", insertNodeMinTime="
+        + insertNodeMinTime.get()
         + '}';
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index fdd2ad85c62..a1ad2f49ec9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -69,9 +69,12 @@ public class TsFileEpochManager {
 
   public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
       PipeInsertNodeTabletInsertionEvent event, InsertNode node, 
TsFileResource resource) {
+    final TsFileEpoch epoch =
+        filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), 
TsFileEpoch::new);
+    epoch.updateInsertNodeMinTime(node.getMinTime());
     return new PipeRealtimeEvent(
         event,
-        filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), 
TsFileEpoch::new),
+        epoch,
         Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()),
         event.getPattern());
   }

Reply via email to