This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2746e7a58 [IOTDB-5936] Pipe: correct the behaviour of the historical 
data collector in realtime only mode (#9987)
fc2746e7a58 is described below

commit fc2746e7a58ea7b0713406e0e6fb311694bbf918
Author: yschengzi <[email protected]>
AuthorDate: Wed May 31 02:57:58 2023 +0800

    [IOTDB-5936] Pipe: correct the behaviour of the historical data collector 
in realtime only mode (#9987)
    
    The historical data collector now also starts in the "realtime only" mode 
because, when a restart or a master switch event occurs, it needs to collect 
the data generated after the pipe was created. In the fixed logic, the 
historical data collector compares the generation time of the data (TsFile) 
with the creation time of the pipe.
    
    Now, when the historical data collector is created, it seals all tsfiles in 
the data region to ensure that every tsfile generated afterwards has a 
generation time later than the creation time of the pipe.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |   2 +-
 .../core/collector/IoTDBDataRegionCollector.java   |  22 +++--
 .../PipeHistoricalDataRegionTsFileCollector.java   | 103 +++++++++++++++++----
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  12 +--
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  46 +++------
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   4 +-
 6 files changed, 117 insertions(+), 72 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 10e8679ac83..d0596127340 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -510,7 +510,7 @@ public class PipeTaskAgent {
       PipeTaskMeta pipeTaskMeta) {
     if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
       final PipeTask pipeTask =
-          new PipeTaskBuilder(consensusGroupId, pipeTaskMeta, 
pipeStaticMeta).build();
+          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, 
pipeTaskMeta).build();
       pipeTask.create();
       pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index b26c880b028..37a9c5101a9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionCollector;
-import 
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionFakeCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFakeCollector;
@@ -58,8 +57,9 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private final AtomicBoolean hasBeenStarted;
 
-  private final ListenableUnboundedBlockingPendingQueue<Event> 
collectorPendingQueue;
   private final PipeTaskMeta pipeTaskMeta;
+  private final long creationTime;
+  private final ListenableUnboundedBlockingPendingQueue<Event> 
collectorPendingQueue;
 
   // TODO: support pattern in historical collector
   private PipeHistoricalDataRegionCollector historicalCollector;
@@ -69,15 +69,13 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   public IoTDBDataRegionCollector(
       PipeTaskMeta pipeTaskMeta,
+      long creationTime,
       ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
-    hasBeenStarted = new AtomicBoolean(false);
+    this.hasBeenStarted = new AtomicBoolean(false);
 
     this.pipeTaskMeta = pipeTaskMeta;
+    this.creationTime = creationTime;
     this.collectorPendingQueue = collectorPendingQueue;
-
-    historicalCollector = new 
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
-    realtimeCollector =
-        new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
   }
 
   @Override
@@ -119,8 +117,14 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
     // enable historical collector by default
     historicalCollector =
         parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
-            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta)
-            : new PipeHistoricalDataRegionFakeCollector();
+            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, 
Long.MIN_VALUE)
+            // We define the realtime data as the data generated after the 
creation time
+            // of the pipe from user's perspective. But we still need to use
+            // PipeHistoricalDataRegionCollector to collect the realtime data 
generated between the
+            // creation time of the pipe and the time when the pipe starts, 
because that data
+            // cannot be listened to by PipeRealtimeDataRegionCollector, and 
should be collected by
+            // PipeHistoricalDataRegionCollector from implementation 
perspective.
+            : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, 
creationTime);
   }
 
   private void constructRealtimeCollector(PipeParameters parameters) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 90041ed1d59..c1157d96979 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -34,49 +34,98 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
