This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rename-collector-to-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64e04dbef55e0d5dc004bc6ae1d465a8290af952
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 23 22:27:00 2023 +0800

    pipe rename: collector -> extractor
---
 .../persistence/pipe/PipePluginInfo.java           |  8 +--
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |  2 +-
 .../api/{PipeCollector.java => PipeExtractor.java} | 42 ++++++++--------
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |  4 +-
 .../PipeCollectorRuntimeConfiguration.java         |  2 +-
 .../src/main/thrift/confignode.thrift              |  2 +-
 .../resources/conf/iotdb-common.properties         |  8 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 44 ++++++++--------
 .../iotdb/commons/conf/CommonDescriptor.java       | 24 ++++-----
 .../iotdb/commons/pipe/config/PipeConfig.java      | 26 +++++-----
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  4 +-
 .../{IoTDBCollector.java => IoTDBExtractor.java}   |  8 +--
 .../db/mpp/common/header/ColumnHeaderConstant.java |  4 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |  2 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      | 12 ++---
 ...ollector.java => IoTDBDataRegionExtractor.java} | 58 +++++++++++-----------
 ...java => PipeHistoricalDataRegionExtractor.java} |  4 +-
 ...> PipeHistoricalDataRegionTsFileExtractor.java} | 20 ++++----
 ...r.java => PipeRealtimeDataRegionExtractor.java} | 16 +++---
 ...va => PipeRealtimeDataRegionFakeExtractor.java} |  6 +--
 ... => PipeRealtimeDataRegionHybridExtractor.java} | 14 +++---
 ...ava => PipeRealtimeDataRegionLogExtractor.java} | 10 ++--
 ... => PipeRealtimeDataRegionTsFileExtractor.java} | 10 ++--
 .../realtime/assigner/DisruptorQueue.java          |  2 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  6 +--
 .../pipe/collector/realtime/epoch/TsFileEpoch.java |  8 +--
 .../listener/PipeInsertionDataNodeListener.java    |  6 +--
 .../matcher/CachedSchemaPatternMatcher.java        | 24 ++++-----
 .../realtime/matcher/PipeDataRegionMatcher.java    |  8 +--
 .../config/constant/PipeCollectorConstant.java     |  4 +-
 .../configuraion/PipeTaskRuntimeConfiguration.java |  4 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |  4 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  6 +--
 .../db/pipe/processor/PipeDoNothingProcessor.java  |  6 +--
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 28 +++++------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  6 +--
 .../collector/CachedSchemaPatternMatcherTest.java  | 22 ++++----
 .../db/pipe/collector/PipeRealtimeCollectTest.java | 36 +++++++-------
 38 files changed, 250 insertions(+), 250 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 784bf7df744..7a24618d6a7 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -34,8 +34,8 @@ import 
org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
 import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -57,7 +57,7 @@ import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
