This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rename-collector-to-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4f60b69d27e2257baf98b0f9759ad9780119acc2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 23 23:14:40 2023 +0800

    pipe rename: collector -> extractor
---
 .../response/pipe/task/PipeTableResp.java          |   2 +-
 .../persistence/pipe/PipePluginInfo.java           |   2 +-
 .../commons/pipe/task/meta/PipeStaticMeta.java     |   2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   6 +-
 ...torConstant.java => PipeExtractorConstant.java} |  22 ++--
 ...va => PipeTaskExtractorRuntimeEnvironment.java} |   4 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   2 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   4 +-
 ...imeCollectEvent.java => PipeRealtimeEvent.java} |  18 +--
 ...tFactory.java => PipeRealtimeEventFactory.java} |  10 +-
 .../IoTDBDataRegionExtractor.java                  | 138 ++++++++++-----------
 .../PipeHistoricalDataRegionExtractor.java         |   2 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  52 ++++----
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  20 +--
 .../PipeRealtimeDataRegionFakeExtractor.java       |   8 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  32 ++---
 .../PipeRealtimeDataRegionLogExtractor.java        |  16 +--
 .../PipeRealtimeDataRegionTsFileExtractor.java     |  16 +--
 .../realtime/assigner/DisruptorQueue.java          |   2 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  42 +++----
 .../realtime/epoch/TsFileEpoch.java                |  24 ++--
 .../realtime/epoch/TsFileEpochManager.java         |  12 +-
 .../realtime/epoch/TsFileEpochStateMigrator.java   |   2 +-
 .../listener/PipeInsertionDataNodeListener.java    |  51 ++++----
 .../matcher/CachedSchemaPatternMatcher.java        | 100 +++++++--------
 .../realtime/matcher/PipeDataRegionMatcher.java    |  28 ++---
 .../db/pipe/processor/PipeDoNothingProcessor.java  |   4 +-
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |  14 +--
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  18 +--
 .../db/pipe/task/connection/EventSupplier.java     |   4 +-
 ...ectorStage.java => PipeTaskExtractorStage.java} |  30 ++---
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   2 +-
 .../CachedSchemaPatternMatcherTest.java            |  34 ++---
 .../PipeRealtimeCollectTest.java                   |  26 ++--
 35 files changed, 375 insertions(+), 376 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 2b5bb5e6894..d5edeb49674 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -116,7 +116,7 @@ public class PipeTableResp implements DataSet {
               staticMeta.getPipeName(),
               staticMeta.getCreationTime(),
               runtimeMeta.getStatus().get().name(),
-              staticMeta.getCollectorParameters().toString(),
+              staticMeta.getExtractorParameters().toString(),
               staticMeta.getProcessorParameters().toString(),
               staticMeta.getConnectorParameters().toString(),
               exceptionMessageBuilder.toString()));
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 7a24618d6a7..4c6466cdf40 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -126,7 +126,7 @@ public class PipePluginInfo implements SnapshotProcessor {
         new PipeParameters(createPipeRequest.getCollectorAttributes());
     final String collectorPluginName =
         collectorParameters.getStringOrDefault(
-            PipeExtractorConstant.COLLECTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
+            PipeExtractorConstant.EXTRACTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
       final String exceptionMessage =
           String.format(
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 09a43c4ef76..1024b866dda 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -63,7 +63,7 @@ public class PipeStaticMeta {
     return creationTime;
   }
 
-  public PipeParameters getCollectorParameters() {
+  public PipeParameters getExtractorParameters() {
     return collectorParameters;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 26b32a1be61..8d499872195 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -53,7 +53,7 @@ import org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 7aa7c3d5449..0074e795134 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -195,11 +195,11 @@ public class PipePluginAgent {
     }
   }
 
-  public PipeExtractor reflectCollector(PipeParameters collectorParameters) {
+  public PipeExtractor reflectExtractor(PipeParameters extractorParameters) {
     return (PipeExtractor)
         reflect(
-            collectorParameters.getStringOrDefault(
-                PipeExtractorConstant.COLLECTOR_KEY,
+            extractorParameters.getStringOrDefault(
+                PipeExtractorConstant.EXTRACTOR_KEY,
                 BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
similarity index 59%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index 5c3f660f8e6..6b003860352 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -21,20 +21,20 @@ package org.apache.iotdb.db.pipe.config.constant;
 
 public class PipeExtractorConstant {
 
-  public static final String COLLECTOR_KEY = "collector";
+  public static final String EXTRACTOR_KEY = "extractor";
 
-  public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
-  public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
+  public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
+  public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
 
-  public static final String COLLECTOR_HISTORY_ENABLE_KEY = 
"collector.history.enable";
-  public static final String COLLECTOR_HISTORY_START_TIME = 
"collector.history.start-time";
-  public static final String COLLECTOR_HISTORY_END_TIME = 
"collector.history.end-time";
+  public static final String EXTRACTOR_HISTORY_ENABLE_KEY = 
"extractor.history.enable";
+  public static final String EXTRACTOR_HISTORY_START_TIME = 
"extractor.history.start-time";
+  public static final String EXTRACTOR_HISTORY_END_TIME = 
"extractor.history.end-time";
 
-  public static final String COLLECTOR_REALTIME_ENABLE = 
"collector.realtime.enable";
-  public static final String COLLECTOR_REALTIME_MODE = 
"collector.realtime.mode";
-  public static final String COLLECTOR_REALTIME_MODE_HYBRID = "hybrid";
-  public static final String COLLECTOR_REALTIME_MODE_FILE = "file";
-  public static final String COLLECTOR_REALTIME_MODE_LOG = "log";
+  public static final String EXTRACTOR_REALTIME_ENABLE = 
"extractor.realtime.enable";
+  public static final String EXTRACTOR_REALTIME_MODE = 
"extractor.realtime.mode";
+  public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid";
+  public static final String EXTRACTOR_REALTIME_MODE_FILE = "file";
+  public static final String EXTRACTOR_REALTIME_MODE_LOG = "log";
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
similarity index 92%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
index 530c977b9eb..a726637d5b0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.config.plugin.env;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 
-public class PipeTaskCollectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+public class PipeTaskExtractorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
 
   private final int regionId;
 
   private final PipeTaskMeta pipeTaskMeta;
 
-  public PipeTaskCollectorRuntimeEnvironment(
+  public PipeTaskExtractorRuntimeEnvironment(
       String pipeName, long creationTime, int regionId, PipeTaskMeta 
pipeTaskMeta) {
     super(pipeName, creationTime);
     this.regionId = regionId;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 4f70f17674c..ddd0c78877c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -125,7 +125,7 @@ public abstract class EnrichedEvent implements Event {
    * @return the pattern
    */
   public final String getPattern() {
-    return pattern == null ? 
PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
+    return pattern == null ? 
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 24f52d0ea87..b769be62237 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -47,7 +47,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   public String getPattern() {
-    return pattern == null ? 
PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
+    return pattern == null ? 
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -78,7 +78,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
     final String notNullPattern = getPattern();
 
     // if notNullPattern is "root", we don't need to convert, just return the 
original tablet
-    if 
(notNullPattern.equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+    if 
(notNullPattern.equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE)) {
       return tablet;
     }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
similarity index 90%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index cb84d0bc11a..e29e979618a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -21,30 +21,30 @@ package org.apache.iotdb.db.pipe.event.realtime;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 
 /**
  * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with 
the information of
- * TsFileEpoch and schema info. It only exists in the realtime event collector.
+ * TsFileEpoch and schema info. It only exists in the realtime event extractor.
  */
-public class PipeRealtimeCollectEvent extends EnrichedEvent {
+public class PipeRealtimeEvent extends EnrichedEvent {
 
   private final EnrichedEvent event;
   private final TsFileEpoch tsFileEpoch;
 
   private Map<String, String[]> device2Measurements;
 
-  public PipeRealtimeCollectEvent(
+  public PipeRealtimeEvent(
       EnrichedEvent event,
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
       String pattern) {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
-    // is only used in the realtime event collector, which does not need to 
report the progress
+    // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
     super(null, pattern);
 
@@ -53,14 +53,14 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
     this.device2Measurements = device2Measurements;
   }
 
-  public PipeRealtimeCollectEvent(
+  public PipeRealtimeEvent(
       EnrichedEvent event,
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
-    // is only used in the realtime event collector, which does not need to 
report the progress
+    // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
     super(pipeTaskMeta, pattern);
 
@@ -119,9 +119,9 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
   }
 
   @Override
-  public PipeRealtimeCollectEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+  public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeRealtimeCollectEvent(
+    return new PipeRealtimeEvent(
         
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, 
pattern),
         this.tsFileEpoch,
         this.device2Measurements,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
similarity index 85%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 2b2be4abc21..247a8b17daf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -21,21 +21,21 @@ package org.apache.iotdb.db.pipe.event.realtime;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpochManager;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpochManager;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
-public class PipeRealtimeCollectEventFactory {
+public class PipeRealtimeEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
-  public static PipeRealtimeCollectEvent createCollectEvent(TsFileResource 
resource) {
+  public static PipeRealtimeEvent createCollectEvent(TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
         new PipeTsFileInsertionEvent(resource), resource);
   }
 
-  public static PipeRealtimeCollectEvent createCollectEvent(
+  public static PipeRealtimeEvent createCollectEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         new PipeInsertNodeTabletInsertionEvent(
@@ -44,7 +44,7 @@ public class PipeRealtimeCollectEventFactory {
         resource);
   }
 
-  private PipeRealtimeCollectEventFactory() {
+  private PipeRealtimeEventFactory() {
     // factory class, do not instantiate
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
similarity index 58%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 48a34fc923c..d909e9b9493 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -17,18 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector;
+package org.apache.iotdb.db.pipe.extractor;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.db.engine.StorageEngine;
-import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_ENABLE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_FILE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_LOG;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
 
 public class IoTDBDataRegionExtractor implements PipeExtractor {
 
@@ -55,8 +55,8 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   private final AtomicBoolean hasBeenStarted;
 
-  private PipeHistoricalDataRegionExtractor historicalCollector;
-  private PipeRealtimeDataRegionExtractor realtimeCollector;
+  private PipeHistoricalDataRegionExtractor historicalExtractor;
+  private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
   private int dataRegionId;
 
@@ -66,70 +66,70 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    // validate collector.history.enable and collector.realtime.enable
+    // validate extractor.history.enable and extractor.realtime.enable
     validator
         .validateAttributeValueRange(
-            COLLECTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validateAttributeValueRange(
-            COLLECTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
         .validate(
             args -> (boolean) args[0] || (boolean) args[1],
             String.format(
                 "Should not set both %s and %s to false.",
-                COLLECTOR_HISTORY_ENABLE_KEY, COLLECTOR_REALTIME_ENABLE),
-            
validator.getParameters().getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, 
true),
-            
validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true));
+                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
+            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, 
true),
+            
validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
 
-    // validate collector.realtime.mode
-    if 
(validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, 
true)) {
+    // validate extractor.realtime.mode
+    if 
(validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, 
true)) {
       validator.validateAttributeValueRange(
-          COLLECTOR_REALTIME_MODE,
+          EXTRACTOR_REALTIME_MODE,
           true,
-          COLLECTOR_REALTIME_MODE_HYBRID,
-          COLLECTOR_REALTIME_MODE_FILE,
-          COLLECTOR_REALTIME_MODE_LOG);
+          EXTRACTOR_REALTIME_MODE_HYBRID,
+          EXTRACTOR_REALTIME_MODE_FILE,
+          EXTRACTOR_REALTIME_MODE_LOG);
     }
 
-    constructHistoricalCollector();
-    constructRealtimeCollector(validator.getParameters());
+    constructHistoricalExtractor();
+    constructRealtimeExtractor(validator.getParameters());
 
-    historicalCollector.validate(validator);
-    realtimeCollector.validate(validator);
+    historicalExtractor.validate(validator);
+    realtimeExtractor.validate(validator);
   }
 
-  private void constructHistoricalCollector() {
-    // enable historical collector by default
-    historicalCollector = new PipeHistoricalDataRegionTsFileExtractor();
+  private void constructHistoricalExtractor() {
+    // enable historical extractor by default
+    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
   }
 
-  private void constructRealtimeCollector(PipeParameters parameters) {
-    // enable realtime collector by default
-    if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
-      realtimeCollector = new PipeRealtimeDataRegionFakeExtractor();
+  private void constructRealtimeExtractor(PipeParameters parameters) {
+    // enable realtime extractor by default
+    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
+      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
       return;
     }
 
     // use hybrid mode by default
-    if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
-      realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
+    if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
+      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       return;
     }
 
-    switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
-      case COLLECTOR_REALTIME_MODE_FILE:
-        realtimeCollector = new PipeRealtimeDataRegionTsFileExtractor();
+    switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
+      case EXTRACTOR_REALTIME_MODE_FILE:
+        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
         break;
-      case COLLECTOR_REALTIME_MODE_LOG:
-        realtimeCollector = new PipeRealtimeDataRegionLogExtractor();
+      case EXTRACTOR_REALTIME_MODE_LOG:
+        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
         break;
-      case COLLECTOR_REALTIME_MODE_HYBRID:
-        realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
+      case EXTRACTOR_REALTIME_MODE_HYBRID:
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
       default:
-        realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         LOGGER.warn(
-            "Unsupported collector realtime mode: {}, create a hybrid 
collector.",
-            parameters.getString(COLLECTOR_REALTIME_MODE));
+            "Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
+            parameters.getString(EXTRACTOR_REALTIME_MODE));
     }
   }
 
@@ -137,10 +137,10 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
   public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     dataRegionId =
-        ((PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
+        ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
 
-    historicalCollector.customize(parameters, configuration);
-    realtimeCollector.customize(parameters, configuration);
+    historicalExtractor.customize(parameters, configuration);
+    realtimeExtractor.customize(parameters, configuration);
   }
 
   @Override
@@ -153,7 +153,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
     final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
     final DataRegionId dataRegionIdObject = new 
DataRegionId(this.dataRegionId);
     while (true) {
-      // try to start collectors in the data region ...
+      // try to start extractors in the data region ...
       // first try to run if data region exists, then try to run if data 
region does not exist.
       // both conditions fail is not common, which means the data region is 
created during the
       // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
@@ -165,7 +165,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
                         String.format(
                             "Pipe: starting %s", 
IoTDBDataRegionExtractor.class.getName()));
                     try {
-                      
startHistoricalCollectorAndRealtimeCollector(exceptionHolder);
+                      
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
                     } finally {
                       dataRegion.writeUnlock();
                     }
@@ -173,7 +173,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
           || StorageEngine.getInstance()
               .runIfAbsent(
                   dataRegionIdObject,
-                  () -> 
startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) {
+                  () -> 
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
         rethrowExceptionIfAny(exceptionHolder);
         return;
       }
@@ -181,37 +181,37 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
     }
   }
 
-  private void startHistoricalCollectorAndRealtimeCollector(
+  private void startHistoricalExtractorAndRealtimeExtractor(
       AtomicReference<Exception> exceptionHolder) {
     try {
-      historicalCollector.start();
-      realtimeCollector.start();
+      historicalExtractor.start();
+      realtimeExtractor.start();
     } catch (Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
           String.format(
-              "Start historical collector %s and realtime collector %s error.",
-              historicalCollector, realtimeCollector),
+              "Start historical extractor %s and realtime extractor %s error.",
+              historicalExtractor, realtimeExtractor),
           e);
     }
   }
 
   private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
     if (exceptionHolder.get() != null) {
-      throw new PipeException("failed to start collectors.", exceptionHolder.get());
+      throw new PipeException("failed to start extractors.", exceptionHolder.get());
     }
   }
 
   @Override
   public Event supply() throws Exception {
-    return historicalCollector.hasConsumedAll()
-        ? realtimeCollector.supply()
-        : historicalCollector.supply();
+    return historicalExtractor.hasConsumedAll()
+        ? realtimeExtractor.supply()
+        : historicalExtractor.supply();
   }
 
   @Override
   public void close() throws Exception {
-    historicalCollector.close();
-    realtimeCollector.close();
+    historicalExtractor.close();
+    realtimeExtractor.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
index 858ae933cb5..4cba6de3f01 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.historical;
+package org.apache.iotdb.db.pipe.extractor.historical;
 
 import org.apache.iotdb.pipe.api.PipeExtractor;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
similarity index 84%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 2327e63b07a..c59ba2a0ec3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.historical;
+package org.apache.iotdb.db.pipe.extractor.historical;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
@@ -44,11 +44,11 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_END_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_START_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
 
 public class PipeHistoricalDataRegionTsFileExtractor implements 
PipeHistoricalDataRegionExtractor {
 
@@ -77,41 +77,41 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   @Override
   public void customize(
       PipeParameters parameters, PipeExtractorRuntimeConfiguration 
configuration) {
-    final PipeTaskCollectorRuntimeEnvironment environment =
-        (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeTaskExtractorRuntimeEnvironment environment =
+        (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
     pipeTaskMeta = environment.getPipeTaskMeta();
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
 
-    pattern = parameters.getStringOrDefault(COLLECTOR_PATTERN_KEY, 
COLLECTOR_PATTERN_DEFAULT_VALUE);
+    pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
 
-    // user may set the COLLECTOR_HISTORY_START_TIME and 
COLLECTOR_HISTORY_END_TIME without
+    // user may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data collection, which may affect the realtime 
data collection.
-    final boolean isHistoricalCollectorEnabledByUser =
-        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true);
+    final boolean isHistoricalExtractorEnabledByUser =
+        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true);
     historicalDataCollectionStartTime =
-        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
+        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(COLLECTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
+                parameters.getString(EXTRACTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
             : Long.MIN_VALUE;
     historicalDataCollectionEndTime =
-        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
+        isHistoricalExtractorEnabledByUser && 
parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getString(COLLECTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
+                parameters.getString(EXTRACTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
 
-    // enable historical collector by default
+    // enable historical extractor by default
     historicalDataCollectionTimeLowerBound =
-        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
+        parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true)
             ? Long.MIN_VALUE
             // We define the realtime data as the data generated after the creation time
             // of the pipe from user's perspective. But we still need to use
-            // PipeHistoricalDataRegionCollector to collect the realtime data generated between the
+            // PipeHistoricalDataRegionExtractor to extract the realtime data generated between the
             // creation time of the pipe and the time when the pipe starts, because those data
-            // can not be listened by PipeRealtimeDataRegionCollector, and should be collected by
-            // PipeHistoricalDataRegionCollector from implementation perspective.
+            // can not be listened by PipeRealtimeDataRegionExtractor, and should be collected by
+            // PipeHistoricalDataRegionExtractor from implementation perspective.
             : environment.getCreationTime();
 
     // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the 
realtime only mode.
@@ -143,7 +143,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       return;
     }
 
-    dataRegion.writeLock("Pipe: create historical TsFile collector");
+    dataRegion.writeLock("Pipe: create historical TsFile extractor");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
     } finally {
@@ -160,7 +160,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       return;
     }
 
-    dataRegion.writeLock("Pipe: start to collect historical TsFile");
+    dataRegion.writeLock("Pipe: start to extract historical TsFile");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -224,10 +224,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     } catch (IOException e) {
       LOGGER.warn(
           String.format(
-              "failed to get the generation time of TsFile %s, collect it 
anyway",
+              "failed to get the generation time of TsFile %s, extract it 
anyway",
               resource.getTsFilePath()),
           e);
-      // If failed to get the generation time of the TsFile, we will collect 
the data in the TsFile
+      // If failed to get the generation time of the TsFile, we will extract 
the data in the TsFile
       // anyway.
       return true;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
similarity index 81%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index aec400b6d5d..aa0346aace9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime;
+package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
-import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -45,11 +45,11 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       throws Exception {
     pattern =
         parameters.getStringOrDefault(
-            PipeExtractorConstant.COLLECTOR_PATTERN_KEY,
-            PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
+            PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
+            PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
 
-    final PipeTaskCollectorRuntimeEnvironment environment =
-        (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeTaskExtractorRuntimeEnvironment environment =
+        (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
     dataRegionId = String.valueOf(environment.getRegionId());
     pipeTaskMeta = environment.getPipeTaskMeta();
   }
@@ -65,7 +65,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   }
 
   /** @param event the event from the storage engine */
-  public abstract void collect(PipeRealtimeCollectEvent event);
+  public abstract void extract(PipeRealtimeEvent event);
 
   public abstract boolean isNeedListenToTsFile();
 
@@ -81,7 +81,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
   @Override
   public String toString() {
-    return "PipeRealtimeDataRegionCollector{"
+    return "PipeRealtimeDataRegionExtractor{"
         + "pattern='"
         + pattern
         + '\''
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java
similarity index 88%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java
index cffb273cee1..0f04ba475c5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime;
+package org.apache.iotdb.db.pipe.extractor.realtime;
 
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -49,7 +49,7 @@ public class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionE
   }
 
   @Override
-  public void collect(PipeRealtimeCollectEvent event) {
+  public void extract(PipeRealtimeEvent event) {
     // do nothing
   }
 
@@ -70,6 +70,6 @@ public class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionE
 
   @Override
   public String toString() {
-    return "PipeRealtimeDataRegionFakeCollector{}";
+    return "PipeRealtimeDataRegionFakeExtractor{}";
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index a78ad713feb..06a0135a4ee 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime;
+package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -37,7 +37,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
collect(). The method
+  // This queue is used to store pending events collected by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   @Override
-  public void collect(PipeRealtimeCollectEvent event) {
+  public void extract(PipeRealtimeEvent event) {
     final Event eventToCollect = event.getEvent();
 
     if (eventToCollect instanceof TabletInsertionEvent) {
@@ -56,7 +56,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     } else {
       throw new UnsupportedOperationException(
           String.format(
-              "Unsupported event type %s for Hybrid Realtime Collector %s",
+              "Unsupported event type %s for hybrid realtime extractor %s",
               eventToCollect.getClass(), this));
     }
   }
@@ -71,10 +71,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return true;
   }
 
-  private void collectTabletInsertion(PipeRealtimeCollectEvent event) {
+  private void collectTabletInsertion(PipeRealtimeEvent event) {
     if (isApproachingCapacity()) {
       event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-      // if the pending queue is approaching capacity, we should not collect 
any more tablet events.
+      // if the pending queue is approaching capacity, we should not extract 
any more tablet events.
       // all the data represented by the tablet events should be carried by 
the following tsfile
       // event.
       return;
@@ -83,7 +83,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
         && !pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard tablet event {}, current state {}",
+          "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -92,7 +92,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     }
   }
 
-  private void collectTsFileInsertion(PipeRealtimeCollectEvent event) {
+  private void collectTsFileInsertion(PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
@@ -102,7 +102,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard TsFile event {}, current state {}",
+          "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -118,7 +118,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) 
pendingQueue.poll();
+    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
     while (collectEvent != null) {
       Event suppliedEvent;
@@ -132,7 +132,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       } else {
         throw new UnsupportedOperationException(
             String.format(
-                "Unsupported event type %s for Hybrid Realtime Collector %s to 
supply.",
+                "Unsupported event type %s for hybrid realtime extractor %s to 
supply.",
                 eventToSupply.getClass(), this));
       }
 
@@ -141,14 +141,14 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
     return null;
   }
 
-  private Event supplyTabletInsertion(PipeRealtimeCollectEvent event) {
+  private Event supplyTabletInsertion(PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
@@ -172,7 +172,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return null;
   }
 
-  private Event supplyTsFileInsertion(PipeRealtimeCollectEvent event) {
+  private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
similarity index 86%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index b50e48536c2..641150826ee 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime;
+package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
collect(). The method
+  // This queue is used to store pending events collected by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -44,7 +44,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
   }
 
   @Override
-  public void collect(PipeRealtimeCollectEvent event) {
+  public void extract(PipeRealtimeEvent event) {
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TABLET);
 
     if (!(event.getEvent() instanceof TabletInsertionEvent)) {
@@ -53,7 +53,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collect: pending queue of PipeRealtimeDataRegionLogCollector {} has reached capacity, discard tablet event {}, current state {}",
+          "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} has reached capacity, discard tablet event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -74,7 +74,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
   @Override
   public Event supply() {
-    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) 
pendingQueue.poll();
+    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
     while (collectEvent != null) {
       Event suppliedEvent = null;
@@ -99,7 +99,7 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
similarity index 86%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 2bc9abee7ad..da90c60e53f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime;
+package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class);
 
-  // This queue is used to store pending events collected by the method 
collect(). The method
+  // This queue is used to store pending events collected by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
@@ -44,7 +44,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
   }
 
   @Override
-  public void collect(PipeRealtimeCollectEvent event) {
+  public void extract(PipeRealtimeEvent event) {
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
@@ -53,7 +53,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          "collect: pending queue of PipeRealtimeDataRegionTsFileCollector {} has reached capacity, discard TsFile event {}, current state {}",
+          "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} has reached capacity, discard TsFile event {}, current state {}",
           this,
           event,
           event.getTsFileEpoch().getState(this));
@@ -74,7 +74,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) 
pendingQueue.poll();
+    PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
 
     while (collectEvent != null) {
       Event suppliedEvent = null;
@@ -100,7 +100,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
         return suppliedEvent;
       }
 
-      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+      collectEvent = (PipeRealtimeEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
similarity index 98%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index bc22132d4cd..0b335bf11e7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.assigner;
+package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
similarity index 60%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index 116f63f3073..d5ca1a10b90 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -17,61 +17,61 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.assigner;
+package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
-import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.PipeDataRegionMatcher;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
 
 import com.lmax.disruptor.dsl.ProducerType;
 
 public class PipeDataRegionAssigner {
 
-  /** The matcher is used to match the event with the collector based on the 
pattern. */
+  /** The matcher is used to match the event with the extractor based on the 
pattern. */
   private final PipeDataRegionMatcher matcher;
 
-  /** The disruptor is used to assign the event to the collector. */
-  private final DisruptorQueue<PipeRealtimeCollectEvent> disruptor;
+  /** The disruptor is used to assign the event to the extractor. */
+  private final DisruptorQueue<PipeRealtimeEvent> disruptor;
 
   public PipeDataRegionAssigner() {
     this.matcher = new CachedSchemaPatternMatcher();
     this.disruptor =
-        new DisruptorQueue.Builder<PipeRealtimeCollectEvent>()
+        new DisruptorQueue.Builder<PipeRealtimeEvent>()
             .setProducerType(ProducerType.SINGLE)
-            .addEventHandler(this::assignToCollector)
+            .addEventHandler(this::assignToExtractor)
             .build();
   }
 
-  public void publishToAssign(PipeRealtimeCollectEvent event) {
+  public void publishToAssign(PipeRealtimeEvent event) {
     event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
     disruptor.publish(event);
   }
 
-  public void assignToCollector(PipeRealtimeCollectEvent event, long sequence, 
boolean endOfBatch) {
+  public void assignToExtractor(PipeRealtimeEvent event, long sequence, 
boolean endOfBatch) {
     matcher
         .match(event)
         .forEach(
-            collector -> {
-              final PipeRealtimeCollectEvent copiedEvent =
+            extractor -> {
+              final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      collector.getPipeTaskMeta(), collector.getPattern());
+                      extractor.getPipeTaskMeta(), extractor.getPattern());
               
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
-              collector.collect(copiedEvent);
+              extractor.extract(copiedEvent);
             });
     event.gcSchemaInfo();
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
   }
 
-  public void startAssignTo(PipeRealtimeDataRegionExtractor collector) {
-    matcher.register(collector);
+  public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+    matcher.register(extractor);
   }
 
-  public void stopAssignTo(PipeRealtimeDataRegionExtractor collector) {
-    matcher.deregister(collector);
+  public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+    matcher.deregister(extractor);
   }
 
-  public boolean notMoreCollectorNeededToBeAssigned() {
+  public boolean notMoreExtractorNeededToBeAssigned() {
     return matcher.getRegisterCount() == 0;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
similarity index 73%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index a4806c30774..55a500bb1ed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.epoch;
+package org.apache.iotdb.db.pipe.extractor.realtime.epoch;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,23 +29,23 @@ public class TsFileEpoch {
 
   private final String filePath;
   private final ConcurrentMap<PipeRealtimeDataRegionExtractor, 
AtomicReference<State>>
-      dataRegionCollector2State;
+      dataRegionExtractor2State;
 
   public TsFileEpoch(String filePath) {
     this.filePath = filePath;
-    this.dataRegionCollector2State = new ConcurrentHashMap<>();
+    this.dataRegionExtractor2State = new ConcurrentHashMap<>();
   }
 
-  public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor collector) 
{
-    return dataRegionCollector2State
-        .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY))
+  public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) 
{
+    return dataRegionExtractor2State
+        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
         .get();
   }
 
   public void migrateState(
-      PipeRealtimeDataRegionExtractor collector, TsFileEpochStateMigrator 
visitor) {
-    dataRegionCollector2State
-        .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY))
+      PipeRealtimeDataRegionExtractor extractor, TsFileEpochStateMigrator 
visitor) {
+    dataRegionExtractor2State
+        .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
         .getAndUpdate(visitor::migrate);
   }
 
@@ -55,8 +55,8 @@ public class TsFileEpoch {
         + "filePath='"
         + filePath
         + '\''
-        + ", dataRegionCollector2State="
-        + dataRegionCollector2State
+        + ", dataRegionExtractor2State="
+        + dataRegionExtractor2State
         + '}';
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index dbecc2f2a72..ebf57441fdc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.epoch;
+package org.apache.iotdb.db.pipe.extractor.realtime.epoch;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ public class TsFileEpochManager {
 
   private final Map<String, TsFileEpoch> filePath2Epoch = new HashMap<>();
 
-  public PipeRealtimeCollectEvent bindPipeTsFileInsertionEvent(
+  public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
       PipeTsFileInsertionEvent event, TsFileResource resource) {
     final String filePath = resource.getTsFilePath();
 
@@ -53,7 +53,7 @@ public class TsFileEpochManager {
           return new TsFileEpoch(path);
         });
 
-    return new PipeRealtimeCollectEvent(
+    return new PipeRealtimeEvent(
         event,
         filePath2Epoch.remove(filePath),
         resource.getDevices().stream()
@@ -61,9 +61,9 @@ public class TsFileEpochManager {
         event.getPattern());
   }
 
-  public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent(
+  public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
       PipeInsertNodeTabletInsertionEvent event, InsertNode node, 
TsFileResource resource) {
-    return new PipeRealtimeCollectEvent(
+    return new PipeRealtimeEvent(
         event,
         filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), 
TsFileEpoch::new),
         Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java
similarity index 94%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java
index b95d3349b47..b4b6ada0ba9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.epoch;
+package org.apache.iotdb.db.pipe.extractor.realtime.epoch;
 
 @FunctionalInterface
 public interface TsFileEpochStateMigrator {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
similarity index 73%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 00758562073..50dea414a78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.listener;
+package org.apache.iotdb.db.pipe.extractor.realtime.listener;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.collector.realtime.assigner.PipeDataRegionAssigner;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,49 +40,49 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>All events collected by this listener will be first published to 
different
  * PipeEventDataRegionAssigners (identified by data region id), and then 
PipeEventDataRegionAssigner
- * will filter events and assign them to different 
PipeRealtimeEventDataRegionCollectors.
+ * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
  */
 public class PipeInsertionDataNodeListener {
 
   private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileCollectorCount = new 
AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeCollectorCount = new 
AtomicInteger(0);
+  private final AtomicInteger listenToTsFileExtractorCount = new 
AtomicInteger(0);
+  private final AtomicInteger listenToInsertNodeExtractorCount = new 
AtomicInteger(0);
 
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionExtractor collector) {
+      String dataRegionId, PipeRealtimeDataRegionExtractor extractor) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
-        .startAssignTo(collector);
+        .startAssignTo(extractor);
 
-    if (collector.isNeedListenToTsFile()) {
-      listenToTsFileCollectorCount.incrementAndGet();
+    if (extractor.isNeedListenToTsFile()) {
+      listenToTsFileExtractorCount.incrementAndGet();
     }
-    if (collector.isNeedListenToInsertNode()) {
-      listenToInsertNodeCollectorCount.incrementAndGet();
+    if (extractor.isNeedListenToInsertNode()) {
+      listenToInsertNodeExtractorCount.incrementAndGet();
     }
   }
 
   public synchronized void stopListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionExtractor collector) {
+      String dataRegionId, PipeRealtimeDataRegionExtractor extractor) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
     if (assigner == null) {
       return;
     }
 
-    assigner.stopAssignTo(collector);
+    assigner.stopAssignTo(extractor);
 
-    if (collector.isNeedListenToTsFile()) {
-      listenToTsFileCollectorCount.decrementAndGet();
+    if (extractor.isNeedListenToTsFile()) {
+      listenToTsFileExtractorCount.decrementAndGet();
     }
-    if (collector.isNeedListenToInsertNode()) {
-      listenToInsertNodeCollectorCount.decrementAndGet();
+    if (extractor.isNeedListenToInsertNode()) {
+      listenToInsertNodeExtractorCount.decrementAndGet();
     }
 
-    if (assigner.notMoreCollectorNeededToBeAssigned()) {
+    if (assigner.notMoreExtractorNeededToBeAssigned()) {
       // the removed assigner will is the same as the one referenced by the 
variable `assigner`
       dataRegionId2Assigner.remove(dataRegionId);
       // this will help to release the memory occupied by the assigner
@@ -93,9 +93,9 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(String dataRegionId, TsFileResource 
tsFileResource) {
-    // wo don't judge whether listenToTsFileCollectorCount.get() == 0 here, 
because
+    // we don't judge whether listenToTsFileExtractorCount.get() == 0 here, 
because
     // when using SimpleProgressIndex, the tsfile event needs to be assigned 
to the
-    // collector even if listenToTsFileCollectorCount.get() == 0 to record the 
progress
+    // extractor even if listenToTsFileExtractorCount.get() == 0 to record the 
progress
 
     PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
 
@@ -106,7 +106,7 @@ public class PipeInsertionDataNodeListener {
       return;
     }
 
-    
assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
+    
assigner.publishToAssign(PipeRealtimeEventFactory.createCollectEvent(tsFileResource));
   }
 
   public void listenToInsertNode(
@@ -114,7 +114,7 @@ public class PipeInsertionDataNodeListener {
       WALEntryHandler walEntryHandler,
       InsertNode insertNode,
       TsFileResource tsFileResource) {
-    if (listenToInsertNodeCollectorCount.get() == 0) {
+    if (listenToInsertNodeExtractorCount.get() == 0) {
       return;
     }
 
@@ -126,8 +126,7 @@ public class PipeInsertionDataNodeListener {
     }
 
     assigner.publishToAssign(
-        PipeRealtimeCollectEventFactory.createCollectEvent(
-            walEntryHandler, insertNode, tsFileResource));
+        PipeRealtimeEventFactory.createCollectEvent(walEntryHandler, 
insertNode, tsFileResource));
   }
 
   /////////////////////////////// singleton ///////////////////////////////
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java
similarity index 67%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java
index 0af3151ed9a..bb046ee972b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.matcher;
+package org.apache.iotdb.db.pipe.extractor.realtime.matcher;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -40,35 +40,35 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
 
   private final ReentrantReadWriteLock lock;
 
-  private final Set<PipeRealtimeDataRegionExtractor> collectors;
-  private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToCollectorsCache;
+  private final Set<PipeRealtimeDataRegionExtractor> extractors;
+  private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToExtractorsCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
-    this.collectors = new HashSet<>();
-    this.deviceToCollectorsCache =
+    this.extractors = new HashSet<>();
+    this.deviceToExtractorsCache =
         Caffeine.newBuilder()
             
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
             .build();
   }
 
   @Override
-  public void register(PipeRealtimeDataRegionExtractor collector) {
+  public void register(PipeRealtimeDataRegionExtractor extractor) {
     lock.writeLock().lock();
     try {
-      collectors.add(collector);
-      deviceToCollectorsCache.invalidateAll();
+      extractors.add(extractor);
+      deviceToExtractorsCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
-  public void deregister(PipeRealtimeDataRegionExtractor collector) {
+  public void deregister(PipeRealtimeDataRegionExtractor extractor) {
     lock.writeLock().lock();
     try {
-      collectors.remove(collector);
-      deviceToCollectorsCache.invalidateAll();
+      extractors.remove(extractor);
+      deviceToExtractorsCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
@@ -78,75 +78,75 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   public int getRegisterCount() {
     lock.readLock().lock();
     try {
-      return collectors.size();
+      return extractors.size();
     } finally {
       lock.readLock().unlock();
     }
   }
 
   @Override
-  public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent 
event) {
-    final Set<PipeRealtimeDataRegionExtractor> matchedCollectors = new 
HashSet<>();
+  public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event) {
+    final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new 
HashSet<>();
 
     lock.readLock().lock();
     try {
-      if (collectors.isEmpty()) {
-        return matchedCollectors;
+      if (extractors.isEmpty()) {
+        return matchedExtractors;
       }
 
       for (final Map.Entry<String, String[]> entry : 
event.getSchemaInfo().entrySet()) {
         final String device = entry.getKey();
         final String[] measurements = entry.getValue();
 
-        // 1. try to get matched collectors from cache, if not success, match 
them by device
-        final Set<PipeRealtimeDataRegionExtractor> collectorsFilteredByDevice =
-            deviceToCollectorsCache.get(device, 
this::filterCollectorsByDevice);
+        // 1. try to get matched extractors from cache, if not success, match 
them by device
+        final Set<PipeRealtimeDataRegionExtractor> extractorsFilteredByDevice =
+            deviceToExtractorsCache.get(device, 
this::filterExtractorsByDevice);
         // this would not happen
-        if (collectorsFilteredByDevice == null) {
+        if (extractorsFilteredByDevice == null) {
           LOGGER.warn("Match result NPE when handle device {}", device);
           continue;
         }
 
-        // 2. filter matched candidate collectors by measurements
+        // 2. filter matched candidate extractors by measurements
         if (measurements.length == 0) {
-          // `measurements` is empty (only in case of tsfile event). match all 
collectors.
+          // `measurements` is empty (only in case of tsfile event). match all 
extractors.
           //
           // case 1: for example, pattern is root.a.b, device is root.a.b.c, 
measurement can be any.
-          // in this case, the collector can be matched without checking the 
measurements.
+          // in this case, the extractor can be matched without checking the 
measurements.
           //
           // case 2: for example, pattern is root.a.b.c, device is root.a.b.
-          // in this situation, the collector can not be matched in some 
cases, but we can not know
-          // all the measurements of the device in an efficient way, so we 
ASSUME that the collector
+          // in this situation, the extractor can not be matched in some 
cases, but we can not know
+          // all the measurements of the device in an efficient way, so we 
ASSUME that the extractor
           // can be matched. this is a trade-off between efficiency and 
accuracy. for most user's
           // usage, this is acceptable, which may result in some unnecessary 
data processing and
           // transmission, but will not result in data loss.
-          matchedCollectors.addAll(collectorsFilteredByDevice);
+          matchedExtractors.addAll(extractorsFilteredByDevice);
         } else {
-          // `measurements` is not empty (only in case of tablet event). match 
collectors by
+          // `measurements` is not empty (only in case of tablet event). match 
extractors by
           // measurements.
-          collectorsFilteredByDevice.forEach(
-              collector -> {
-                final String pattern = collector.getPattern();
+          extractorsFilteredByDevice.forEach(
+              extractor -> {
+                final String pattern = extractor.getPattern();
 
                 // case 1: for example, pattern is root.a.b and device is 
root.a.b.c
-                // in this case, the collector can be matched without checking 
the measurements
+                // in this case, the extractor can be matched without checking 
the measurements
                 if (pattern.length() <= device.length()) {
-                  matchedCollectors.add(collector);
+                  matchedExtractors.add(extractor);
                 }
                 // case 2: for example, pattern is root.a.b.c and device is 
root.a.b
                 // in this case, we need to check the full path
                 else {
                   for (String measurement : measurements) {
                     // for example, pattern is root.a.b.c, device is root.a.b 
and measurement is c
-                    // in this case, the collector can be matched. other cases 
are not matched.
+                    // in this case, the extractor can be matched. other cases 
are not matched.
                     // please note that there should be a . between device and 
measurement.
                     if (
                     // low cost check comes first
                     pattern.length() == device.length() + measurement.length() 
+ 1
                         // high cost check comes later
                         && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + 
measurement)) {
-                      matchedCollectors.add(collector);
-                      // there would be no more matched collectors because the 
measurements are
+                      matchedExtractors.add(extractor);
+                      // there would be no more matched extractors because the 
measurements are
                       // unique
                       break;
                     }
@@ -155,7 +155,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
               });
         }
 
-        if (matchedCollectors.size() == collectors.size()) {
+        if (matchedExtractors.size() == extractors.size()) {
           break;
         }
       }
@@ -163,36 +163,36 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
       lock.readLock().unlock();
     }
 
-    return matchedCollectors;
+    return matchedExtractors;
   }
 
-  private Set<PipeRealtimeDataRegionExtractor> filterCollectorsByDevice(String 
device) {
-    final Set<PipeRealtimeDataRegionExtractor> filteredCollectors = new 
HashSet<>();
+  private Set<PipeRealtimeDataRegionExtractor> filterExtractorsByDevice(String 
device) {
+    final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new 
HashSet<>();
 
-    for (PipeRealtimeDataRegionExtractor collector : collectors) {
-      String pattern = collector.getPattern();
+    for (PipeRealtimeDataRegionExtractor extractor : extractors) {
+      String pattern = extractor.getPattern();
       if (
       // for example, pattern is root.a.b and device is root.a.b.c
-      // in this case, the collector can be matched without checking the 
measurements
+      // in this case, the extractor can be matched without checking the 
measurements
       (pattern.length() <= device.length() && device.startsWith(pattern))
           // for example, pattern is root.a.b.c and device is root.a.b
-          // in this case, the collector can be selected as candidate, but the 
measurements should
+          // in this case, the extractor can be selected as candidate, but the 
measurements should
           // be checked further
           || (pattern.length() > device.length() && 
pattern.startsWith(device))) {
-        filteredCollectors.add(collector);
+        filteredExtractors.add(extractor);
       }
     }
 
-    return filteredCollectors;
+    return filteredExtractors;
   }
 
   @Override
   public void clear() {
     lock.writeLock().lock();
     try {
-      collectors.clear();
-      deviceToCollectorsCache.invalidateAll();
-      deviceToCollectorsCache.cleanUp();
+      extractors.clear();
+      deviceToExtractorsCache.invalidateAll();
+      deviceToExtractorsCache.cleanUp();
     } finally {
       lock.writeLock().unlock();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java
similarity index 57%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java
index 44b544329c2..8b145c2fac7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java
@@ -17,36 +17,36 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector.realtime.matcher;
+package org.apache.iotdb.db.pipe.extractor.realtime.matcher;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 
 import java.util.Set;
 
 public interface PipeDataRegionMatcher {
 
   /**
-   * Register a collector. If the collector's pattern matches the event's 
schema info, the event
-   * will be assigned to the collector.
+   * Register an extractor. If the extractor's pattern matches the event's 
schema info, the event
+   * will be assigned to the extractor.
    */
-  void register(PipeRealtimeDataRegionExtractor collector);
+  void register(PipeRealtimeDataRegionExtractor extractor);
 
-  /** Deregister a collector. */
-  void deregister(PipeRealtimeDataRegionExtractor collector);
+  /** Deregister an extractor. */
+  void deregister(PipeRealtimeDataRegionExtractor extractor);
 
-  /** Get the number of registered collectors in this matcher. */
+  /** Get the number of registered extractors in this matcher. */
   int getRegisterCount();
 
   /**
-   * Match the event's schema info with the registered collectors' patterns. 
If the event's schema
-   * info matches the pattern of a collector, the collector will be returned.
+   * Match the event's schema info with the registered extractors' patterns. 
If the event's schema
+   * info matches the pattern of an extractor, the extractor will be returned.
    *
    * @param event the event to be matched
-   * @return the matched collectors
+   * @return the matched extractors
    */
-  Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event);
+  Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event);
 
-  /** Clear all the registered collectors and internal data structures. */
+  /** Clear all the registered extractors and internal data structures. */
   void clear();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index 18ea33be4ff..7de66c55ca9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -54,7 +54,7 @@ public class PipeDoNothingProcessor implements PipeProcessor {
       final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
       if (enrichedEvent
           .getPattern()
-          .equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+          .equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE)) {
         eventCollector.collect(tabletInsertionEvent);
       } else {
         tabletInsertionEvent
@@ -86,7 +86,7 @@ public class PipeDoNothingProcessor implements PipeProcessor {
     if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
       final PipeTsFileInsertionEvent enrichedEvent =
           (PipeTsFileInsertionEvent) tsFileInsertionEvent;
-      if 
(enrichedEvent.getPattern().equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
+      if 
(enrichedEvent.getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE)
           && !enrichedEvent.hasTimeFilter()) {
         eventCollector.collect(tsFileInsertionEvent);
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 887b31a6e3a..3639a4c60da 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -27,44 +27,44 @@ public class PipeTask {
   private final String pipeName;
   private final TConsensusGroupId dataRegionId;
 
-  private final PipeTaskStage collectorStage;
+  private final PipeTaskStage extractorStage;
   private final PipeTaskStage processorStage;
   private final PipeTaskStage connectorStage;
 
   PipeTask(
       String pipeName,
       TConsensusGroupId dataRegionId,
-      PipeTaskStage collectorStage,
+      PipeTaskStage extractorStage,
       PipeTaskStage processorStage,
       PipeTaskStage connectorStage) {
     this.pipeName = pipeName;
     this.dataRegionId = dataRegionId;
 
-    this.collectorStage = collectorStage;
+    this.extractorStage = extractorStage;
     this.processorStage = processorStage;
     this.connectorStage = connectorStage;
   }
 
   public void create() {
-    collectorStage.create();
+    extractorStage.create();
     processorStage.create();
     connectorStage.create();
   }
 
   public void drop() {
-    collectorStage.drop();
+    extractorStage.drop();
     processorStage.drop();
     connectorStage.drop();
   }
 
   public void start() {
-    collectorStage.start();
+    extractorStage.start();
     processorStage.start();
     connectorStage.start();
   }
 
   public void stop() {
-    collectorStage.stop();
+    extractorStage.stop();
     processorStage.stop();
     connectorStage.stop();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 5a9e40ccfb0..47353d89ecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.task;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
 
 public class PipeTaskBuilder {
@@ -40,14 +40,14 @@ public class PipeTaskBuilder {
   }
 
   public PipeTask build() {
-    // event flow: collector -> processor -> connector
+    // event flow: extractor -> processor -> connector
 
-    // we first build the collector and connector, then build the processor.
-    final PipeTaskCollectorStage collectorStage =
-        new PipeTaskCollectorStage(
+    // we first build the extractor and connector, then build the processor.
+    final PipeTaskExtractorStage extractorStage =
+        new PipeTaskExtractorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getCollectorParameters(),
+            pipeStaticMeta.getExtractorParameters(),
             dataRegionId,
             pipeTaskMeta);
 
@@ -57,17 +57,17 @@ public class PipeTaskBuilder {
             pipeStaticMeta.getCreationTime(),
             pipeStaticMeta.getConnectorParameters());
 
-    // the processor connects the collector and connector.
+    // the processor connects the extractor and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
             pipeStaticMeta.getProcessorParameters(),
             dataRegionId,
-            collectorStage.getEventSupplier(),
+            extractorStage.getEventSupplier(),
             connectorStage.getPipeConnectorPendingQueue());
 
     return new PipeTask(
-        pipeStaticMeta.getPipeName(), dataRegionId, collectorStage, processorStage, connectorStage);
+        pipeStaticMeta.getPipeName(), dataRegionId, extractorStage, processorStage, connectorStage);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
index c00d294be00..74efb7a2122 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.pipe.api.event.Event;
 public interface EventSupplier {
 
   /**
-   * @return the event to be supplied. the event may be null if the collector has no more events at
-   *     the moment, but the collector is still running for more events.
+   * @return the event to be supplied. the event may be null if the extractor has no more events at
+   *     the moment, but the extractor is still running for more events.
    * @throws Exception if the supplier fails to supply the event.
    */
   @SuppressWarnings("squid:S00112") // Exception is thrown by the interface
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 807c7b0c758..c2bf2ea2704 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -23,47 +23,47 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionExtractor;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskCollectorStage extends PipeTaskStage {
+public class PipeTaskExtractorStage extends PipeTaskStage {
 
   private final PipeExtractor pipeExtractor;
 
-  public PipeTaskCollectorStage(
+  public PipeTaskExtractorStage(
       String pipeName,
       long creationTime,
-      PipeParameters collectorParameters,
+      PipeParameters extractorParameters,
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta) {
     pipeExtractor =
-        collectorParameters
+        extractorParameters
                 .getStringOrDefault(
-                    PipeExtractorConstant.COLLECTOR_KEY,
+                    PipeExtractorConstant.EXTRACTOR_KEY,
                     BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
                 .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
             ? new IoTDBDataRegionExtractor()
-            : PipeAgent.plugin().reflectCollector(collectorParameters);
+            : PipeAgent.plugin().reflectExtractor(extractorParameters);
 
-    // validate and customize should be called before createSubtask. this allows collector exposing
+    // validate and customize should be called before createSubtask. this allows extractor exposing
     // exceptions in advance.
     try {
-      // 1. validate collector parameters
-      pipeExtractor.validate(new PipeParameterValidator(collectorParameters));
+      // 1. validate extractor parameters
+      pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
 
-      // 2. customize collector
+      // 2. customize extractor
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment(
+              new PipeTaskExtractorRuntimeEnvironment(
                   pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta));
-      pipeExtractor.customize(collectorParameters, runtimeConfiguration);
+      pipeExtractor.customize(extractorParameters, runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
@@ -85,7 +85,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
 
   @Override
   public void stopSubtask() throws PipeException {
-    // collector continuously collects data, so do nothing in stop
+    // extractor continuously collects data, so do nothing in stop
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index fade5393d72..1e4d4e8a515 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -70,7 +70,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             ? new PipeDoNothingProcessor()
             : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
 
-    // validate and customize should be called before createSubtask. this allows collector exposing
+    // validate and customize should be called before createSubtask. this allows extractor exposing
     // exceptions in advance.
     try {
       // 1. validate processor parameters
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
similarity index 84%
rename from server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
index e84b8195956..f1aa273bfaa 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector;
+package org.apache.iotdb.db.pipe.extractor;
 
-import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
-import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -70,10 +70,10 @@ public class CachedSchemaPatternMatcherTest {
         new PipeParameters(
             new HashMap<String, String>() {
               {
-                put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root");
+                put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root");
               }
             }),
-        new PipeTaskRuntimeConfiguration(new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
+        new PipeTaskRuntimeConfiguration(new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
     collectorList.add(databaseCollector);
 
     int deviceCollectorNum = 10;
@@ -85,11 +85,11 @@ public class CachedSchemaPatternMatcherTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + finalI1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
+              new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
         PipeRealtimeDataRegionExtractor seriesCollector = new PipeRealtimeDataRegionFakeExtractor();
@@ -100,12 +100,12 @@ public class CachedSchemaPatternMatcherTest {
                 new HashMap<String, String>() {
                   {
                     put(
-                        PipeExtractorConstant.COLLECTOR_PATTERN_KEY,
+                        PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
                         "root." + finalI + "." + finalJ);
                   }
                 }),
             new PipeTaskRuntimeConfiguration(
-                new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
+                new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
         collectorList.add(seriesCollector);
       }
     }
@@ -126,16 +126,16 @@ public class CachedSchemaPatternMatcherTest {
     long totalTime = 0;
     for (int i = 0; i < epochNum; i++) {
       for (int j = 0; j < deviceNum; j++) {
-        PipeRealtimeCollectEvent event =
-            new PipeRealtimeCollectEvent(
+        PipeRealtimeEvent event =
+            new PipeRealtimeEvent(
                 null, null, Collections.singletonMap("root." + i, measurements), "root");
         long startTime = System.currentTimeMillis();
-        matcher.match(event).forEach(collector -> collector.collect(event));
+        matcher.match(event).forEach(collector -> collector.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
-      PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, null, deviceMap, "root");
+      PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, "root");
       long startTime = System.currentTimeMillis();
-      matcher.match(event).forEach(collector -> collector.collect(event));
+      matcher.match(event).forEach(collector -> collector.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
     }
     System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
@@ -157,7 +157,7 @@ public class CachedSchemaPatternMatcherTest {
     }
 
     @Override
-    public void collect(PipeRealtimeCollectEvent event) {
+    public void extract(PipeRealtimeEvent event) {
       final boolean[] match = {false};
       event
           .getSchemaInfo()
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
similarity index 93%
rename from server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
index 0ef1058c12a..a30854cc95d 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.collector;
+package org.apache.iotdb.db.pipe.extractor;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -25,12 +25,12 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor;
-import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -115,41 +115,41 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment(
+              new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion1), null)));
       collector2.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment(
+              new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion1), null)));
       collector3.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment(
+              new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion2), null)));
       collector4.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskCollectorRuntimeEnvironment(
+              new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion2), null)));
 
       PipeRealtimeDataRegionExtractor[] collectors =

Reply via email to