This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rename-collector-to-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6ba8424e5453a272053dafa1e247fbd757208b17
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 24 01:16:29 2023 +0800

    pipe rename: collector -> extractor
---
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  | 20 ++++-----
 .../event/realtime/PipeRealtimeEventFactory.java   |  4 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 48 +++++++++++-----------
 .../PipeRealtimeDataRegionHybridExtractor.java     | 36 ++++++++--------
 .../PipeRealtimeDataRegionLogExtractor.java        | 17 ++++----
 .../PipeRealtimeDataRegionTsFileExtractor.java     | 16 ++++----
 .../listener/PipeInsertionDataNodeListener.java    | 12 +++---
 .../pipe/task/connection/PipeEventCollector.java   |  4 +-
 .../db/pipe/task/stage/PipeTaskExtractorStage.java |  2 +-
 9 files changed, 80 insertions(+), 79 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index e29e979618a..5db7b20bde0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import java.util.Map;
 
 /**
- * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with 
the information of
+ * PipeRealtimeEvent is an event that decorates the EnrichedEvent with the 
information of
  * TsFileEpoch and schema info. It only exists in the realtime event extractor.
  */
 public class PipeRealtimeEvent extends EnrichedEvent {
@@ -43,7 +43,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
       String pattern) {
-    // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
+    // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
     // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
     super(null, pattern);
@@ -59,7 +59,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       Map<String, String[]> device2Measurements,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
-    // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
+    // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
     // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
     super(pipeTaskMeta, pattern);
@@ -87,10 +87,10 @@ public class PipeRealtimeEvent extends EnrichedEvent {
 
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
-    // This method must be overridden, otherwise during the real-time data 
collection stage, the
-    // current PipeRealtimeCollectEvent rather than the member variable 
EnrichedEvent will increase
+    // This method must be overridden, otherwise during the real-time data 
extraction stage, the
+    // current PipeRealtimeEvent rather than the member variable EnrichedEvent 
will increase
     // the reference count, resulting in errors in the reference count of the 
EnrichedEvent
-    // contained in this PipeRealtimeCollectEvent during the processor and 
connector stages.
+    // contained in this PipeRealtimeEvent during the processor and connector 
stages.
     return event.increaseReferenceCount(holderMessage);
   }
 
@@ -101,10 +101,10 @@ public class PipeRealtimeEvent extends EnrichedEvent {
 
   @Override
   public boolean decreaseReferenceCount(String holderMessage) {
-    // This method must be overridden, otherwise during the real-time data 
collection stage, the
-    // current PipeRealtimeCollectEvent rather than the member variable 
EnrichedEvent will increase
+    // This method must be overridden, otherwise during the real-time data 
extraction stage, the
+    // current PipeRealtimeEvent rather than the member variable EnrichedEvent 
will increase
     // the reference count, resulting in errors in the reference count of the 
EnrichedEvent
-    // contained in this PipeRealtimeCollectEvent during the processor and 
connector stages.
+    // contained in this PipeRealtimeEvent during the processor and connector 
stages.
     return event.decreaseReferenceCount(holderMessage);
   }
 
@@ -131,7 +131,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
 
   @Override
   public String toString() {
-    return "PipeRealtimeCollectEvent{"
+    return "PipeRealtimeEvent{"
         + "event="
         + event
         + ", tsFileEpoch="
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 247a8b17daf..6562ae8d9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -30,12 +30,12 @@ public class PipeRealtimeEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
-  public static PipeRealtimeEvent createCollectEvent(TsFileResource resource) {
+  public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource) 
{
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
         new PipeTsFileInsertionEvent(resource), resource);
   }
 
-  public static PipeRealtimeEvent createCollectEvent(
+  public static PipeRealtimeEvent createRealtimeEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         new PipeInsertNodeTabletInsertionEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index c59ba2a0ec3..f846e6734b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -62,10 +62,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   private String pattern;
 
-  private long historicalDataCollectionStartTime; // event time
-  private long historicalDataCollectionEndTime; // event time
+  private long historicalDataExtractionStartTime; // event time
+  private long historicalDataExtractionEndTime; // event time
 
-  private long historicalDataCollectionTimeLowerBound; // arrival time
+  private long historicalDataExtractionTimeLowerBound; // arrival time
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
@@ -88,50 +88,50 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
 
     // user may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
-    // enabling the historical data collection, which may affect the realtime 
data collection.
+    // enabling the historical data extraction, which may affect the realtime 
data extraction.
     final boolean isHistoricalExtractorEnabledByUser =
         parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true);
-    historicalDataCollectionStartTime =
+    historicalDataExtractionStartTime =
         isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(EXTRACTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
             : Long.MIN_VALUE;
-    historicalDataCollectionEndTime =
+    historicalDataExtractionEndTime =
         isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(EXTRACTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
 
     // enable historical extractor by default
-    historicalDataCollectionTimeLowerBound =
+    historicalDataExtractionTimeLowerBound =
         parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true)
             ? Long.MIN_VALUE
             // We define the realtime data as the data generated after the 
creation time
             // of the pipe from user's perspective. But we still need to use
             // PipeHistoricalDataRegionExtractor to extract the realtime data 
generated between the
             // creation time of the pipe and the time when the pipe starts, 
because those data
-            // can not be listened by PipeRealtimeDataRegionExtractor, and 
should be collected by
+            // can not be listened by PipeRealtimeDataRegionExtractor, and 
should be extracted by
             // PipeHistoricalDataRegionExtractor from implementation 
perspective.
             : environment.getCreationTime();
 
     // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the 
realtime only mode.
-    // realtime only mode -> (historicalDataCollectionTimeLowerBound != 
Long.MIN_VALUE)
+    // realtime only mode -> (historicalDataExtractionTimeLowerBound != 
Long.MIN_VALUE)
     //
-    // Ensure that all data in the data region is flushed to disk before 
collecting data.
+    // Ensure that all data in the data region is flushed to disk before 
extracting data.
     // This ensures the generation time of all newly generated TsFiles 
(realtime data) after the
     // invocation of flushDataRegionAllTsFiles() is later than the 
creationTime of the pipe
-    // (historicalDataCollectionTimeLowerBound).
+    // (historicalDataExtractionTimeLowerBound).
     //
     // Note that: the generation time of the TsFile is the time when the 
TsFile is created, not
     // the time when the data is flushed to the TsFile.
     //
     // Then we can use the generation time of the TsFile to determine whether 
the data in the
-    // TsFile should be collected by comparing the generation time of the 
TsFile with the
-    // historicalDataCollectionTimeLowerBound when starting the pipe in 
realtime only mode.
+    // TsFile should be extracted by comparing the generation time of the 
TsFile with the
+    // historicalDataExtractionTimeLowerBound when starting the pipe in 
realtime only mode.
     //
     // If we don't invoke flushDataRegionAllTsFiles() in the realtime only 
mode, the data generated
     // between the creation time of the pipe the time when the pipe starts 
will be lost.
-    if (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) {
+    if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
       flushDataRegionAllTsFiles();
     }
   }