-import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_COLLECTOR;
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR;
 
 public class PipePluginInfo implements SnapshotProcessor {
 
@@ -126,11 +126,11 @@ public class PipePluginInfo implements SnapshotProcessor {
         new PipeParameters(createPipeRequest.getCollectorAttributes());
     final String collectorPluginName =
         collectorParameters.getStringOrDefault(
-            PipeCollectorConstant.COLLECTOR_KEY, 
IOTDB_COLLECTOR.getPipePluginName());
+            PipeExtractorConstant.COLLECTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
       final String exceptionMessage =
           String.format(
-              "Failed to create pipe, the pipe collector plugin %s does not 
exist",
+              "Failed to create pipe, the pipe extractor plugin %s does not 
exist",
               collectorPluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 083590c1aef..5d14da1e3aa 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -46,7 +46,7 @@ import 
org.apache.iotdb.pipe.api.exception.PipeConnectionException;
  *       PipeConnector#handshake()} will be called to create a connection with 
sink.
  *   <li>While the collaboration task is in progress:
  *       <ul>
- *         <li>PipeCollector captures the events and wraps them into three 
types of Event instances.
+ *         <li>PipeExtractor captures the events and wraps them into three 
types of Event instances.
  *         <li>PipeProcessor processes the event and then passes them to the 
PipeConnector.
  *         <li>PipeConnector serializes the events into binaries and send them 
to sinks. The
  *             following 3 methods will be called: {@link
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
similarity index 72%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
index 8771febead4..4213da842bc 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
@@ -19,40 +19,40 @@
 
 package org.apache.iotdb.pipe.api;
 
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 /**
- * PipeCollector
+ * PipeExtractor
  *
- * <p>PipeCollector is responsible for capturing events from sources.
+ * <p>PipeExtractor is responsible for capturing events from sources.
  *
- * <p>Various data sources can be supported by implementing different 
PipeCollector classes.
+ * <p>Various data sources can be supported by implementing different 
PipeExtractor classes.
  *
- * <p>The lifecycle of a PipeCollector is as follows:
+ * <p>The lifecycle of a PipeExtractor is as follows:
  *
  * <ul>
  *   <li>When a collaboration task is created, the KV pairs of `WITH 
COLLECTOR` clause in SQL are
- *       parsed and the validation method {@link 
PipeCollector#validate(PipeParameterValidator)}
+ *       parsed and the validation method {@link 
PipeExtractor#validate(PipeParameterValidator)}
  *       will be called to validate the parameters.
  *   <li>Before the collaboration task starts, the method {@link
- *       PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} will be called
- *       to config the runtime behavior of the PipeCollector.
- *   <li>Then the method {@link PipeCollector#start()} will be called to start 
the PipeCollector.
- *   <li>While the collaboration task is in progress, the method {@link 
PipeCollector#supply()} will
+ *       PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} will be called
+ *       to config the runtime behavior of the PipeExtractor.
+ *   <li>Then the method {@link PipeExtractor#start()} will be called to start 
the PipeExtractor.
+ *   <li>While the collaboration task is in progress, the method {@link 
PipeExtractor#supply()} will
  *       be called to capture events from sources and then the events will be 
passed to the
  *       PipeProcessor.
- *   <li>The method {@link PipeCollector#close()} will be called when the 
collaboration task is
+ *   <li>The method {@link PipeExtractor#close()} will be called when the 
collaboration task is
  *       cancelled (the `DROP PIPE` command is executed).
  * </ul>
  */
-public interface PipeCollector extends PipePlugin {
+public interface PipeExtractor extends PipePlugin {
 
   /**
    * This method is mainly used to validate {@link PipeParameters} and it is 
executed before {@link
-   * PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} is called.
+   * PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} is called.
    *
    * @param validator the validator used to validate {@link PipeParameters}
    * @throws Exception if any parameter is not valid
@@ -60,28 +60,28 @@ public interface PipeCollector extends PipePlugin {
   void validate(PipeParameterValidator validator) throws Exception;
 
   /**
-   * This method is mainly used to customize PipeCollector. In this method, 
the user can do the
+   * This method is mainly used to customize PipeExtractor. In this method, 
the user can do the
    * following things:
    *
    * <ul>
    *   <li>Use PipeParameters to parse key-value pair attributes entered by 
the user.
-   *   <li>Set the running configurations in PipeCollectorRuntimeConfiguration.
+   *   <li>Set the running configurations in PipeExtractorRuntimeConfiguration.
    * </ul>
    *
    * <p>This method is called after the method {@link
-   * PipeCollector#validate(PipeParameterValidator)} is called.
+   * PipeExtractor#validate(PipeParameterValidator)} is called.
    *
    * @param parameters used to parse the input parameters entered by the user
-   * @param configuration used to set the required properties of the running 
PipeCollector
+   * @param configuration used to set the required properties of the running 
PipeExtractor
    * @throws Exception the user can throw errors if necessary
    */
-  void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration)
+  void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception;
 
   /**
    * Start the collector. After this method is called, events should be ready 
to be supplied by
-   * {@link PipeCollector#supply()}. This method is called after {@link
-   * PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} is called.
+   * {@link PipeExtractor#supply()}. This method is called after {@link
+   * PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} is called.
    *
    * @throws Exception the user can throw errors if necessary
    */
@@ -89,7 +89,7 @@ public interface PipeCollector extends PipePlugin {
 
   /**
    * Supply single event from the collector and the caller will send the event 
to the processor.
-   * This method is called after {@link PipeCollector#start()} is called.
+   * This method is called after {@link PipeExtractor#start()} is called.
    *
    * @return the event to be supplied. the event may be null if the collector 
has no more events at
    *     the moment, but the collector is still running for more events.
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index 3c35e8b9e13..f752aafef3a 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 /**
  * PipeProcessor
  *
- * <p>PipeProcessor is used to filter and transform the Event formed by the 
PipeCollector.
+ * <p>PipeProcessor is used to filter and transform the Event formed by the 
PipeExtractor.
  *
  * <p>The lifecycle of a PipeProcessor is as follows:
  *
@@ -43,7 +43,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
  *       to config the runtime behavior of the PipeProcessor.
  *   <li>While the collaboration task is in progress:
  *       <ul>
- *         <li>PipeCollector captures the events and wraps them into three 
types of Event instances.
+ *         <li>PipeExtractor captures the events and wraps them into three 
types of Event instances.
  *         <li>PipeProcessor processes the event and then passes them to the 
PipeConnector. The
  *             following 3 methods will be called: {@link
  *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, 
{@link
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
index 071bfeb60c6..2671d3be5e7 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
@@ -19,4 +19,4 @@
 
 package org.apache.iotdb.pipe.api.customizer.configuration;
 
-public interface PipeCollectorRuntimeConfiguration extends 
PipeRuntimeConfiguration {}
+public interface PipeExtractorRuntimeConfiguration extends 
PipeRuntimeConfiguration {}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 8ad6fc9ac76..7d3b990b8dd 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -612,7 +612,7 @@ struct TShowPipeInfo {
   1: required string id
   2: required i64 creationTime
   3: required string state
-  4: required string pipeCollector
+  4: required string pipeExtractor
   5: required string pipeProcessor
   6: required string pipeConnector
   7: required string exceptionMessage
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 38a57fa0205..033322e456e 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -960,17 +960,17 @@ cluster_name=defaultCluster
 # pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000
 
 # The default size of ring buffer in the realtime collector's disruptor queue.
-# pipe_collector_assigner_disruptor_ring_buffer_size=65536
+# pipe_extractor_assigner_disruptor_ring_buffer_size=65536
 
 # The maximum number of entries the deviceToCollectorsCache can hold.
-# pipe_collector_matcher_cache_size=1024
+# pipe_extractor_matcher_cache_size=1024
 
 # The capacity for the number of tablet events that can be stored in the 
pending queue of the Hybrid Realtime Collector.
-# pipe_collector_pending_queue_capacity=128
+# pipe_extractor_pending_queue_capacity=128
 
 # The limit for the number of tablet events that can be held in the pending 
queue of the Hybrid Realtime Collector.
 # Noted that: this should be less than or equals to 
realtimeCollectorPendingQueueCapacity
-# pipe_collector_pending_queue_tablet_limit=64
+# pipe_extractor_pending_queue_tablet_limit=64
 
 # The buffer size used for reading file during file transfer.
 # pipe_connector_read_file_buffer_size=8388608
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d4f768fc0c4..c402ec2f460 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -151,11 +151,11 @@ public class CommonConfig {
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
 
-  private int pipeCollectorAssignerDisruptorRingBufferSize = 65536;
-  private int pipeCollectorMatcherCacheSize = 1024;
-  private int pipeCollectorPendingQueueCapacity = 128;
+  private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
+  private int pipeExtractorMatcherCacheSize = 1024;
+  private int pipeExtractorPendingQueueCapacity = 128;
   // this should be less than or equals to 
realtimeCollectorPendingQueueCapacity
-  private int pipeCollectorPendingQueueTabletLimit = 
pipeCollectorPendingQueueCapacity / 2;
+  private int pipeExtractorPendingQueueTabletLimit = 
pipeExtractorPendingQueueCapacity / 2;
 
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
@@ -458,38 +458,38 @@ public class CommonConfig {
     this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName;
   }
 
-  public int getPipeCollectorAssignerDisruptorRingBufferSize() {
-    return pipeCollectorAssignerDisruptorRingBufferSize;
+  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
+    return pipeExtractorAssignerDisruptorRingBufferSize;
   }
 
-  public void setPipeCollectorAssignerDisruptorRingBufferSize(
-      int pipeCollectorAssignerDisruptorRingBufferSize) {
-    this.pipeCollectorAssignerDisruptorRingBufferSize =
-        pipeCollectorAssignerDisruptorRingBufferSize;
+  public void setPipeExtractorAssignerDisruptorRingBufferSize(
+      int pipeExtractorAssignerDisruptorRingBufferSize) {
+    this.pipeExtractorAssignerDisruptorRingBufferSize =
+        pipeExtractorAssignerDisruptorRingBufferSize;
   }
 
-  public int getPipeCollectorMatcherCacheSize() {
-    return pipeCollectorMatcherCacheSize;
+  public int getPipeExtractorMatcherCacheSize() {
+    return pipeExtractorMatcherCacheSize;
   }
 
-  public void setPipeCollectorMatcherCacheSize(int 
pipeCollectorMatcherCacheSize) {
-    this.pipeCollectorMatcherCacheSize = pipeCollectorMatcherCacheSize;
+  public void setPipeExtractorMatcherCacheSize(int 
pipeExtractorMatcherCacheSize) {
+    this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
   }
 
-  public int getPipeCollectorPendingQueueCapacity() {
-    return pipeCollectorPendingQueueCapacity;
+  public int getPipeExtractorPendingQueueCapacity() {
+    return pipeExtractorPendingQueueCapacity;
   }
 
-  public void setPipeCollectorPendingQueueCapacity(int 
pipeCollectorPendingQueueCapacity) {
-    this.pipeCollectorPendingQueueCapacity = pipeCollectorPendingQueueCapacity;
+  public void setPipeExtractorPendingQueueCapacity(int 
pipeExtractorPendingQueueCapacity) {
+    this.pipeExtractorPendingQueueCapacity = pipeExtractorPendingQueueCapacity;
   }
 
-  public int getPipeCollectorPendingQueueTabletLimit() {
-    return pipeCollectorPendingQueueTabletLimit;
+  public int getPipeExtractorPendingQueueTabletLimit() {
+    return pipeExtractorPendingQueueTabletLimit;
   }
 
-  public void setPipeCollectorPendingQueueTabletLimit(int 
pipeCollectorPendingQueueTabletLimit) {
-    this.pipeCollectorPendingQueueTabletLimit = 
pipeCollectorPendingQueueTabletLimit;
+  public void setPipeExtractorPendingQueueTabletLimit(int 
pipeExtractorPendingQueueTabletLimit) {
+    this.pipeExtractorPendingQueueTabletLimit = 
pipeExtractorPendingQueueTabletLimit;
   }
 
   public int getPipeConnectorReadFileBufferSize() {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 7dd417d5b3b..a8058a549b7 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -267,26 +267,26 @@ public class CommonDescriptor {
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
 
-    config.setPipeCollectorAssignerDisruptorRingBufferSize(
+    config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_collector_assigner_disruptor_ring_buffer_size",
-                
String.valueOf(config.getPipeCollectorAssignerDisruptorRingBufferSize()))));
-    config.setPipeCollectorMatcherCacheSize(
+                "pipe_extractor_assigner_disruptor_ring_buffer_size",
+                
String.valueOf(config.getPipeExtractorAssignerDisruptorRingBufferSize()))));
+    config.setPipeExtractorMatcherCacheSize(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_collector_matcher_cache_size",
-                String.valueOf(config.getPipeCollectorMatcherCacheSize()))));
-    config.setPipeCollectorPendingQueueCapacity(
+                "pipe_extractor_matcher_cache_size",
+                String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
+    config.setPipeExtractorPendingQueueCapacity(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_collector_pending_queue_capacity",
-                
String.valueOf(config.getPipeCollectorPendingQueueCapacity()))));
-    config.setPipeCollectorPendingQueueTabletLimit(
+                "pipe_extractor_pending_queue_capacity",
+                
String.valueOf(config.getPipeExtractorPendingQueueCapacity()))));
+    config.setPipeExtractorPendingQueueTabletLimit(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_collector_pending_queue_tablet_limit",
-                
String.valueOf(config.getPipeCollectorPendingQueueTabletLimit()))));
+                "pipe_extractor_pending_queue_tablet_limit",
+                
String.valueOf(config.getPipeExtractorPendingQueueTabletLimit()))));
 
     config.setPipeConnectorReadFileBufferSize(
         Integer.parseInt(
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 365ea0b8003..2e54fc9c353 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -55,20 +55,20 @@ public class PipeConfig {
 
   /////////////////////////////// Collector ///////////////////////////////
 
-  public int getPipeCollectorAssignerDisruptorRingBufferSize() {
-    return COMMON_CONFIG.getPipeCollectorAssignerDisruptorRingBufferSize();
+  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
+    return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
   }
 
-  public int getPipeCollectorMatcherCacheSize() {
-    return COMMON_CONFIG.getPipeCollectorMatcherCacheSize();
+  public int getPipeExtractorMatcherCacheSize() {
+    return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
   }
 
-  public int getPipeCollectorPendingQueueCapacity() {
-    return COMMON_CONFIG.getPipeCollectorPendingQueueCapacity();
+  public int getPipeExtractorPendingQueueCapacity() {
+    return COMMON_CONFIG.getPipeExtractorPendingQueueCapacity();
   }
 
-  public int getPipeCollectorPendingQueueTabletLimit() {
-    return COMMON_CONFIG.getPipeCollectorPendingQueueTabletLimit();
+  public int getPipeExtractorPendingQueueTabletLimit() {
+    return COMMON_CONFIG.getPipeExtractorPendingQueueTabletLimit();
   }
 
   /////////////////////////////// Connector ///////////////////////////////
@@ -118,12 +118,12 @@ public class PipeConfig {
         getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs());
 
     LOGGER.info(
-        "PipeCollectorAssignerDisruptorRingBufferSize: {}",
-        getPipeCollectorAssignerDisruptorRingBufferSize());
-    LOGGER.info("PipeCollectorMatcherCacheSize: {}", 
getPipeCollectorMatcherCacheSize());
-    LOGGER.info("PipeCollectorPendingQueueCapacity: {}", 
getPipeCollectorPendingQueueCapacity());
+        "PipeExtractorAssignerDisruptorRingBufferSize: {}",
+        getPipeExtractorAssignerDisruptorRingBufferSize());
+    LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeExtractorMatcherCacheSize());
+    LOGGER.info("PipeExtractorPendingQueueCapacity: {}", 
getPipeExtractorPendingQueueCapacity());
     LOGGER.info(
-        "PipeCollectorPendingQueueTabletLimit: {}", 
getPipeCollectorPendingQueueTabletLimit());
+        "PipeExtractorPendingQueueTabletLimit: {}", 
getPipeExtractorPendingQueueTabletLimit());
 
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
     LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 14d22f578c3..1bcbe17b4ad 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
-import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor
 public enum BuiltinPipePlugin {
 
   // collectors
-  IOTDB_COLLECTOR("iotdb_collector", IoTDBCollector.class),
+  IOTDB_EXTRACTOR("iotdb_extractor", IoTDBExtractor.class),
 
   // processors
   DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
similarity index 89%
rename from 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
rename to 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
index 8d3f0276126..9857f5dfd7d 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
 
-import org.apache.iotdb.pipe.api.PipeCollector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -31,14 +31,14 @@ import org.apache.iotdb.pipe.api.event.Event;
  * imported here. The pipe agent in the server module will replace this class 
with the real
  * implementation when initializing the collector.
  */
-public class IoTDBCollector implements PipeCollector {
+public class IoTDBExtractor implements PipeExtractor {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
+  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 304222a7407..a982df07a4d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -146,7 +146,7 @@ public class ColumnHeaderConstant {
   // column names for show pipe
   public static final String ID = "ID";
   public static final String CREATION_TIME = "CreationTime";
-  public static final String PIPE_COLLECTOR = "PipeCollector";
+  public static final String PIPE_EXTRACTOR = "PipeExtractor";
   public static final String PIPE_PROCESSOR = "PipeProcessor";
   public static final String PIPE_CONNECTOR = "PipeConnector";
   public static final String EXCEPTION_MESSAGE = "ExceptionMessage";
@@ -385,7 +385,7 @@ public class ColumnHeaderConstant {
           new ColumnHeader(ID, TSDataType.TEXT),
           new ColumnHeader(CREATION_TIME, TSDataType.TEXT),
           new ColumnHeader(STATE, TSDataType.TEXT),
-          new ColumnHeader(PIPE_COLLECTOR, TSDataType.TEXT),
+          new ColumnHeader(PIPE_EXTRACTOR, TSDataType.TEXT),
           new ColumnHeader(PIPE_PROCESSOR, TSDataType.TEXT),
           new ColumnHeader(PIPE_CONNECTOR, TSDataType.TEXT),
           new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
index dde5561e9de..a9a79015bbc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -68,7 +68,7 @@ public class ShowPipeTask implements IConfigTask {
           .getColumnBuilder(1)
           .writeBinary(new 
Binary(DateTimeUtils.convertLongToDate(tPipeInfo.getCreationTime())));
       builder.getColumnBuilder(2).writeBinary(new 
Binary(tPipeInfo.getState()));
-      builder.getColumnBuilder(3).writeBinary(new 
Binary(tPipeInfo.getPipeCollector()));
+      builder.getColumnBuilder(3).writeBinary(new 
Binary(tPipeInfo.getPipeExtractor()));
       builder.getColumnBuilder(4).writeBinary(new 
Binary(tPipeInfo.getPipeProcessor()));
       builder.getColumnBuilder(5).writeBinary(new 
Binary(tPipeInfo.getPipeConnector()));
       builder.getColumnBuilder(6).writeBinary(new 
Binary(tPipeInfo.getExceptionMessage()));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 5a05cb43b53..7aa7c3d5449 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -25,11 +25,11 @@ import 
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
-import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -195,12 +195,12 @@ public class PipePluginAgent {
     }
   }
 
-  public PipeCollector reflectCollector(PipeParameters collectorParameters) {
-    return (PipeCollector)
+  public PipeExtractor reflectCollector(PipeParameters collectorParameters) {
+    return (PipeExtractor)
         reflect(
             collectorParameters.getStringOrDefault(
-                PipeCollectorConstant.COLLECTOR_KEY,
-                BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()));
+                PipeExtractorConstant.COLLECTOR_KEY,
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
   }
 
   public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
similarity index 82%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
index cbe7fee083e..48a34fc923c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java
@@ -21,16 +21,16 @@ package org.apache.iotdb.db.pipe.collector;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.db.engine.StorageEngine;
-import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionCollector;
-import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
+import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
-import org.apache.iotdb.pipe.api.PipeCollector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -42,25 +42,25 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_ENABLE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_FILE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_LOG;
 
-public class IoTDBDataRegionCollector implements PipeCollector {
+public class IoTDBDataRegionExtractor implements PipeExtractor {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionCollector.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
 
   private final AtomicBoolean hasBeenStarted;
 
-  private PipeHistoricalDataRegionCollector historicalCollector;
-  private PipeRealtimeDataRegionCollector realtimeCollector;
+  private PipeHistoricalDataRegionExtractor historicalCollector;
+  private PipeRealtimeDataRegionExtractor realtimeCollector;
 
   private int dataRegionId;
 
-  public IoTDBDataRegionCollector() {
+  public IoTDBDataRegionExtractor() {
     this.hasBeenStarted = new AtomicBoolean(false);
   }
 
@@ -99,34 +99,34 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private void constructHistoricalCollector() {
     // enable historical collector by default
-    historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+    historicalCollector = new PipeHistoricalDataRegionTsFileExtractor();
   }
 
   private void constructRealtimeCollector(PipeParameters parameters) {
     // enable realtime collector by default
     if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
-      realtimeCollector = new PipeRealtimeDataRegionFakeCollector();
+      realtimeCollector = new PipeRealtimeDataRegionFakeExtractor();
       return;
     }
 
     // use hybrid mode by default
     if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
-      realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+      realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
       return;
     }
 
     switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
       case COLLECTOR_REALTIME_MODE_FILE:
-        realtimeCollector = new PipeRealtimeDataRegionTsFileCollector();
+        realtimeCollector = new PipeRealtimeDataRegionTsFileExtractor();
         break;
       case COLLECTOR_REALTIME_MODE_LOG:
-        realtimeCollector = new PipeRealtimeDataRegionLogCollector();
+        realtimeCollector = new PipeRealtimeDataRegionLogExtractor();
         break;
       case COLLECTOR_REALTIME_MODE_HYBRID:
-        realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+        realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
         break;
       default:
-        realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+        realtimeCollector = new PipeRealtimeDataRegionHybridExtractor();
         LOGGER.warn(
             "Unsupported collector realtime mode: {}, create a hybrid 
collector.",
             parameters.getString(COLLECTOR_REALTIME_MODE));
@@ -134,7 +134,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
+  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     dataRegionId =
         ((PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
@@ -163,7 +163,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
                   (dataRegion -> {
                     dataRegion.writeLock(
                         String.format(
-                            "Pipe: starting %s", 
IoTDBDataRegionCollector.class.getName()));
+                            "Pipe: starting %s", 
IoTDBDataRegionExtractor.class.getName()));
                     try {
                       
startHistoricalCollectorAndRealtimeCollector(exceptionHolder);
                     } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java
index 08d5684851e..858ae933cb5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.pipe.collector.historical;
 
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.PipeExtractor;
 
-public interface PipeHistoricalDataRegionCollector extends PipeCollector {
+public interface PipeHistoricalDataRegionExtractor extends PipeExtractor {
 
   boolean hasConsumedAll();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
similarity index 93%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 6ade5ca4ad8..2327e63b07a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -44,16 +44,16 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
-import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_END_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_KEY;
 
-public class PipeHistoricalDataRegionTsFileCollector implements 
PipeHistoricalDataRegionCollector {
+public class PipeHistoricalDataRegionTsFileExtractor implements 
PipeHistoricalDataRegionExtractor {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
+      LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
 
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
@@ -76,7 +76,7 @@ public class PipeHistoricalDataRegionTsFileCollector 
implements PipeHistoricalDa
 
   @Override
   public void customize(
-      PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+      PipeParameters parameters, PipeExtractorRuntimeConfiguration 
configuration) {
     final PipeTaskCollectorRuntimeEnvironment environment =
         (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
@@ -203,7 +203,7 @@ public class PipeHistoricalDataRegionTsFileCollector 
implements PipeHistoricalDa
         pendingQueue.forEach(
             event ->
                 event.increaseReferenceCount(
-                    PipeHistoricalDataRegionTsFileCollector.class.getName()));
+                    PipeHistoricalDataRegionTsFileExtractor.class.getName()));
       } finally {
         tsFileManager.readUnlock();
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
similarity index 84%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
index 4e6e98c168e..aec400b6d5d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java
@@ -21,32 +21,32 @@ package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.pipe.api.PipeCollector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
-public abstract class PipeRealtimeDataRegionCollector implements PipeCollector 
{
+public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor 
{
 
   protected String pattern;
   protected String dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
-  protected PipeRealtimeDataRegionCollector() {}
+  protected PipeRealtimeDataRegionExtractor() {}
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {}
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
+  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     pattern =
         parameters.getStringOrDefault(
-            PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
-            PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
+            PipeExtractorConstant.COLLECTOR_PATTERN_KEY,
+            PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
 
     final PipeTaskCollectorRuntimeEnvironment environment =
         (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
similarity index 88%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
index 6051930e4e9..cffb273cee1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java
@@ -20,12 +20,12 @@
 package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
-public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   @Override
   public void validate(PipeParameterValidator validator) {
@@ -34,7 +34,7 @@ public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionC
 
   @Override
   public void customize(
-      PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+      PipeParameters parameters, PipeExtractorRuntimeConfiguration 
configuration) {
     // do nothing
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
similarity index 95%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
index eeca9a82dad..a78ad713feb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -32,16 +32,16 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
+      LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector() {
+  public PipeRealtimeDataRegionHybridExtractor() {
     this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
@@ -113,7 +113,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
 
   private boolean isApproachingCapacity() {
     return pendingQueue.size()
-        >= PipeConfig.getInstance().getPipeCollectorPendingQueueTabletLimit();
+        >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit();
   }
 
   @Override
@@ -136,7 +136,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
                 eventToSupply.getClass(), this));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
@@ -157,7 +157,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
                 (state.equals(TsFileEpoch.State.EMPTY)) ? 
TsFileEpoch.State.USING_TABLET : state);
 
     if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
-      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName()))
 {
+      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
         return event.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
@@ -188,7 +188,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
             });
 
     if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName()))
 {
+      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
         return event.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
similarity index 93%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
index 2d02d4fba78..b50e48536c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -30,16 +30,16 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
+      LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class);
 
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionLogCollector() {
+  public PipeRealtimeDataRegionLogExtractor() {
     this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
@@ -79,7 +79,7 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
     while (collectEvent != null) {
       Event suppliedEvent = null;
 
-      if 
(collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName()))
 {
+      if 
(collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()))
 {
         suppliedEvent = collectEvent.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
@@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName());
+      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
similarity index 92%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index f43582a5541..2bc9abee7ad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -30,16 +30,16 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegionCollector {
+public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
+      LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class);
 
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionTsFileCollector() {
+  public PipeRealtimeDataRegionTsFileExtractor() {
     this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
@@ -80,7 +80,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
       Event suppliedEvent = null;
 
       if (collectEvent.increaseReferenceCount(
-          PipeRealtimeDataRegionTsFileCollector.class.getName())) {
+          PipeRealtimeDataRegionTsFileExtractor.class.getName())) {
         suppliedEvent = collectEvent.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the 
data represented by
@@ -95,7 +95,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileCollector.class.getName());
+      
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
index 885913a44cf..bc22132d4cd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java
@@ -49,7 +49,7 @@ public class DisruptorQueue<E> {
 
   public static class Builder<E> {
     private int ringBufferSize =
-        
PipeConfig.getInstance().getPipeCollectorAssignerDisruptorRingBufferSize();
+        
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
     private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
     private ProducerType producerType = ProducerType.MULTI;
     private WaitStrategy waitStrategy = new BlockingWaitStrategy();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
index fe482147e04..116f63f3073 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.collector.realtime.assigner;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
 import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -63,11 +63,11 @@ public class PipeDataRegionAssigner {
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
   }
 
-  public void startAssignTo(PipeRealtimeDataRegionCollector collector) {
+  public void startAssignTo(PipeRealtimeDataRegionExtractor collector) {
     matcher.register(collector);
   }
 
-  public void stopAssignTo(PipeRealtimeDataRegionCollector collector) {
+  public void stopAssignTo(PipeRealtimeDataRegionExtractor collector) {
     matcher.deregister(collector);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
index f20bf4cc6dc..a4806c30774 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.collector.realtime.epoch;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class TsFileEpoch {
 
   private final String filePath;
-  private final ConcurrentMap<PipeRealtimeDataRegionCollector, 
AtomicReference<State>>
+  private final ConcurrentMap<PipeRealtimeDataRegionExtractor, 
AtomicReference<State>>
       dataRegionCollector2State;
 
   public TsFileEpoch(String filePath) {
@@ -36,14 +36,14 @@ public class TsFileEpoch {
     this.dataRegionCollector2State = new ConcurrentHashMap<>();
   }
 
-  public TsFileEpoch.State getState(PipeRealtimeDataRegionCollector collector) 
{
+  public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor collector) 
{
     return dataRegionCollector2State
         .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY))
         .get();
   }
 
   public void migrateState(
-      PipeRealtimeDataRegionCollector collector, TsFileEpochStateMigrator 
visitor) {
+      PipeRealtimeDataRegionExtractor collector, TsFileEpochStateMigrator 
visitor) {
     dataRegionCollector2State
         .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY))
         .getAndUpdate(visitor::migrate);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 6fd82618ff9..00758562073 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.listener;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.collector.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEventFactory;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
@@ -53,7 +53,7 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+      String dataRegionId, PipeRealtimeDataRegionExtractor collector) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
         .startAssignTo(collector);
@@ -67,7 +67,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public synchronized void stopListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+      String dataRegionId, PipeRealtimeDataRegionExtractor collector) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
     if (assigner == null) {
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index 14df36e9267..0af3151ed9a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.collector.realtime.matcher;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
@@ -40,20 +40,20 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
 
   private final ReentrantReadWriteLock lock;
 
-  private final Set<PipeRealtimeDataRegionCollector> collectors;
-  private final Cache<String, Set<PipeRealtimeDataRegionCollector>> 
deviceToCollectorsCache;
+  private final Set<PipeRealtimeDataRegionExtractor> collectors;
+  private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToCollectorsCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
     this.collectors = new HashSet<>();
     this.deviceToCollectorsCache =
         Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeCollectorMatcherCacheSize())
+            
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
             .build();
   }
 
   @Override
-  public void register(PipeRealtimeDataRegionCollector collector) {
+  public void register(PipeRealtimeDataRegionExtractor collector) {
     lock.writeLock().lock();
     try {
       collectors.add(collector);
@@ -64,7 +64,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   }
 
   @Override
-  public void deregister(PipeRealtimeDataRegionCollector collector) {
+  public void deregister(PipeRealtimeDataRegionExtractor collector) {
     lock.writeLock().lock();
     try {
       collectors.remove(collector);
@@ -85,8 +85,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   }
 
   @Override
-  public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent 
event) {
-    final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new 
HashSet<>();
+  public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent 
event) {
+    final Set<PipeRealtimeDataRegionExtractor> matchedCollectors = new 
HashSet<>();
 
     lock.readLock().lock();
     try {
@@ -99,7 +99,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
         final String[] measurements = entry.getValue();
 
         // 1. try to get matched collectors from cache, if not success, match 
them by device
-        final Set<PipeRealtimeDataRegionCollector> collectorsFilteredByDevice =
+        final Set<PipeRealtimeDataRegionExtractor> collectorsFilteredByDevice =
             deviceToCollectorsCache.get(device, 
this::filterCollectorsByDevice);
         // this would not happen
         if (collectorsFilteredByDevice == null) {
@@ -166,10 +166,10 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     return matchedCollectors;
   }
 
-  private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String 
device) {
-    final Set<PipeRealtimeDataRegionCollector> filteredCollectors = new 
HashSet<>();
+  private Set<PipeRealtimeDataRegionExtractor> filterCollectorsByDevice(String 
device) {
+    final Set<PipeRealtimeDataRegionExtractor> filteredCollectors = new 
HashSet<>();
 
-    for (PipeRealtimeDataRegionCollector collector : collectors) {
+    for (PipeRealtimeDataRegionExtractor collector : collectors) {
       String pattern = collector.getPattern();
       if (
       // for example, pattern is root.a.b and device is root.a.b.c
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
index 6e84cdf0ce2..44b544329c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.collector.realtime.matcher;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
 
 import java.util.Set;
@@ -30,10 +30,10 @@ public interface PipeDataRegionMatcher {
    * Register a collector. If the collector's pattern matches the event's 
schema info, the event
    * will be assigned to the collector.
    */
-  void register(PipeRealtimeDataRegionCollector collector);
+  void register(PipeRealtimeDataRegionExtractor collector);
 
   /** Deregister a collector. */
-  void deregister(PipeRealtimeDataRegionCollector collector);
+  void deregister(PipeRealtimeDataRegionExtractor collector);
 
   /** Get the number of registered collectors in this matcher. */
   int getRegisterCount();
@@ -45,7 +45,7 @@ public interface PipeDataRegionMatcher {
    * @param event the event to be matched
    * @return the matched collectors
    */
-  Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event);
+  Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event);
 
   /** Clear all the registered collectors and internal data structures. */
   void clear();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
index a98bac82bdd..5c3f660f8e6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
-public class PipeCollectorConstant {
+public class PipeExtractorConstant {
 
   public static final String COLLECTOR_KEY = "collector";
 
@@ -36,7 +36,7 @@ public class PipeCollectorConstant {
   public static final String COLLECTOR_REALTIME_MODE_FILE = "file";
   public static final String COLLECTOR_REALTIME_MODE_LOG = "log";
 
-  private PipeCollectorConstant() {
+  private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
index 29810c46960..bd4999d68ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.pipe.config.plugin.configuraion;
 
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 
 public class PipeTaskRuntimeConfiguration
-    implements PipeCollectorRuntimeConfiguration,
+    implements PipeExtractorRuntimeConfiguration,
         PipeProcessorRuntimeConfiguration,
         PipeConnectorRuntimeConfiguration {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 462ba9164f6..4f70f17674c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -125,7 +125,7 @@ public abstract class EnrichedEvent implements Event {
    * @return the pattern
    */
   public final String getPattern() {
-    return pattern == null ? 
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
+    return pattern == null ? 
PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 168400443c8..24f52d0ea87 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tablet;
 
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -47,7 +47,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   public String getPattern() {
-    return pattern == null ? 
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
+    return pattern == null ? 
PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
@@ -78,7 +78,7 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
     final String notNullPattern = getPattern();
 
     // if notNullPattern is "root", we don't need to convert, just return the 
original tablet
-    if 
(notNullPattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+    if 
(notNullPattern.equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
       return tablet;
     }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index 70d21e85b3a..18ea33be4ff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.processor;
 
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -54,7 +54,7 @@ public class PipeDoNothingProcessor implements PipeProcessor {
       final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
       if (enrichedEvent
           .getPattern()
-          .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+          .equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
         eventCollector.collect(tabletInsertionEvent);
       } else {
         tabletInsertionEvent
@@ -86,7 +86,7 @@ public class PipeDoNothingProcessor implements PipeProcessor {
     if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
       final PipeTsFileInsertionEvent enrichedEvent =
           (PipeTsFileInsertionEvent) tsFileInsertionEvent;
-      if 
(enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
+      if 
(enrichedEvent.getPattern().equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
           && !enrichedEvent.hasTimeFilter()) {
         eventCollector.collect(tsFileInsertionEvent);
       } else {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 0d676ac4992..807c7b0c758 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -23,19 +23,19 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionCollector;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionExtractor;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
-  private final PipeCollector pipeCollector;
+  private final PipeExtractor pipeExtractor;
 
   public PipeTaskCollectorStage(
       String pipeName,
@@ -43,27 +43,27 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       PipeParameters collectorParameters,
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta) {
-    pipeCollector =
+    pipeExtractor =
         collectorParameters
                 .getStringOrDefault(
-                    PipeCollectorConstant.COLLECTOR_KEY,
-                    BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
-                .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
-            ? new IoTDBDataRegionCollector()
+                    PipeExtractorConstant.COLLECTOR_KEY,
+                    BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+                .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            ? new IoTDBDataRegionExtractor()
             : PipeAgent.plugin().reflectCollector(collectorParameters);
 
     // validate and customize should be called before createSubtask. this 
allows collector exposing
     // exceptions in advance.
     try {
       // 1. validate collector parameters
-      pipeCollector.validate(new PipeParameterValidator(collectorParameters));
+      pipeExtractor.validate(new PipeParameterValidator(collectorParameters));
 
       // 2. customize collector
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskCollectorRuntimeEnvironment(
                   pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta));
-      pipeCollector.customize(collectorParameters, runtimeConfiguration);
+      pipeExtractor.customize(collectorParameters, runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
@@ -77,7 +77,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   @Override
   public void startSubtask() throws PipeException {
     try {
-      pipeCollector.start();
+      pipeExtractor.start();
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
@@ -91,13 +91,13 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   @Override
   public void dropSubtask() throws PipeException {
     try {
-      pipeCollector.close();
+      pipeExtractor.close();
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
   }
 
   public EventSupplier getEventSupplier() {
-    return pipeCollector::supply;
+    return pipeExtractor::supply;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 60fbc3798df..fade5393d72 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -51,7 +51,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    * @param creationTime pipe creation time
    * @param pipeProcessorParameters used to create pipe processor
    * @param dataRegionId data region id
-   * @param pipeCollectorInputEventSupplier used to input events from pipe 
collector
+   * @param pipeExtractorInputEventSupplier used to input events from pipe 
extractor
    * @param pipeConnectorOutputPendingQueue used to output events to pipe 
connector
    */
   public PipeTaskProcessorStage(
@@ -59,7 +59,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       long creationTime,
       PipeParameters pipeProcessorParameters,
       TConsensusGroupId dataRegionId,
-      EventSupplier pipeCollectorInputEventSupplier,
+      EventSupplier pipeExtractorInputEventSupplier,
       BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     final PipeProcessor pipeProcessor =
         pipeProcessorParameters
@@ -90,7 +90,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
-            pipeCollectorInputEventSupplier,
+            pipeExtractorInputEventSupplier,
             pipeProcessor,
             pipeConnectorOutputEventCollector);
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
index e0977828d75..e84b8195956 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.pipe.collector;
 
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -49,7 +49,7 @@ public class CachedSchemaPatternMatcherTest {
 
   private CachedSchemaPatternMatcher matcher;
   private ExecutorService executorService;
-  private List<PipeRealtimeDataRegionCollector> collectorList;
+  private List<PipeRealtimeDataRegionExtractor> collectorList;
 
   @Before
   public void setUp() {
@@ -65,12 +65,12 @@ public class CachedSchemaPatternMatcherTest {
 
   @Test
   public void testCachedMatcher() throws Exception {
-    PipeRealtimeDataRegionCollector databaseCollector = new 
PipeRealtimeDataRegionFakeCollector();
+    PipeRealtimeDataRegionExtractor databaseCollector = new 
PipeRealtimeDataRegionFakeExtractor();
     databaseCollector.customize(
         new PipeParameters(
             new HashMap<String, String>() {
               {
-                put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root");
+                put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root");
               }
             }),
         new PipeTaskRuntimeConfiguration(new 
PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
@@ -79,20 +79,20 @@ public class CachedSchemaPatternMatcherTest {
     int deviceCollectorNum = 10;
     int seriesCollectorNum = 10;
     for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionCollector deviceCollector = new 
PipeRealtimeDataRegionFakeCollector();
+      PipeRealtimeDataRegionExtractor deviceCollector = new 
PipeRealtimeDataRegionFakeExtractor();
       int finalI1 = i;
       deviceCollector.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + 
finalI1);
+                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root." + 
finalI1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
               new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionCollector seriesCollector = new 
PipeRealtimeDataRegionFakeCollector();
+        PipeRealtimeDataRegionExtractor seriesCollector = new 
PipeRealtimeDataRegionFakeExtractor();
         int finalI = i;
         int finalJ = j;
         seriesCollector.customize(
@@ -100,7 +100,7 @@ public class CachedSchemaPatternMatcherTest {
                 new HashMap<String, String>() {
                   {
                     put(
-                        PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+                        PipeExtractorConstant.COLLECTOR_PATTERN_KEY,
                         "root." + finalI + "." + finalJ);
                   }
                 }),
@@ -147,9 +147,9 @@ public class CachedSchemaPatternMatcherTest {
     future.get();
   }
 
-  public static class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
+  public static class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionExtractor {
 
-    public PipeRealtimeDataRegionFakeCollector() {}
+    public PipeRealtimeDataRegionFakeExtractor() {}
 
     @Override
     public Event supply() {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
index 1b12e5822f9..0ef1058c12a 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
@@ -25,10 +25,10 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
-import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
@@ -102,20 +102,20 @@ public class PipeRealtimeCollectTest {
   public void testRealtimeCollectProcess() {
     // set up realtime collector
 
-    try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector();
-        PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector();
-        PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector();
-        PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector()) {
+    try (PipeRealtimeDataRegionHybridExtractor collector1 =
+            new PipeRealtimeDataRegionHybridExtractor();
+        PipeRealtimeDataRegionHybridExtractor collector2 =
+            new PipeRealtimeDataRegionHybridExtractor();
+        PipeRealtimeDataRegionHybridExtractor collector3 =
+            new PipeRealtimeDataRegionHybridExtractor();
+        PipeRealtimeDataRegionHybridExtractor collector4 =
+            new PipeRealtimeDataRegionHybridExtractor()) {
 
       collector1.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
+                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
@@ -125,7 +125,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
+                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
@@ -135,7 +135,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
+                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
@@ -145,15 +145,15 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
+                  put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
               new PipeTaskCollectorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion2), null)));
 
-      PipeRealtimeDataRegionCollector[] collectors =
-          new PipeRealtimeDataRegionCollector[] {collector1, collector2, 
collector3, collector4};
+      PipeRealtimeDataRegionExtractor[] collectors =
+          new PipeRealtimeDataRegionExtractor[] {collector1, collector2, 
collector3, collector4};
 
       // start collector 0, 1
       collectors[0].start();
@@ -291,7 +291,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> listen(
-      PipeRealtimeDataRegionCollector collector, Function<Event, Integer> 
weight, int expectNum) {
+      PipeRealtimeDataRegionExtractor collector, Function<Event, Integer> 
weight, int expectNum) {
     return listenerService.submit(
         () -> {
           int eventNum = 0;


Reply via email to