This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rename-collector-to-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0d65281f41ec6c09d380e671841bf5913cd55932
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 24 00:52:16 2023 +0800

    pipe rename: collector -> extractor
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   | 12 ++++----
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  8 +++---
 .../persistence/pipe/PipePluginInfo.java           | 12 ++++----
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 10 +++----
 .../request/ConfigPhysicalPlanSerDeTest.java       |  8 +++---
 .../iotdb/confignode/persistence/PipeInfoTest.java |  6 ++--
 .../runtime/PipeHandleMetaChangeProcedureTest.java |  2 +-
 .../impl/pipe/task/CreatePipeProcedureV2Test.java  |  6 ++--
 .../org/apache/iotdb/pipe/api/PipeExtractor.java   | 10 +++----
 ...java => PipeExtractorRuntimeConfiguration.java} |  0
 .../src/main/thrift/confignode.thrift              |  2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  2 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  4 +--
 .../{collector => extractor}/IoTDBExtractor.java   |  8 +++---
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 32 +++++++++++-----------
 .../config/executor/ClusterConfigTaskExecutor.java |  2 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 20 +++++++-------
 .../statement/sys/pipe/CreatePipeStatement.java    | 10 +++----
 18 files changed, 77 insertions(+), 77 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index b73bb9bbd64..ffd2bdf6cf8 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -515,20 +515,20 @@ migrateRegion
 // Pipe Task 
=========================================================================================
 createPipe
     : CREATE PIPE pipeName=identifier
-        collectorAttributesClause?
+        extractorAttributesClause?
         processorAttributesClause?
         connectorAttributesClause
     ;
 
-collectorAttributesClause
-    : WITH COLLECTOR
+extractorAttributesClause
+    : WITH EXTRACTOR
         LR_BRACKET
-        (collectorAttributeClause COMMA)* collectorAttributeClause?
+        (extractorAttributeClause COMMA)* extractorAttributeClause?
         RR_BRACKET
     ;
 
-collectorAttributeClause
-    : collectorKey=STRING_LITERAL OPERATOR_SEQ collectorValue=STRING_LITERAL
+extractorAttributeClause
+    : extractorKey=STRING_LITERAL OPERATOR_SEQ extractorValue=STRING_LITERAL
     ;
 
 processorAttributesClause
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 9014377237c..9ed929a9b06 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -129,10 +129,6 @@ CLUSTER
     : C L U S T E R
     ;
 
-COLLECTOR
-    : C O L L E C T O R
-    ;
-
 CONCAT
     : C O N C A T
     ;
@@ -262,6 +258,10 @@ EXPLAIN
     : E X P L A I N
     ;
 
+EXTRACTOR
+    : E X T R A C T O R
+    ;
+
 FILL
     : F I L L
     ;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 4c6466cdf40..b5267a32e69 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -122,16 +122,16 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
-    final PipeParameters collectorParameters =
-        new PipeParameters(createPipeRequest.getCollectorAttributes());
-    final String collectorPluginName =
-        collectorParameters.getStringOrDefault(
+    final PipeParameters extractorParameters =
+        new PipeParameters(createPipeRequest.getExtractorAttributes());
+    final String extractorPluginName =
+        extractorParameters.getStringOrDefault(
             PipeExtractorConstant.EXTRACTOR_KEY, 
IOTDB_EXTRACTOR.getPipePluginName());
-    if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
       final String exceptionMessage =
           String.format(
               "Failed to create pipe, the pipe extractor plugin %s does not 
exist",
-              collectorPluginName);
+              extractorPluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 2bb5e7b4a07..072024a577d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -96,7 +96,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         new PipeStaticMeta(
             createPipeRequest.getPipeName(),
             System.currentTimeMillis(),
-            createPipeRequest.getCollectorAttributes(),
+            createPipeRequest.getExtractorAttributes(),
             createPipeRequest.getProcessorAttributes(),
             createPipeRequest.getConnectorAttributes());
 
@@ -204,8 +204,8 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream);
-    ReadWriteIOUtils.write(createPipeRequest.getCollectorAttributesSize(), 
stream);
-    for (Map.Entry<String, String> entry : 
createPipeRequest.getCollectorAttributes().entrySet()) {
+    ReadWriteIOUtils.write(createPipeRequest.getExtractorAttributesSize(), 
stream);
+    for (Map.Entry<String, String> entry : 
createPipeRequest.getExtractorAttributes().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
       ReadWriteIOUtils.write(entry.getValue(), stream);
     }
@@ -233,13 +233,13 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     createPipeRequest =
         new TCreatePipeReq()
             .setPipeName(ReadWriteIOUtils.readString(byteBuffer))
-            .setCollectorAttributes(new HashMap<>())
+            .setExtractorAttributes(new HashMap<>())
             .setProcessorAttributes(new HashMap<>())
             .setConnectorAttributes(new HashMap<>());
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       createPipeRequest
-          .getCollectorAttributes()
+          .getExtractorAttributes()
           .put(ReadWriteIOUtils.readString(byteBuffer), 
ReadWriteIOUtils.readString(byteBuffer));
     }
     size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index a60b8605d0e..8fd1e59f79c 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1045,10 +1045,10 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void CreatePipePlanV2Test() throws IOException {
-    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> extractorAttributes = new HashMap<>();
     Map<String, String> processorAttributes = new HashMap<>();
     Map<String, String> connectorAttributes = new HashMap<>();
-    collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
+    extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
     PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 
1);
@@ -1056,7 +1056,7 @@ public class ConfigPhysicalPlanSerDeTest {
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
         new PipeStaticMeta(
-            "testPipe", 121, collectorAttributes, processorAttributes, 
connectorAttributes);
+            "testPipe", 121, extractorAttributes, processorAttributes, 
connectorAttributes);
     PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
     CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta);
     CreatePipePlanV2 createPipePlanV21 =