@@ -174,15 +174,15 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                            && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
+                            && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
                             pipeTaskMeta,
                             pattern,
-                            historicalDataCollectionStartTime,
-                            historicalDataCollectionEndTime))
+                            historicalDataExtractionStartTime,
+                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
@@ -190,15 +190,15 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                     resource ->
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                            && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
+                            && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
                     resource ->
                         new PipeTsFileInsertionEvent(
                             resource,
                             pipeTaskMeta,
                             pattern,
-                            historicalDataCollectionStartTime,
-                            historicalDataCollectionEndTime))
+                            historicalDataExtractionStartTime,
+                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
             event ->
@@ -213,13 +213,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
-    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
-        || historicalDataCollectionEndTime < resource.getFileStartTime());
+    return !(resource.getFileEndTime() < historicalDataExtractionStartTime
+        || historicalDataExtractionEndTime < resource.getFileStartTime());
   }
 
-  private boolean 
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
+  private boolean 
isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource resource) {
     try {
-      return historicalDataCollectionTimeLowerBound
+      return historicalDataExtractionTimeLowerBound
           <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
     } catch (IOException e) {
       LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 06a0135a4ee..b143fd09f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -37,7 +37,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
extract(). The method
+  // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -47,17 +47,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
   @Override
   public void extract(PipeRealtimeEvent event) {
-    final Event eventToCollect = event.getEvent();
+    final Event eventToExtract = event.getEvent();
 
-    if (eventToCollect instanceof TabletInsertionEvent) {
-      collectTabletInsertion(event);
-    } else if (eventToCollect instanceof TsFileInsertionEvent) {
-      collectTsFileInsertion(event);
+    if (eventToExtract instanceof TabletInsertionEvent) {
+      extractTabletInsertion(event);
+    } else if (eventToExtract instanceof TsFileInsertionEvent) {
+      extractTsFileInsertion(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
               "Unsupported event type %s for hybrid realtime extractor %s",
-              eventToCollect.getClass(), this));
+              eventToExtract.getClass(), this));
     }
   }
 
@@ -71,7 +71,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return true;
   }
 