+import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
+
 public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataRegionCollector {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
+
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
   private int dataRegionId;
 
-  private long collectStartTime;
-  private long collectEndTime;
+  private final long historicalDataCollectionTimeLowerBound;
+  private long historicalDataCollectionStartTime;
+  private long historicalDataCollectionEndTime;
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
-  public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+  public PipeHistoricalDataRegionTsFileCollector(
+      PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.startIndex = pipeTaskMeta.getProgressIndex();
+
+    this.historicalDataCollectionTimeLowerBound = 
historicalDataCollectionTimeLowerBound;
   }
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
+    validator.validateRequiredAttribute(DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
-    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
-    collectStartTime =
-        
parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME)
+    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+    historicalDataCollectionStartTime =
+        parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                
parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME),
-                ZoneId.systemDefault())
+                parameters.getString(COLLECTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
             : Long.MIN_VALUE;
-    collectEndTime =
-        
parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME)
+    historicalDataCollectionEndTime =
+        parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                
parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME),
-                ZoneId.systemDefault())
+                parameters.getString(COLLECTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
+
+    // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the 
realtime only mode.
+    // realtime only mode -> (historicalDataCollectionTimeLowerBound != 
Long.MIN_VALUE)
+    //
+    // Ensure that all data in the data region is flushed to disk before 
collecting data.
+    // This ensures the generation time of all newly generated TsFiles 
(realtime data) after the
+    // invocation of flushDataRegionAllTsFiles() is later than the 
creationTime of the pipe
+    // (historicalDataCollectionTimeLowerBound).
+    //
+    // Note that: the generation time of the TsFile is the time when the 
TsFile is created, not
+    // the time when the data is flushed to the TsFile.
+    //
+    // Then we can use the generation time of the TsFile to determine whether 
the data in the
+    // TsFile should be collected by comparing the generation time of the 
TsFile with the
+    // historicalDataCollectionTimeLowerBound when starting the pipe in 
realtime only mode.
+    //
+    // If we don't invoke flushDataRegionAllTsFiles() in the realtime only 
mode, the data generated
+    // between the creation time of the pipe and the time when the pipe starts 
will be lost.
+    if (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) {
+      flushDataRegionAllTsFiles();
+    }
+  }
+
+  private void flushDataRegionAllTsFiles() {
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
+    if (dataRegion == null) {
+      return;
+    }
+
+    dataRegion.writeLock("Pipe: create historical TsFile collector");
+    try {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+    } finally {
+      dataRegion.writeUnlock();
+    }
   }
 
   @Override
@@ -88,7 +137,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataR
       return;
     }
 
