This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rename-collector-to-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6ba8424e5453a272053dafa1e247fbd757208b17 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 24 01:16:29 2023 +0800 pipe rename: collector -> extractor --- .../db/pipe/event/realtime/PipeRealtimeEvent.java | 20 ++++----- .../event/realtime/PipeRealtimeEventFactory.java | 4 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 48 +++++++++++----------- .../PipeRealtimeDataRegionHybridExtractor.java | 36 ++++++++-------- .../PipeRealtimeDataRegionLogExtractor.java | 17 ++++---- .../PipeRealtimeDataRegionTsFileExtractor.java | 16 ++++---- .../listener/PipeInsertionDataNodeListener.java | 12 +++--- .../pipe/task/connection/PipeEventCollector.java | 4 +- .../db/pipe/task/stage/PipeTaskExtractorStage.java | 2 +- 9 files changed, 80 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index e29e979618a..5db7b20bde0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -28,7 +28,7 @@ import org.apache.iotdb.pipe.api.event.Event; import java.util.Map; /** - * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with the information of + * PipeRealtimeEvent is an event that decorates the EnrichedEvent with the information of * TsFileEpoch and schema info. It only exists in the realtime event extractor. */ public class PipeRealtimeEvent extends EnrichedEvent { @@ -43,7 +43,7 @@ public class PipeRealtimeEvent extends EnrichedEvent { TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements, String pattern) { - // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeCollectEvent + // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent // is only used in the realtime event extractor, which does not need to report the progress // of the event, so the pipeTaskMeta is always null. super(null, pattern); @@ -59,7 +59,7 @@ public class PipeRealtimeEvent extends EnrichedEvent { Map<String, String[]> device2Measurements, PipeTaskMeta pipeTaskMeta, String pattern) { - // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeCollectEvent + // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent // is only used in the realtime event extractor, which does not need to report the progress // of the event, so the pipeTaskMeta is always null. super(pipeTaskMeta, pattern); @@ -87,10 +87,10 @@ public class PipeRealtimeEvent extends EnrichedEvent { @Override public boolean increaseReferenceCount(String holderMessage) { - // This method must be overridden, otherwise during the real-time data collection stage, the - // current PipeRealtimeCollectEvent rather than the member variable EnrichedEvent will increase + // This method must be overridden, otherwise during the real-time data extraction stage, the + // current PipeRealtimeEvent rather than the member variable EnrichedEvent will increase // the reference count, resulting in errors in the reference count of the EnrichedEvent - // contained in this PipeRealtimeCollectEvent during the processor and connector stages. + // contained in this PipeRealtimeEvent during the processor and connector stages. return event.increaseReferenceCount(holderMessage); } @@ -101,10 +101,10 @@ public class PipeRealtimeEvent extends EnrichedEvent { @Override public boolean decreaseReferenceCount(String holderMessage) { - // This method must be overridden, otherwise during the real-time data collection stage, the - // current PipeRealtimeCollectEvent rather than the member variable EnrichedEvent will increase + // This method must be overridden, otherwise during the real-time data extraction stage, the + // current PipeRealtimeEvent rather than the member variable EnrichedEvent will increase // the reference count, resulting in errors in the reference count of the EnrichedEvent - // contained in this PipeRealtimeCollectEvent during the processor and connector stages. + // contained in this PipeRealtimeEvent during the processor and connector stages. return event.decreaseReferenceCount(holderMessage); } @@ -131,7 +131,7 @@ public class PipeRealtimeEvent extends EnrichedEvent { @Override public String toString() { - return "PipeRealtimeCollectEvent{" + return "PipeRealtimeEvent{" + "event=" + event + ", tsFileEpoch=" diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 247a8b17daf..6562ae8d9c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -30,12 +30,12 @@ public class PipeRealtimeEventFactory { private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager(); - public static PipeRealtimeEvent createCollectEvent(TsFileResource resource) { + public static PipeRealtimeEvent createRealtimeEvent(TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent( new PipeTsFileInsertionEvent(resource), resource); } - public static PipeRealtimeEvent createCollectEvent( + public static PipeRealtimeEvent createRealtimeEvent( WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( new PipeInsertNodeTabletInsertionEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index c59ba2a0ec3..f846e6734b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -62,10 +62,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private String pattern; - private long historicalDataCollectionStartTime; // event time - private long historicalDataCollectionEndTime; // event time + private long historicalDataExtractionStartTime; // event time + private long historicalDataExtractionEndTime; // event time - private long historicalDataCollectionTimeLowerBound; // arrival time + private long historicalDataExtractionTimeLowerBound; // arrival time private Queue<PipeTsFileInsertionEvent> pendingQueue; @@ -88,50 +88,50 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE); // user may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without - // enabling the historical data collection, which may affect the realtime data collection. + // enabling the historical data extraction, which may affect the realtime data extraction. final boolean isHistoricalExtractorEnabledByUser = parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true); - historicalDataCollectionStartTime = + historicalDataExtractionStartTime = isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME) ? DateTimeUtils.convertDatetimeStrToLong( parameters.getString(EXTRACTOR_HISTORY_START_TIME), ZoneId.systemDefault()) : Long.MIN_VALUE; - historicalDataCollectionEndTime = + historicalDataExtractionEndTime = isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME) ? DateTimeUtils.convertDatetimeStrToLong( parameters.getString(EXTRACTOR_HISTORY_END_TIME), ZoneId.systemDefault()) : Long.MAX_VALUE; // enable historical extractor by default - historicalDataCollectionTimeLowerBound = + historicalDataExtractionTimeLowerBound = parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true) ? Long.MIN_VALUE // We define the realtime data as the data generated after the creation time // of the pipe from user's perspective. But we still need to use // PipeHistoricalDataRegionExtractor to extract the realtime data generated between the // creation time of the pipe and the time when the pipe starts, because those data - // can not be listened by PipeRealtimeDataRegionExtractor, and should be collected by + // can not be listened by PipeRealtimeDataRegionExtractor, and should be extracted by // PipeHistoricalDataRegionExtractor from implementation perspective. : environment.getCreationTime(); // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode. - // realtime only mode -> (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) + // realtime only mode -> (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) // - // Ensure that all data in the data region is flushed to disk before collecting data. + // Ensure that all data in the data region is flushed to disk before extracting data. // This ensures the generation time of all newly generated TsFiles (realtime data) after the // invocation of flushDataRegionAllTsFiles() is later than the creationTime of the pipe - // (historicalDataCollectionTimeLowerBound). + // (historicalDataExtractionTimeLowerBound). // // Note that: the generation time of the TsFile is the time when the TsFile is created, not // the time when the data is flushed to the TsFile. // // Then we can use the generation time of the TsFile to determine whether the data in the - // TsFile should be collected by comparing the generation time of the TsFile with the - // historicalDataCollectionTimeLowerBound when starting the pipe in realtime only mode. + // TsFile should be extracted by comparing the generation time of the TsFile with the + // historicalDataExtractionTimeLowerBound when starting the pipe in realtime only mode. // // If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated // between the creation time of the pipe the time when the pipe starts will be lost. - if (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) { + if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) { flushDataRegionAllTsFiles(); } } @@ -174,15 +174,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( resource -> new PipeTsFileInsertionEvent( resource, pipeTaskMeta, pattern, - historicalDataCollectionStartTime, - historicalDataCollectionEndTime)) + historicalDataExtractionStartTime, + historicalDataExtractionEndTime)) .collect(Collectors.toList())); pendingQueue.addAll( tsFileManager.getTsFileList(false).stream() @@ -190,15 +190,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( resource -> new PipeTsFileInsertionEvent( resource, pipeTaskMeta, pattern, - historicalDataCollectionStartTime, - historicalDataCollectionEndTime)) + historicalDataExtractionStartTime, + historicalDataExtractionEndTime)) .collect(Collectors.toList())); pendingQueue.forEach( event -> @@ -213,13 +213,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa } private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) { - return !(resource.getFileEndTime() < historicalDataCollectionStartTime - || historicalDataCollectionEndTime < resource.getFileStartTime()); + return !(resource.getFileEndTime() < historicalDataExtractionStartTime + || historicalDataExtractionEndTime < resource.getFileStartTime()); } - private boolean isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) { + private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource resource) { try { - return historicalDataCollectionTimeLowerBound + return historicalDataExtractionTimeLowerBound <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime(); } catch (IOException e) { LOGGER.warn( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 06a0135a4ee..b143fd09f5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -37,7 +37,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class); - // This queue is used to store pending events collected by the method extract(). The method + // This queue is used to store pending events extracted by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -47,17 +47,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio @Override public void extract(PipeRealtimeEvent event) { - final Event eventToCollect = event.getEvent(); + final Event eventToExtract = event.getEvent(); - if (eventToCollect instanceof TabletInsertionEvent) { - collectTabletInsertion(event); - } else if (eventToCollect instanceof TsFileInsertionEvent) { - collectTsFileInsertion(event); + if (eventToExtract instanceof TabletInsertionEvent) { + extractTabletInsertion(event); + } else if (eventToExtract instanceof TsFileInsertionEvent) { + extractTsFileInsertion(event); } else { throw new UnsupportedOperationException( String.format( "Unsupported event type %s for hybrid realtime extractor %s", - eventToCollect.getClass(), this)); + eventToExtract.getClass(), this)); } } @@ -71,7 +71,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return true; } - private void collectTabletInsertion(PipeRealtimeEvent event) { + private void extractTabletInsertion(PipeRealtimeEvent event) { if (isApproachingCapacity()) { event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); // if the pending queue is approaching capacity, we should not extract any more tablet events. @@ -83,7 +83,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE) && !pendingQueue.offer(event)) { LOGGER.warn( - "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet event {}, current state {}", + "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -92,7 +92,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } } - private void collectTsFileInsertion(PipeRealtimeEvent event) { + private void extractTsFileInsertion(PipeRealtimeEvent event) { event .getTsFileEpoch() .migrateState( @@ -102,7 +102,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( - "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile event {}, current state {}", + "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -118,17 +118,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio @Override public Event supply() { - PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); - while (collectEvent != null) { + while (realtimeEvent != null) { Event suppliedEvent; // used to judge type of event, not directly for supplying. - final Event eventToSupply = collectEvent.getEvent(); + final Event eventToSupply = realtimeEvent.getEvent(); if (eventToSupply instanceof TabletInsertionEvent) { - suppliedEvent = supplyTabletInsertion(collectEvent); + suppliedEvent = supplyTabletInsertion(realtimeEvent); } else if (eventToSupply instanceof TsFileInsertionEvent) { - suppliedEvent = supplyTsFileInsertion(collectEvent); + suppliedEvent = supplyTsFileInsertion(realtimeEvent); } else { throw new UnsupportedOperationException( String.format( @@ -136,12 +136,12 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio eventToSupply.getClass(), this)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } - collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index 641150826ee..320c865d96d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class); - // This queue is used to store pending events collected by the method extract(). The method + // This queue is used to store pending events extracted by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -74,13 +74,14 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx @Override public Event supply() { - PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); - while (collectEvent != null) { + while (realtimeEvent != null) { Event suppliedEvent = null; - if (collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName())) { - suppliedEvent = collectEvent.getEvent(); + if (realtimeEvent.increaseReferenceCount( + PipeRealtimeDataRegionLogExtractor.class.getName())) { + suppliedEvent = realtimeEvent.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by // this event is not reliable anymore. the data has been lost. we simply discard this event @@ -89,17 +90,17 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx String.format( "Tablet Event %s can not be supplied because the reference count can not be increased, " + "the data represented by this event is lost", - collectEvent.getEvent()); + realtimeEvent.getEvent()); LOGGER.warn(errorMessage); PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } - collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index da90c60e53f..b35d977b9db 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class); - // This queue is used to store pending events collected by the method extract(). The method + // This queue is used to store pending events extracted by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -74,14 +74,14 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio @Override public Event supply() { - PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); - while (collectEvent != null) { + while (realtimeEvent != null) { Event suppliedEvent = null; - if (collectEvent.increaseReferenceCount( + if (realtimeEvent.increaseReferenceCount( PipeRealtimeDataRegionTsFileExtractor.class.getName())) { - suppliedEvent = collectEvent.getEvent(); + suppliedEvent = realtimeEvent.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by // this event is not reliable anymore. the data has been lost. we simply discard this event @@ -90,17 +90,17 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio String.format( "TsFile Event %s can not be supplied because the reference count can not be increased, " + "the data represented by this event is lost", - collectEvent.getEvent()); + realtimeEvent.getEvent()); LOGGER.warn(errorMessage); PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } - collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); + realtimeEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java index 50dea414a78..29099e883fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java @@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger; * * <p>It is used to listen to events from storage engine and publish them to pipe engine. * - * <p>2 kinds of events are collected: 1. level-0 tsfile sealed event 2. insertion operation event + * <p>2 kinds of events are extracted: 1. level-0 tsfile sealed event 2. insertion operation event * - * <p>All events collected by this listener will be first published to different + * <p>All events extracted by this listener will be first published to different * PipeEventDataRegionAssigners (identified by data region id), and then PipeEventDataRegionAssigner * will filter events and assign them to different PipeRealtimeEventDataRegionExtractors. */ @@ -101,12 +101,12 @@ public class PipeInsertionDataNodeListener { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be collected + // only events from registered data region will be extracted if (assigner == null) { return; } - assigner.publishToAssign(PipeRealtimeEventFactory.createCollectEvent(tsFileResource)); + assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource)); } public void listenToInsertNode( @@ -120,13 +120,13 @@ public class PipeInsertionDataNodeListener { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be collected + // only events from registered data region will be extracted if (assigner == null) { return; } assigner.publishToAssign( - PipeRealtimeEventFactory.createCollectEvent(walEntryHandler, insertNode, tsFileResource)); + PipeRealtimeEventFactory.createRealtimeEvent(walEntryHandler, insertNode, tsFileResource)); } /////////////////////////////// singleton /////////////////////////////// diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index 4da053d9526..498fc5aecfa 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -32,9 +32,9 @@ public class PipeEventCollector implements EventCollector { // buffer queue is used to store events that are not offered to pending queue // because the pending queue is full. when pending queue is full, pending queue - // will notify tasks to stop collecting events, and buffer queue will be used to store + // will notify tasks to stop extracting events, and buffer queue will be used to store // events before tasks are stopped. when pending queue is not full and tasks are - // notified by the pending queue to start collecting events, buffer queue will be used to store + // notified by the pending queue to start extracting events, buffer queue will be used to store // events before events in buffer queue are offered to pending queue. private final Queue<Event> bufferQueue; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java index c2bf2ea2704..0b86fbccacc 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java @@ -85,7 +85,7 @@ public class PipeTaskExtractorStage extends PipeTaskStage { @Override public void stopSubtask() throws PipeException { - // extractor continuously collects data, so do nothing in stop + // extractor continuously extracts data, so do nothing in stop } @Override