@@ -1148,7 +1148,7 @@ public class ConfigPhysicalPlanSerDeTest {
             123L,
             new HashMap() {
               {
-                put("collector-key", "collector-value");
+                put("extractor-key", "extractor-value");
               }
             },
             new HashMap() {
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 51b771026c1..da7a2e24c05 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -67,10 +67,10 @@ public class PipeInfoTest {
 
   @Test
   public void testSnapshot() throws TException, IOException {
-    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> extractorAttributes = new HashMap<>();
     Map<String, String> processorAttributes = new HashMap<>();
     Map<String, String> connectorAttributes = new HashMap<>();
-    collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
+    extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
     PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 
1);
@@ -78,7 +78,7 @@ public class PipeInfoTest {
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
         new PipeStaticMeta(
-            "testPipe", 121, collectorAttributes, processorAttributes, 
connectorAttributes);
+            "testPipe", 121, extractorAttributes, processorAttributes, 
connectorAttributes);
     PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
     CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta);
     pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2);
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 47a0dca1abd..b487b30ed6f 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -51,7 +51,7 @@ public class PipeHandleMetaChangeProcedureTest {
             123L,
             new HashMap() {
               {
-                put("collector-key", "collector-value");
+                put("extractor-key", "extractor-value");
               }
             },
             new HashMap() {
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index cf151fe5957..4f070a8ea5d 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -39,10 +39,10 @@ public class CreatePipeProcedureV2Test {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
 
-    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> extractorAttributes = new HashMap<>();
     Map<String, String> processorAttributes = new HashMap<>();
     Map<String, String> connectorAttributes = new HashMap<>();
-    collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
+    extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
 
@@ -50,7 +50,7 @@ public class CreatePipeProcedureV2Test {
         new CreatePipeProcedureV2(
             new TCreatePipeReq()
                 .setPipeName("testPipe")
-                .setCollectorAttributes(collectorAttributes)
+                .setExtractorAttributes(extractorAttributes)
                 .setProcessorAttributes(processorAttributes)
                 .setConnectorAttributes(connectorAttributes));
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
index 4213da842bc..a99d3afed9e 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  * <p>The lifecycle of a PipeExtractor is as follows:
  *
  * <ul>
- *   <li>When a collaboration task is created, the KV pairs of `WITH 
COLLECTOR` clause in SQL are
+ *   <li>When a collaboration task is created, the KV pairs of `WITH 
EXTRACTOR` clause in SQL are
  *       parsed and the validation method {@link 
PipeExtractor#validate(PipeParameterValidator)}
  *       will be called to validate the parameters.
  *   <li>Before the collaboration task starts, the method {@link
@@ -79,7 +79,7 @@ public interface PipeExtractor extends PipePlugin {
       throws Exception;
 
   /**
-   * Start the collector. After this method is called, events should be ready 
to be supplied by
+   * Start the extractor. After this method is called, events should be ready 
to be supplied by
    * {@link PipeExtractor#supply()}. This method is called after {@link
    * PipeExtractor#customize(PipeParameters, 
PipeExtractorRuntimeConfiguration)} is called.
    *
@@ -88,11 +88,11 @@ public interface PipeExtractor extends PipePlugin {
   void start() throws Exception;
 
   /**
-   * Supply single event from the collector and the caller will send the event 
to the processor.
+   * Supply single event from the extractor and the caller will send the event 
to the processor.
    * This method is called after {@link PipeExtractor#start()} is called.
    *
-   * @return the event to be supplied. the event may be null if the collector 
has no more events at
-   *     the moment, but the collector is still running for more events.
+   * @return the event to be supplied. the event may be null if the extractor 
has no more events at
+   *     the moment, but the extractor is still running for more events.
    * @throws Exception the user can throw errors if necessary
    */
   Event supply() throws Exception;
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeExtractorRuntimeConfiguration.java
similarity index 100%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeExtractorRuntimeConfiguration.java
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 7d3b990b8dd..794c02037bf 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -625,7 +625,7 @@ struct TGetAllPipeInfoResp{
 
 struct TCreatePipeReq {
     1: required string pipeName
-    2: optional map<string, string> collectorAttributes
+    2: optional map<string, string> extractorAttributes
     3: optional map<string, string> processorAttributes
     4: required map<string, string> connectorAttributes
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2e54fc9c353..077048ac11e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -53,7 +53,7 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
   }
 
-  /////////////////////////////// Collector ///////////////////////////////
+  /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {
     return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 1bcbe17b4ad..065c8099369 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,17 +19,17 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
-import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2;
+import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
 public enum BuiltinPipePlugin {
 
-  // collectors
+  // extractors
   IOTDB_EXTRACTOR("iotdb_extractor", IoTDBExtractor.class),
 
   // processors
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java
similarity index 91%
rename from 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
rename to 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java
index 9857f5dfd7d..f61635f9418 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.extractor;
 
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
@@ -26,10 +26,10 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the default collector
- * when no collector is specified. There is a real implementation in the 
server module but cannot be
+ * This class is a placeholder and should not be initialized. It represents 
the default extractor
+ * when no extractor is specified. There is a real implementation in the 
server module but cannot be
  * imported here. The pipe agent in the server module will replace this class 
with the real
- * implementation when initializing the collector.
+ * implementation when initializing the extractor.
  */
 public class IoTDBExtractor implements PipeExtractor {
   @Override
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 1024b866dda..8fab26dbfd8 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -36,7 +36,7 @@ public class PipeStaticMeta {
   private String pipeName;
   private long creationTime;
 
-  private PipeParameters collectorParameters;
+  private PipeParameters extractorParameters;
   private PipeParameters processorParameters;
   private PipeParameters connectorParameters;
 
@@ -45,12 +45,12 @@ public class PipeStaticMeta {
   public PipeStaticMeta(
       String pipeName,
       long creationTime,
-      Map<String, String> collectorAttributes,
+      Map<String, String> extractorAttributes,
       Map<String, String> processorAttributes,
       Map<String, String> connectorAttributes) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
-    collectorParameters = new PipeParameters(collectorAttributes);
+    extractorParameters = new PipeParameters(extractorAttributes);
     processorParameters = new PipeParameters(processorAttributes);
     connectorParameters = new PipeParameters(connectorAttributes);
   }
@@ -64,7 +64,7 @@ public class PipeStaticMeta {
   }
 
   public PipeParameters getExtractorParameters() {
-    return collectorParameters;
+    return extractorParameters;
   }
 
   public PipeParameters getProcessorParameters() {
@@ -86,8 +86,8 @@ public class PipeStaticMeta {
     ReadWriteIOUtils.write(pipeName, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
-    ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), 
outputStream);
-    for (Map.Entry<String, String> entry : 
collectorParameters.getAttribute().entrySet()) {
+    ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), 
outputStream);
+    for (Map.Entry<String, String> entry : 
extractorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
@@ -107,8 +107,8 @@ public class PipeStaticMeta {
     ReadWriteIOUtils.write(pipeName, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
-    ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), 
outputStream);
-    for (Map.Entry<String, String> entry : 
collectorParameters.getAttribute().entrySet()) {
+    ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), 
outputStream);
+    for (Map.Entry<String, String> entry : 
extractorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
@@ -130,7 +130,7 @@ public class PipeStaticMeta {
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
     pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
 
-    pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
 
@@ -138,7 +138,7 @@ public class PipeStaticMeta {
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(inputStream);
       final String value = ReadWriteIOUtils.readString(inputStream);
-      pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
     }
     size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
@@ -162,7 +162,7 @@ public class PipeStaticMeta {
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
     pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
 
-    pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
 
@@ -170,7 +170,7 @@ public class PipeStaticMeta {
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(byteBuffer);
       final String value = ReadWriteIOUtils.readString(byteBuffer);
-      pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
     }
     size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
@@ -199,7 +199,7 @@ public class PipeStaticMeta {
     PipeStaticMeta that = (PipeStaticMeta) obj;
     return pipeName.equals(that.pipeName)
         && creationTime == that.creationTime
-        && collectorParameters.equals(that.collectorParameters)
+        && extractorParameters.equals(that.extractorParameters)
         && processorParameters.equals(that.processorParameters)
         && connectorParameters.equals(that.connectorParameters);
   }
@@ -207,7 +207,7 @@ public class PipeStaticMeta {
   @Override
   public int hashCode() {
     return Objects.hash(
-        pipeName, creationTime, collectorParameters, processorParameters, 
connectorParameters);
+        pipeName, creationTime, extractorParameters, processorParameters, 
connectorParameters);
   }
 
   @Override
@@ -218,8 +218,8 @@ public class PipeStaticMeta {
         + '\''
         + ", creationTime="
         + creationTime
-        + ", collectorParameters="
-        + collectorParameters.getAttribute()
+        + ", extractorParameters="
+        + extractorParameters.getAttribute()
         + ", processorParameters="
         + processorParameters.getAttribute()
         + ", connectorParameters="
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a8ad1d92e15..0651ae0e32b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1518,7 +1518,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       TCreatePipeReq req =
           new TCreatePipeReq()
               .setPipeName(createPipeStatement.getPipeName())
-              
.setCollectorAttributes(createPipeStatement.getCollectorAttributes())
+              
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
               
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
               
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
       TSStatus tsStatus = configNodeClient.createPipe(req);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index ed6df252389..0096ea11900 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -3517,11 +3517,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       throw new SemanticException(
           "Not support for this sql in CREATEPIPE, please enter pipename or 
pipesinkname.");
     }
-    if (ctx.collectorAttributesClause() != null) {
-      createPipeStatement.setCollectorAttributes(
-          parseCollectorAttributesClause(ctx.collectorAttributesClause()));
+    if (ctx.extractorAttributesClause() != null) {
+      createPipeStatement.setExtractorAttributes(
+          parseExtractorAttributesClause(ctx.extractorAttributesClause()));
     } else {
-      createPipeStatement.setCollectorAttributes(new HashMap<>());
+      createPipeStatement.setExtractorAttributes(new HashMap<>());
     }
     if (ctx.processorAttributesClause() != null) {
       createPipeStatement.setProcessorAttributes(
@@ -3534,14 +3534,14 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     return createPipeStatement;
   }
 
-  private Map<String, String> parseCollectorAttributesClause(
-      IoTDBSqlParser.CollectorAttributesClauseContext ctx) {
+  private Map<String, String> parseExtractorAttributesClause(
+      IoTDBSqlParser.ExtractorAttributesClauseContext ctx) {
     final Map<String, String> collectorMap = new HashMap<>();
-    for (IoTDBSqlParser.CollectorAttributeClauseContext singleCtx :
-        ctx.collectorAttributeClause()) {
+    for (IoTDBSqlParser.ExtractorAttributeClauseContext singleCtx :
+        ctx.extractorAttributeClause()) {
       collectorMap.put(
-          parseStringLiteral(singleCtx.collectorKey.getText()),
-          parseStringLiteral(singleCtx.collectorValue.getText()));
+          parseStringLiteral(singleCtx.extractorKey.getText()),
+          parseStringLiteral(singleCtx.extractorValue.getText()));
     }
     return collectorMap;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java
index 46337300c9f..99eb77d8a5c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java
@@ -33,7 +33,7 @@ import java.util.Map;
 public class CreatePipeStatement extends Statement implements IConfigStatement 
{
 
   private String pipeName;
-  private Map<String, String> collectorAttributes;
+  private Map<String, String> extractorAttributes;
   private Map<String, String> processorAttributes;
   private Map<String, String> connectorAttributes;
 
@@ -45,8 +45,8 @@ public class CreatePipeStatement extends Statement implements 
IConfigStatement {
     return pipeName;
   }
 
-  public Map<String, String> getCollectorAttributes() {
-    return collectorAttributes;
+  public Map<String, String> getExtractorAttributes() {
+    return extractorAttributes;
   }
 
   public Map<String, String> getProcessorAttributes() {
@@ -61,8 +61,8 @@ public class CreatePipeStatement extends Statement implements 
IConfigStatement {
     this.pipeName = pipeName;
   }
 
-  public void setCollectorAttributes(Map<String, String> collectorAttributes) {
-    this.collectorAttributes = collectorAttributes;
+  public void setExtractorAttributes(Map<String, String> extractorAttributes) {
+    this.extractorAttributes = extractorAttributes;
   }
 
   public void setProcessorAttributes(Map<String, String> processorAttributes) {

Reply via email to