This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rename-collector-to-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0d65281f41ec6c09d380e671841bf5913cd55932 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 24 00:52:16 2023 +0800 pipe rename: collector -> extractor --- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 12 ++++---- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 +++--- .../persistence/pipe/PipePluginInfo.java | 12 ++++---- .../impl/pipe/task/CreatePipeProcedureV2.java | 10 +++---- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +++--- .../iotdb/confignode/persistence/PipeInfoTest.java | 6 ++-- .../runtime/PipeHandleMetaChangeProcedureTest.java | 2 +- .../impl/pipe/task/CreatePipeProcedureV2Test.java | 6 ++-- .../org/apache/iotdb/pipe/api/PipeExtractor.java | 10 +++---- ...java => PipeExtractorRuntimeConfiguration.java} | 0 .../src/main/thrift/confignode.thrift | 2 +- .../iotdb/commons/pipe/config/PipeConfig.java | 2 +- .../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +-- .../{collector => extractor}/IoTDBExtractor.java | 8 +++--- .../commons/pipe/task/meta/PipeStaticMeta.java | 32 +++++++++++----------- .../config/executor/ClusterConfigTaskExecutor.java | 2 +- .../iotdb/db/mpp/plan/parser/ASTVisitor.java | 20 +++++++------- .../statement/sys/pipe/CreatePipeStatement.java | 10 +++---- 18 files changed, 77 insertions(+), 77 deletions(-) diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index b73bb9bbd64..ffd2bdf6cf8 100644 --- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -515,20 +515,20 @@ migrateRegion // Pipe Task ========================================================================================= createPipe : CREATE PIPE pipeName=identifier - collectorAttributesClause? + extractorAttributesClause? processorAttributesClause? connectorAttributesClause ; -collectorAttributesClause - : WITH COLLECTOR +extractorAttributesClause + : WITH EXTRACTOR LR_BRACKET - (collectorAttributeClause COMMA)* collectorAttributeClause? + (extractorAttributeClause COMMA)* extractorAttributeClause? RR_BRACKET ; -collectorAttributeClause - : collectorKey=STRING_LITERAL OPERATOR_SEQ collectorValue=STRING_LITERAL +extractorAttributeClause + : extractorKey=STRING_LITERAL OPERATOR_SEQ extractorValue=STRING_LITERAL ; processorAttributesClause diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 9014377237c..9ed929a9b06 100644 --- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -129,10 +129,6 @@ CLUSTER : C L U S T E R ; -COLLECTOR - : C O L L E C T O R - ; - CONCAT : C O N C A T ; @@ -262,6 +258,10 @@ EXPLAIN : E X P L A I N ; +EXTRACTOR + : E X T R A C T O R + ; + FILL : F I L L ; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 4c6466cdf40..b5267a32e69 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -122,16 +122,16 @@ public class PipePluginInfo implements SnapshotProcessor { } public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) { - final PipeParameters collectorParameters = - new PipeParameters(createPipeRequest.getCollectorAttributes()); - final String collectorPluginName = - collectorParameters.getStringOrDefault( + final PipeParameters extractorParameters = + new PipeParameters(createPipeRequest.getExtractorAttributes()); + final String extractorPluginName = + extractorParameters.getStringOrDefault( PipeExtractorConstant.EXTRACTOR_KEY, IOTDB_EXTRACTOR.getPipePluginName()); - if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) { + if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) { final String exceptionMessage = String.format( "Failed to create pipe, the pipe extractor plugin %s does not exist", - collectorPluginName); + extractorPluginName); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 2bb5e7b4a07..072024a577d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -96,7 +96,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { new PipeStaticMeta( createPipeRequest.getPipeName(), System.currentTimeMillis(), - createPipeRequest.getCollectorAttributes(), + createPipeRequest.getExtractorAttributes(), createPipeRequest.getProcessorAttributes(), createPipeRequest.getConnectorAttributes()); @@ -204,8 +204,8 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode()); super.serialize(stream); ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream); - ReadWriteIOUtils.write(createPipeRequest.getCollectorAttributesSize(), stream); - for (Map.Entry<String, String> entry : createPipeRequest.getCollectorAttributes().entrySet()) { + ReadWriteIOUtils.write(createPipeRequest.getExtractorAttributesSize(), stream); + for (Map.Entry<String, String> entry : createPipeRequest.getExtractorAttributes().entrySet()) { ReadWriteIOUtils.write(entry.getKey(), stream); ReadWriteIOUtils.write(entry.getValue(), stream); } @@ -233,13 +233,13 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { createPipeRequest = new TCreatePipeReq() .setPipeName(ReadWriteIOUtils.readString(byteBuffer)) - .setCollectorAttributes(new HashMap<>()) + .setExtractorAttributes(new HashMap<>()) .setProcessorAttributes(new HashMap<>()) .setConnectorAttributes(new HashMap<>()); int size = ReadWriteIOUtils.readInt(byteBuffer); for (int i = 0; i < size; ++i) { createPipeRequest - .getCollectorAttributes() + .getExtractorAttributes() .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer)); } size = ReadWriteIOUtils.readInt(byteBuffer); diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index a60b8605d0e..8fd1e59f79c 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -1045,10 +1045,10 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void CreatePipePlanV2Test() throws IOException { - Map<String, String> collectorAttributes = new HashMap<>(); + Map<String, String> extractorAttributes = new HashMap<>(); Map<String, String> processorAttributes = new HashMap<>(); Map<String, String> connectorAttributes = new HashMap<>(); - collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector"); + extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor"); processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor"); connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter"); PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 1); @@ -1056,7 +1056,7 @@ public class ConfigPhysicalPlanSerDeTest { pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta); PipeStaticMeta pipeStaticMeta = new PipeStaticMeta( - "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes); + "testPipe", 121, extractorAttributes, processorAttributes, connectorAttributes); PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta); CreatePipePlanV2 createPipePlanV21 = @@ -1148,7 +1148,7 @@ public class ConfigPhysicalPlanSerDeTest { 123L, new HashMap() { { - put("collector-key", "collector-value"); + put("extractor-key", "extractor-value"); } }, new HashMap() { diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java index 51b771026c1..da7a2e24c05 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java @@ -67,10 +67,10 @@ public class PipeInfoTest { @Test public void testSnapshot() throws TException, IOException { - Map<String, String> collectorAttributes = new HashMap<>(); + Map<String, String> extractorAttributes = new HashMap<>(); Map<String, String> processorAttributes = new HashMap<>(); Map<String, String> connectorAttributes = new HashMap<>(); - collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector"); + extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor"); processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor"); connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter"); PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 1); @@ -78,7 +78,7 @@ public class PipeInfoTest { pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta); PipeStaticMeta pipeStaticMeta = new PipeStaticMeta( - "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes); + "testPipe", 121, extractorAttributes, processorAttributes, connectorAttributes); PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta); pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2); diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java index 47a0dca1abd..b487b30ed6f 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java @@ -51,7 +51,7 @@ public class PipeHandleMetaChangeProcedureTest { 123L, new HashMap() { { - put("collector-key", "collector-value"); + put("extractor-key", "extractor-value"); } }, new HashMap() { diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java index cf151fe5957..4f070a8ea5d 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java @@ -39,10 +39,10 @@ public class CreatePipeProcedureV2Test { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - Map<String, String> collectorAttributes = new HashMap<>(); + Map<String, String> extractorAttributes = new HashMap<>(); Map<String, String> processorAttributes = new HashMap<>(); Map<String, String> connectorAttributes = new HashMap<>(); - collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector"); + extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor"); processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor"); connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter"); @@ -50,7 +50,7 @@ public class CreatePipeProcedureV2Test { new CreatePipeProcedureV2( new TCreatePipeReq() .setPipeName("testPipe") - .setCollectorAttributes(collectorAttributes) + .setExtractorAttributes(extractorAttributes) .setProcessorAttributes(processorAttributes) .setConnectorAttributes(connectorAttributes)); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java index 4213da842bc..a99d3afed9e 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java @@ -34,7 +34,7 @@ import org.apache.iotdb.pipe.api.event.Event; * <p>The lifecycle of a PipeExtractor is as follows: * * <ul> - * <li>When a collaboration task is created, the KV pairs of `WITH COLLECTOR` clause in SQL are + * <li>When a collaboration task is created, the KV pairs of `WITH EXTRACTOR` clause in SQL are * parsed and the validation method {@link PipeExtractor#validate(PipeParameterValidator)} * will be called to validate the parameters. * <li>Before the collaboration task starts, the method {@link @@ -79,7 +79,7 @@ public interface PipeExtractor extends PipePlugin { throws Exception; /** - * Start the collector. After this method is called, events should be ready to be supplied by + * Start the extractor. After this method is called, events should be ready to be supplied by * {@link PipeExtractor#supply()}. This method is called after {@link * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called. * @@ -88,11 +88,11 @@ public interface PipeExtractor extends PipePlugin { void start() throws Exception; /** - * Supply single event from the collector and the caller will send the event to the processor. + * Supply single event from the extractor and the caller will send the event to the processor. * This method is called after {@link PipeExtractor#start()} is called. * - * @return the event to be supplied. the event may be null if the collector has no more events at - * the moment, but the collector is still running for more events. + * @return the event to be supplied. the event may be null if the extractor has no more events at + * the moment, but the extractor is still running for more events. * @throws Exception the user can throw errors if necessary */ Event supply() throws Exception; diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeExtractorRuntimeConfiguration.java similarity index 100% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeExtractorRuntimeConfiguration.java diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 7d3b990b8dd..794c02037bf 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -625,7 +625,7 @@ struct TGetAllPipeInfoResp{ struct TCreatePipeReq { 1: required string pipeName - 2: optional map<string, string> collectorAttributes + 2: optional map<string, string> extractorAttributes 3: optional map<string, string> processorAttributes 4: required map<string, string> connectorAttributes } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 2e54fc9c353..077048ac11e 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -53,7 +53,7 @@ public class PipeConfig { return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(); } - /////////////////////////////// Collector /////////////////////////////// + /////////////////////////////// Extractor /////////////////////////////// public int getPipeExtractorAssignerDisruptorRingBufferSize() { return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize(); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 1bcbe17b4ad..065c8099369 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -19,17 +19,17 @@ package org.apache.iotdb.commons.pipe.plugin.builtin; -import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2; +import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; public enum BuiltinPipePlugin { - // collectors + // extractors IOTDB_EXTRACTOR("iotdb_extractor", IoTDBExtractor.class), // processors diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java similarity index 91% rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java index 9857f5dfd7d..f61635f9418 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/extractor/IoTDBExtractor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.plugin.builtin.collector; +package org.apache.iotdb.commons.pipe.plugin.builtin.extractor; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -26,10 +26,10 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; /** - * This class is a placeholder and should not be initialized. It represents the default collector - * when no collector is specified. There is a real implementation in the server module but cannot be + * This class is a placeholder and should not be initialized. It represents the default extractor + * when no extractor is specified. There is a real implementation in the server module but cannot be * imported here. The pipe agent in the server module will replace this class with the real - * implementation when initializing the collector. + * implementation when initializing the extractor. */ public class IoTDBExtractor implements PipeExtractor { @Override diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java index 1024b866dda..8fab26dbfd8 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java @@ -36,7 +36,7 @@ public class PipeStaticMeta { private String pipeName; private long creationTime; - private PipeParameters collectorParameters; + private PipeParameters extractorParameters; private PipeParameters processorParameters; private PipeParameters connectorParameters; @@ -45,12 +45,12 @@ public class PipeStaticMeta { public PipeStaticMeta( String pipeName, long creationTime, - Map<String, String> collectorAttributes, + Map<String, String> extractorAttributes, Map<String, String> processorAttributes, Map<String, String> connectorAttributes) { this.pipeName = pipeName; this.creationTime = creationTime; - collectorParameters = new PipeParameters(collectorAttributes); + extractorParameters = new PipeParameters(extractorAttributes); processorParameters = new PipeParameters(processorAttributes); connectorParameters = new PipeParameters(connectorAttributes); } @@ -64,7 +64,7 @@ public class PipeStaticMeta { } public PipeParameters getExtractorParameters() { - return collectorParameters; + return extractorParameters; } public PipeParameters getProcessorParameters() { @@ -86,8 +86,8 @@ public class PipeStaticMeta { ReadWriteIOUtils.write(pipeName, outputStream); ReadWriteIOUtils.write(creationTime, outputStream); - ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream); - for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) { + ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), outputStream); + for (Map.Entry<String, String> entry : extractorParameters.getAttribute().entrySet()) { ReadWriteIOUtils.write(entry.getKey(), outputStream); ReadWriteIOUtils.write(entry.getValue(), outputStream); } @@ -107,8 +107,8 @@ public class PipeStaticMeta { ReadWriteIOUtils.write(pipeName, outputStream); ReadWriteIOUtils.write(creationTime, outputStream); - ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream); - for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) { + ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), outputStream); + for (Map.Entry<String, String> entry : extractorParameters.getAttribute().entrySet()) { ReadWriteIOUtils.write(entry.getKey(), outputStream); ReadWriteIOUtils.write(entry.getValue(), outputStream); } @@ -130,7 +130,7 @@ public class PipeStaticMeta { pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream); pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream); - pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>()); + pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>()); pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>()); pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>()); @@ -138,7 +138,7 @@ public class PipeStaticMeta { for (int i = 0; i < size; ++i) { final String key = ReadWriteIOUtils.readString(inputStream); final String value = ReadWriteIOUtils.readString(inputStream); - pipeStaticMeta.collectorParameters.getAttribute().put(key, value); + pipeStaticMeta.extractorParameters.getAttribute().put(key, value); } size = ReadWriteIOUtils.readInt(inputStream); for (int i = 0; i < size; ++i) { @@ -162,7 +162,7 @@ public class PipeStaticMeta { pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer); pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer); - pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>()); + pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>()); pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>()); pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>()); @@ -170,7 +170,7 @@ public class PipeStaticMeta { for (int i = 0; i < size; ++i) { final String key = ReadWriteIOUtils.readString(byteBuffer); final String value = ReadWriteIOUtils.readString(byteBuffer); - pipeStaticMeta.collectorParameters.getAttribute().put(key, value); + pipeStaticMeta.extractorParameters.getAttribute().put(key, value); } size = ReadWriteIOUtils.readInt(byteBuffer); for (int i = 0; i < size; ++i) { @@ -199,7 +199,7 @@ public class PipeStaticMeta { PipeStaticMeta that = (PipeStaticMeta) obj; return pipeName.equals(that.pipeName) && creationTime == that.creationTime - && collectorParameters.equals(that.collectorParameters) + && extractorParameters.equals(that.extractorParameters) && processorParameters.equals(that.processorParameters) && connectorParameters.equals(that.connectorParameters); } @@ -207,7 +207,7 @@ public class PipeStaticMeta { @Override public int hashCode() { return Objects.hash( - pipeName, creationTime, collectorParameters, processorParameters, connectorParameters); + pipeName, creationTime, extractorParameters, processorParameters, connectorParameters); } @Override @@ -218,8 +218,8 @@ public class PipeStaticMeta { + '\'' + ", creationTime=" + creationTime - + ", collectorParameters=" - + collectorParameters.getAttribute() + + ", extractorParameters=" + + extractorParameters.getAttribute() + ", processorParameters=" + processorParameters.getAttribute() + ", connectorParameters=" diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index a8ad1d92e15..0651ae0e32b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1518,7 +1518,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TCreatePipeReq req = new TCreatePipeReq() .setPipeName(createPipeStatement.getPipeName()) - .setCollectorAttributes(createPipeStatement.getCollectorAttributes()) + .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); TSStatus tsStatus = configNodeClient.createPipe(req); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java index ed6df252389..0096ea11900 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java @@ -3517,11 +3517,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { throw new SemanticException( "Not support for this sql in CREATEPIPE, please enter pipename or pipesinkname."); } - if (ctx.collectorAttributesClause() != null) { - createPipeStatement.setCollectorAttributes( - parseCollectorAttributesClause(ctx.collectorAttributesClause())); + if (ctx.extractorAttributesClause() != null) { + createPipeStatement.setExtractorAttributes( + parseExtractorAttributesClause(ctx.extractorAttributesClause())); } else { - createPipeStatement.setCollectorAttributes(new HashMap<>()); + createPipeStatement.setExtractorAttributes(new HashMap<>()); } if (ctx.processorAttributesClause() != null) { createPipeStatement.setProcessorAttributes( @@ -3534,14 +3534,14 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { return createPipeStatement; } - private Map<String, String> parseCollectorAttributesClause( - IoTDBSqlParser.CollectorAttributesClauseContext ctx) { + private Map<String, String> parseExtractorAttributesClause( + IoTDBSqlParser.ExtractorAttributesClauseContext ctx) { final Map<String, String> collectorMap = new HashMap<>(); - for (IoTDBSqlParser.CollectorAttributeClauseContext singleCtx : - ctx.collectorAttributeClause()) { + for (IoTDBSqlParser.ExtractorAttributeClauseContext singleCtx : + ctx.extractorAttributeClause()) { collectorMap.put( - parseStringLiteral(singleCtx.collectorKey.getText()), - parseStringLiteral(singleCtx.collectorValue.getText())); + parseStringLiteral(singleCtx.extractorKey.getText()), + parseStringLiteral(singleCtx.extractorValue.getText())); } return collectorMap; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java index 46337300c9f..99eb77d8a5c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/CreatePipeStatement.java @@ -33,7 +33,7 @@ import java.util.Map; public class CreatePipeStatement extends Statement implements IConfigStatement { private String pipeName; - private Map<String, String> collectorAttributes; + private Map<String, String> extractorAttributes; private Map<String, String> processorAttributes; private Map<String, String> connectorAttributes; @@ -45,8 +45,8 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { return pipeName; } - public Map<String, String> getCollectorAttributes() { - return collectorAttributes; + public Map<String, String> getExtractorAttributes() { + return extractorAttributes; } public Map<String, String> getProcessorAttributes() { @@ -61,8 +61,8 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { this.pipeName = pipeName; } - public void setCollectorAttributes(Map<String, String> collectorAttributes) { - this.collectorAttributes = collectorAttributes; + public void setExtractorAttributes(Map<String, String> extractorAttributes) { + this.extractorAttributes = extractorAttributes; } public void setProcessorAttributes(Map<String, String> processorAttributes) {