-    dataRegion.writeLock("Pipe: collect historical TsFile");
+    dataRegion.writeLock("Pipe: start to collect historical TsFile");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -101,7 +150,8 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                 .filter(
                     resource ->
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-                            && 
isTsFileResourceOverlappedWithTimeRange(resource))
+                            && 
isTsFileResourceOverlappedWithTimeRange(resource)
+                            && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
@@ -109,7 +159,8 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                 .filter(
                     resource ->
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-                            && 
isTsFileResourceOverlappedWithTimeRange(resource))
+                            && 
isTsFileResourceOverlappedWithTimeRange(resource)
+                            && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
@@ -125,8 +176,22 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
-    return !(resource.getFileEndTime() < collectStartTime
-        || collectEndTime < resource.getFileStartTime());
+    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
+        || historicalDataCollectionEndTime < resource.getFileStartTime());
+  }
+
+  private boolean 
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
+    try {
+      return historicalDataCollectionTimeLowerBound
+          <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
+    } catch (IOException e) {
+      LOGGER.warn(
+          String.format("failed to get the generation time of TsFile %s", 
resource.getTsFilePath()),
+          e);
+      // If failed to get the generation time of the TsFile, we will collect 
the data in the TsFile
+      // anyway.
+      return true;
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 481eb0e5ad7..3bd469907f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,10 +41,6 @@ public class PipeBuilder {
 
   public Map<TConsensusGroupId, PipeTask> build() {
     final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
-    final String pipeName = pipeStaticMeta.getPipeName();
-    final PipeParameters collectorParameters = 
pipeStaticMeta.getCollectorParameters();
-    final PipeParameters processorParameters = 
pipeStaticMeta.getProcessorParameters();
-    final PipeParameters connectorParameters = 
pipeStaticMeta.getConnectorParameters();
 
     final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new 
HashMap<>();
 
@@ -57,12 +52,9 @@ public class PipeBuilder {
         consensusGroupIdToPipeTaskMap.put(
             consensusGroupIdToPipeTaskMeta.getKey(),
             new PipeTaskBuilder(
-                    pipeName,
+                    pipeStaticMeta,
                     consensusGroupIdToPipeTaskMeta.getKey(),
-                    consensusGroupIdToPipeTaskMeta.getValue(),
-                    collectorParameters,
-                    processorParameters,
-                    connectorParameters)
+                    consensusGroupIdToPipeTaskMeta.getValue())
                 .build());
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 274b50e1a98..1886f489c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -25,41 +25,18 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 public class PipeTaskBuilder {
 
-  private final String pipeName;
+  private final PipeStaticMeta pipeStaticMeta;
   private final TConsensusGroupId dataRegionId;
   private final PipeTaskMeta pipeTaskMeta;
-  private final PipeParameters pipeCollectorParameters;
-  private final PipeParameters pipeProcessorParameters;
-  private final PipeParameters pipeConnectorParameters;
 
-  PipeTaskBuilder(
-      String pipeName,
-      TConsensusGroupId dataRegionId,
-      PipeTaskMeta pipeTaskMeta,
-      PipeParameters pipeCollectorParameters,
-      PipeParameters pipeProcessorParameters,
-      PipeParameters pipeConnectorParameters) {
-    this.pipeName = pipeName;
+  public PipeTaskBuilder(
+      PipeStaticMeta pipeStaticMeta, TConsensusGroupId dataRegionId, 
PipeTaskMeta pipeTaskMeta) {
+    this.pipeStaticMeta = pipeStaticMeta;
     this.dataRegionId = dataRegionId;
     this.pipeTaskMeta = pipeTaskMeta;
-    this.pipeCollectorParameters = pipeCollectorParameters;
-    this.pipeProcessorParameters = pipeProcessorParameters;
-    this.pipeConnectorParameters = pipeConnectorParameters;
-  }
-
-  public PipeTaskBuilder(
-      TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta, 
PipeStaticMeta pipeStaticMeta) {
-    this(
-        pipeStaticMeta.getPipeName(),
-        dataRegionId,
-        pipeTaskMeta,
-        pipeStaticMeta.getCollectorParameters(),
-        pipeStaticMeta.getProcessorParameters(),
-        pipeStaticMeta.getConnectorParameters());
   }
 
   public PipeTask build() {
@@ -67,21 +44,26 @@ public class PipeTaskBuilder {
 
     // we first build the collector and connector, then build the processor.
     final PipeTaskCollectorStage collectorStage =
-        new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, 
pipeCollectorParameters);
+        new PipeTaskCollectorStage(
+            dataRegionId,
+            pipeTaskMeta,
+            pipeStaticMeta.getCreationTime(),
+            pipeStaticMeta.getCollectorParameters());
     final PipeTaskConnectorStage connectorStage =
-        new PipeTaskConnectorStage(pipeConnectorParameters, pipeTaskMeta);
+        new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters(), 
pipeTaskMeta);
 
     // the processor connects the collector and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
-            pipeName,
+            pipeStaticMeta.getPipeName(),
             dataRegionId,
             pipeTaskMeta,
             collectorStage.getEventSupplier(),
             collectorStage.getCollectorPendingQueue(),
-            pipeProcessorParameters,
+            pipeStaticMeta.getProcessorParameters(),
             connectorStage.getPipeConnectorPendingQueue());
 
-    return new PipeTask(pipeName, dataRegionId, collectorStage, 
processorStage, connectorStage);
+    return new PipeTask(
+        pipeStaticMeta.getPipeName(), dataRegionId, collectorStage, 
processorStage, connectorStage);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 8146a3a3f28..60488c6d253 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -58,6 +58,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   public PipeTaskCollectorStage(
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta,
+      long creationTime,
       PipeParameters collectorParameters) {
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
@@ -77,7 +78,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
           .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
 
       collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
-      this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, 
collectorPendingQueue);
+      this.pipeCollector =
+          new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, 
collectorPendingQueue);
     } else {
       this.collectorParameters = collectorParameters;
 

Reply via email to