-  private void collectTabletInsertion(PipeRealtimeEvent event) {
+  private void extractTabletInsertion(PipeRealtimeEvent event) {
     if (isApproachingCapacity()) {
       event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
       // if the pending queue is approaching capacity, we should not extract 
any more tablet events.
@@ -83,7 +83,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
         && !pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collectTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet 
event {}, current state {}",
+          "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet 
event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -92,7 +92,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     }
   }
 
-  private void collectTsFileInsertion(PipeRealtimeEvent event) {
+  private void extractTsFileInsertion(PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
@@ -102,7 +102,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collectTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile 
event {}, current state {}",
+          "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile 
event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -118,17 +118,17 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+    PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
-    while (collectEvent != null) {
+    while (realtimeEvent != null) {
       Event suppliedEvent;
 
       // used to judge type of event, not directly for supplying.
-      final Event eventToSupply = collectEvent.getEvent();
+      final Event eventToSupply = realtimeEvent.getEvent();
       if (eventToSupply instanceof TabletInsertionEvent) {
-        suppliedEvent = supplyTabletInsertion(collectEvent);
+        suppliedEvent = supplyTabletInsertion(realtimeEvent);
       } else if (eventToSupply instanceof TsFileInsertionEvent) {
-        suppliedEvent = supplyTsFileInsertion(collectEvent);
+        suppliedEvent = supplyTsFileInsertion(realtimeEvent);
       } else {
         throw new UnsupportedOperationException(
             String.format(
@@ -136,12 +136,12 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
                 eventToSupply.getClass(), this));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+      realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 641150826ee..320c865d96d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
extract(). The method
+  // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -74,13 +74,14 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
   @Override
   public Event supply() {
-    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+    PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
-    while (collectEvent != null) {
+    while (realtimeEvent != null) {
       Event suppliedEvent = null;
 
-      if 
(collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()))
 {
-        suppliedEvent = collectEvent.getEvent();
+      if (realtimeEvent.increaseReferenceCount(
+          PipeRealtimeDataRegionLogExtractor.class.getName())) {
+        suppliedEvent = realtimeEvent.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
         // this event is not reliable anymore. the data has been lost. we 
simply discard this event
@@ -89,17 +90,17 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
             String.format(
                 "Tablet Event %s can not be supplied because the reference 
count can not be increased, "
                     + "the data represented by this event is lost",
-                collectEvent.getEvent());
+                realtimeEvent.getEvent());
         LOGGER.warn(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+      realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index da90c60e53f..b35d977b9db 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
extract(). The method
+  // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -74,14 +74,14 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+    PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
-    while (collectEvent != null) {
+    while (realtimeEvent != null) {
       Event suppliedEvent = null;
 
-      if (collectEvent.increaseReferenceCount(
+      if (realtimeEvent.increaseReferenceCount(
           PipeRealtimeDataRegionTsFileExtractor.class.getName())) {
-        suppliedEvent = collectEvent.getEvent();
+        suppliedEvent = realtimeEvent.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
         // this event is not reliable anymore. the data has been lost. we 
simply discard this event
@@ -90,17 +90,17 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
             String.format(
                 "TsFile Event %s can not be supplied because the reference 
count can not be increased, "
                     + "the data represented by this event is lost",
-                collectEvent.getEvent());
+                realtimeEvent.getEvent());
         LOGGER.warn(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
+      realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 50dea414a78..29099e883fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>It is used to listen to events from storage engine and publish them to 
pipe engine.
  *
- * <p>2 kinds of events are collected: 1. level-0 tsfile sealed event 2. 
insertion operation event
+ * <p>2 kinds of events are extracted: 1. level-0 tsfile sealed event 2. 
insertion operation event
  *
- * <p>All events collected by this listener will be first published to 
different
+ * <p>All events extracted by this listener will be first published to 
different
  * PipeEventDataRegionAssigners (identified by data region id), and then 
PipeEventDataRegionAssigner
  * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
  */
@@ -101,12 +101,12 @@ public class PipeInsertionDataNodeListener {
 
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be collected
+    // only events from registered data region will be extracted
     if (assigner == null) {
       return;
     }
 
-    
assigner.publishToAssign(PipeRealtimeEventFactory.createCollectEvent(tsFileResource));
+    
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource));
   }
 
   public void listenToInsertNode(
@@ -120,13 +120,13 @@ public class PipeInsertionDataNodeListener {
 
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
-    // only events from registered data region will be collected
+    // only events from registered data region will be extracted
     if (assigner == null) {
       return;
     }
 
     assigner.publishToAssign(
-        PipeRealtimeEventFactory.createCollectEvent(walEntryHandler, 
insertNode, tsFileResource));
+        PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, 
insertNode, tsFileResource));
   }
 
   /////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 4da053d9526..498fc5aecfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -32,9 +32,9 @@ public class PipeEventCollector implements EventCollector {
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending 
queue
-  // will notify tasks to stop collecting events, and buffer queue will be 
used to store
+  // will notify tasks to stop extracting events, and buffer queue will be 
used to store
   // events before tasks are stopped. when pending queue is not full and tasks 
are
-  // notified by the pending queue to start collecting events, buffer queue 
will be used to store
+  // notified by the pending queue to start extracting events, buffer queue 
will be used to store
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index c2bf2ea2704..0b86fbccacc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -85,7 +85,7 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
 
   @Override
   public void stopSubtask() throws PipeException {
-    // extractor continuously collects data, so do nothing in stop
+    // extractor continuously extracts data, so do nothing in stop
   }
 
   @Override

Reply via email to