This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rename-collector-to-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64e04dbef55e0d5dc004bc6ae1d465a8290af952 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jun 23 22:27:00 2023 +0800 pipe rename: collector -> extractor --- .../persistence/pipe/PipePluginInfo.java | 8 +-- .../org/apache/iotdb/pipe/api/PipeConnector.java | 2 +- .../api/{PipeCollector.java => PipeExtractor.java} | 42 ++++++++-------- .../org/apache/iotdb/pipe/api/PipeProcessor.java | 4 +- .../PipeCollectorRuntimeConfiguration.java | 2 +- .../src/main/thrift/confignode.thrift | 2 +- .../resources/conf/iotdb-common.properties | 8 +-- .../apache/iotdb/commons/conf/CommonConfig.java | 44 ++++++++-------- .../iotdb/commons/conf/CommonDescriptor.java | 24 ++++----- .../iotdb/commons/pipe/config/PipeConfig.java | 26 +++++----- .../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +- .../{IoTDBCollector.java => IoTDBExtractor.java} | 8 +-- .../db/mpp/common/header/ColumnHeaderConstant.java | 4 +- .../execution/config/sys/pipe/ShowPipeTask.java | 2 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 12 ++--- ...ollector.java => IoTDBDataRegionExtractor.java} | 58 +++++++++++----------- ...java => PipeHistoricalDataRegionExtractor.java} | 4 +- ...> PipeHistoricalDataRegionTsFileExtractor.java} | 20 ++++---- ...r.java => PipeRealtimeDataRegionExtractor.java} | 16 +++--- ...va => PipeRealtimeDataRegionFakeExtractor.java} | 6 +-- ... => PipeRealtimeDataRegionHybridExtractor.java} | 14 +++--- ...ava => PipeRealtimeDataRegionLogExtractor.java} | 10 ++-- ... => PipeRealtimeDataRegionTsFileExtractor.java} | 10 ++-- .../realtime/assigner/DisruptorQueue.java | 2 +- .../realtime/assigner/PipeDataRegionAssigner.java | 6 +-- .../pipe/collector/realtime/epoch/TsFileEpoch.java | 8 +-- .../listener/PipeInsertionDataNodeListener.java | 6 +-- .../matcher/CachedSchemaPatternMatcher.java | 24 ++++----- .../realtime/matcher/PipeDataRegionMatcher.java | 8 +-- .../config/constant/PipeCollectorConstant.java | 4 +- .../configuraion/PipeTaskRuntimeConfiguration.java | 4 +- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 4 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 6 +-- .../db/pipe/processor/PipeDoNothingProcessor.java | 6 +-- .../db/pipe/task/stage/PipeTaskCollectorStage.java | 28 +++++------ .../db/pipe/task/stage/PipeTaskProcessorStage.java | 6 +-- .../collector/CachedSchemaPatternMatcherTest.java | 22 ++++---- .../db/pipe/collector/PipeRealtimeCollectTest.java | 36 +++++++------- 38 files changed, 250 insertions(+), 250 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 784bf7df744..7a24618d6a7 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -34,8 +34,8 @@ import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl import org.apache.iotdb.confignode.consensus.response.udf.JarResp; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.common.DataSet; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -57,7 +57,7 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR; -import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_COLLECTOR; +import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR; public class PipePluginInfo implements SnapshotProcessor { @@ -126,11 +126,11 @@ public class PipePluginInfo implements SnapshotProcessor { new PipeParameters(createPipeRequest.getCollectorAttributes()); final String collectorPluginName = collectorParameters.getStringOrDefault( - PipeCollectorConstant.COLLECTOR_KEY, IOTDB_COLLECTOR.getPipePluginName()); + PipeExtractorConstant.COLLECTOR_KEY, IOTDB_EXTRACTOR.getPipePluginName()); if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) { final String exceptionMessage = String.format( - "Failed to create pipe, the pipe collector plugin %s does not exist", + "Failed to create pipe, the pipe extractor plugin %s does not exist", collectorPluginName); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java index 083590c1aef..5d14da1e3aa 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java @@ -46,7 +46,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; * PipeConnector#handshake()} will be called to create a connection with sink. * <li>While the collaboration task is in progress: * <ul> - * <li>PipeCollector captures the events and wraps them into three types of Event instances. + * <li>PipeExtractor captures the events and wraps them into three types of Event instances. * <li>PipeProcessor processes the event and then passes them to the PipeConnector. * <li>PipeConnector serializes the events into binaries and send them to sinks. The * following 3 methods will be called: {@link diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java similarity index 72% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java index 8771febead4..4213da842bc 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeExtractor.java @@ -19,40 +19,40 @@ package org.apache.iotdb.pipe.api; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; /** - * PipeCollector + * PipeExtractor * - * <p>PipeCollector is responsible for capturing events from sources. + * <p>PipeExtractor is responsible for capturing events from sources. * - * <p>Various data sources can be supported by implementing different PipeCollector classes. + * <p>Various data sources can be supported by implementing different PipeExtractor classes. * - * <p>The lifecycle of a PipeCollector is as follows: + * <p>The lifecycle of a PipeExtractor is as follows: * * <ul> * <li>When a collaboration task is created, the KV pairs of `WITH COLLECTOR` clause in SQL are - * parsed and the validation method {@link PipeCollector#validate(PipeParameterValidator)} + * parsed and the validation method {@link PipeExtractor#validate(PipeParameterValidator)} * will be called to validate the parameters. * <li>Before the collaboration task starts, the method {@link - * PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} will be called - * to config the runtime behavior of the PipeCollector. - * <li>Then the method {@link PipeCollector#start()} will be called to start the PipeCollector. - * <li>While the collaboration task is in progress, the method {@link PipeCollector#supply()} will + * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} will be called + * to config the runtime behavior of the PipeExtractor. + * <li>Then the method {@link PipeExtractor#start()} will be called to start the PipeExtractor. + * <li>While the collaboration task is in progress, the method {@link PipeExtractor#supply()} will * be called to capture events from sources and then the events will be passed to the * PipeProcessor. - * <li>The method {@link PipeCollector#close()} will be called when the collaboration task is + * <li>The method {@link PipeExtractor#close()} will be called when the collaboration task is * cancelled (the `DROP PIPE` command is executed). * </ul> */ -public interface PipeCollector extends PipePlugin { +public interface PipeExtractor extends PipePlugin { /** * This method is mainly used to validate {@link PipeParameters} and it is executed before {@link - * PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} is called. + * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called. * * @param validator the validator used to validate {@link PipeParameters} * @throws Exception if any parameter is not valid @@ -60,28 +60,28 @@ public interface PipeCollector extends PipePlugin { void validate(PipeParameterValidator validator) throws Exception; /** - * This method is mainly used to customize PipeCollector. In this method, the user can do the + * This method is mainly used to customize PipeExtractor. In this method, the user can do the * following things: * * <ul> * <li>Use PipeParameters to parse key-value pair attributes entered by the user. - * <li>Set the running configurations in PipeCollectorRuntimeConfiguration. + * <li>Set the running configurations in PipeExtractorRuntimeConfiguration. * </ul> * * <p>This method is called after the method {@link - * PipeCollector#validate(PipeParameterValidator)} is called. + * PipeExtractor#validate(PipeParameterValidator)} is called. * * @param parameters used to parse the input parameters entered by the user - * @param configuration used to set the required properties of the running PipeCollector + * @param configuration used to set the required properties of the running PipeExtractor * @throws Exception the user can throw errors if necessary */ - void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) + void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception; /** * Start the collector. After this method is called, events should be ready to be supplied by - * {@link PipeCollector#supply()}. This method is called after {@link - * PipeCollector#customize(PipeParameters, PipeCollectorRuntimeConfiguration)} is called. + * {@link PipeExtractor#supply()}. This method is called after {@link + * PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} is called. * * @throws Exception the user can throw errors if necessary */ @@ -89,7 +89,7 @@ public interface PipeCollector extends PipePlugin { /** * Supply single event from the collector and the caller will send the event to the processor. - * This method is called after {@link PipeCollector#start()} is called. + * This method is called after {@link PipeExtractor#start()} is called. * * @return the event to be supplied. the event may be null if the collector has no more events at * the moment, but the collector is still running for more events. diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java index 3c35e8b9e13..f752aafef3a 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java @@ -30,7 +30,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; /** * PipeProcessor * - * <p>PipeProcessor is used to filter and transform the Event formed by the PipeCollector. + * <p>PipeProcessor is used to filter and transform the Event formed by the PipeExtractor. * * <p>The lifecycle of a PipeProcessor is as follows: * @@ -43,7 +43,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; * to config the runtime behavior of the PipeProcessor. * <li>While the collaboration task is in progress: * <ul> - * <li>PipeCollector captures the events and wraps them into three types of Event instances. + * <li>PipeExtractor captures the events and wraps them into three types of Event instances. * <li>PipeProcessor processes the event and then passes them to the PipeConnector. The * following 3 methods will be called: {@link * PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java index 071bfeb60c6..2671d3be5e7 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java @@ -19,4 +19,4 @@ package org.apache.iotdb.pipe.api.customizer.configuration; -public interface PipeCollectorRuntimeConfiguration extends PipeRuntimeConfiguration {} +public interface PipeExtractorRuntimeConfiguration extends PipeRuntimeConfiguration {} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 8ad6fc9ac76..7d3b990b8dd 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -612,7 +612,7 @@ struct TShowPipeInfo { 1: required string id 2: required i64 creationTime 3: required string state - 4: required string pipeCollector + 4: required string pipeExtractor 5: required string pipeProcessor 6: required string pipeConnector 7: required string exceptionMessage diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index 38a57fa0205..033322e456e 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -960,17 +960,17 @@ cluster_name=defaultCluster # pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000 # The default size of ring buffer in the realtime collector's disruptor queue. -# pipe_collector_assigner_disruptor_ring_buffer_size=65536 +# pipe_extractor_assigner_disruptor_ring_buffer_size=65536 # The maximum number of entries the deviceToCollectorsCache can hold. -# pipe_collector_matcher_cache_size=1024 +# pipe_extractor_matcher_cache_size=1024 # The capacity for the number of tablet events that can be stored in the pending queue of the Hybrid Realtime Collector. -# pipe_collector_pending_queue_capacity=128 +# pipe_extractor_pending_queue_capacity=128 # The limit for the number of tablet events that can be held in the pending queue of the Hybrid Realtime Collector. # Noted that: this should be less than or equals to realtimeCollectorPendingQueueCapacity -# pipe_collector_pending_queue_tablet_limit=64 +# pipe_extractor_pending_queue_tablet_limit=64 # The buffer size used for reading file during file transfer. # pipe_connector_read_file_buffer_size=8388608 diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index d4f768fc0c4..c402ec2f460 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -151,11 +151,11 @@ public class CommonConfig { private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000; - private int pipeCollectorAssignerDisruptorRingBufferSize = 65536; - private int pipeCollectorMatcherCacheSize = 1024; - private int pipeCollectorPendingQueueCapacity = 128; + private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; + private int pipeExtractorMatcherCacheSize = 1024; + private int pipeExtractorPendingQueueCapacity = 128; // this should be less than or equals to realtimeCollectorPendingQueueCapacity - private int pipeCollectorPendingQueueTabletLimit = pipeCollectorPendingQueueCapacity / 2; + private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2; private int pipeConnectorReadFileBufferSize = 8388608; private long pipeConnectorRetryIntervalMs = 1000L; @@ -458,38 +458,38 @@ public class CommonConfig { this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName; } - public int getPipeCollectorAssignerDisruptorRingBufferSize() { - return pipeCollectorAssignerDisruptorRingBufferSize; + public int getPipeExtractorAssignerDisruptorRingBufferSize() { + return pipeExtractorAssignerDisruptorRingBufferSize; } - public void setPipeCollectorAssignerDisruptorRingBufferSize( - int pipeCollectorAssignerDisruptorRingBufferSize) { - this.pipeCollectorAssignerDisruptorRingBufferSize = - pipeCollectorAssignerDisruptorRingBufferSize; + public void setPipeExtractorAssignerDisruptorRingBufferSize( + int pipeExtractorAssignerDisruptorRingBufferSize) { + this.pipeExtractorAssignerDisruptorRingBufferSize = + pipeExtractorAssignerDisruptorRingBufferSize; } - public int getPipeCollectorMatcherCacheSize() { - return pipeCollectorMatcherCacheSize; + public int getPipeExtractorMatcherCacheSize() { + return pipeExtractorMatcherCacheSize; } - public void setPipeCollectorMatcherCacheSize(int pipeCollectorMatcherCacheSize) { - this.pipeCollectorMatcherCacheSize = pipeCollectorMatcherCacheSize; + public void setPipeExtractorMatcherCacheSize(int pipeExtractorMatcherCacheSize) { + this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize; } - public int getPipeCollectorPendingQueueCapacity() { - return pipeCollectorPendingQueueCapacity; + public int getPipeExtractorPendingQueueCapacity() { + return pipeExtractorPendingQueueCapacity; } - public void setPipeCollectorPendingQueueCapacity(int pipeCollectorPendingQueueCapacity) { - this.pipeCollectorPendingQueueCapacity = pipeCollectorPendingQueueCapacity; + public void setPipeExtractorPendingQueueCapacity(int pipeExtractorPendingQueueCapacity) { + this.pipeExtractorPendingQueueCapacity = pipeExtractorPendingQueueCapacity; } - public int getPipeCollectorPendingQueueTabletLimit() { - return pipeCollectorPendingQueueTabletLimit; + public int getPipeExtractorPendingQueueTabletLimit() { + return pipeExtractorPendingQueueTabletLimit; } - public void setPipeCollectorPendingQueueTabletLimit(int pipeCollectorPendingQueueTabletLimit) { - this.pipeCollectorPendingQueueTabletLimit = pipeCollectorPendingQueueTabletLimit; + public void setPipeExtractorPendingQueueTabletLimit(int pipeExtractorPendingQueueTabletLimit) { + this.pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueTabletLimit; } public int getPipeConnectorReadFileBufferSize() { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 7dd417d5b3b..a8058a549b7 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -267,26 +267,26 @@ public class CommonDescriptor { "pipe_subtask_executor_pending_queue_max_blocking_time_ms", String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs())))); - config.setPipeCollectorAssignerDisruptorRingBufferSize( + config.setPipeExtractorAssignerDisruptorRingBufferSize( Integer.parseInt( properties.getProperty( - "pipe_collector_assigner_disruptor_ring_buffer_size", - String.valueOf(config.getPipeCollectorAssignerDisruptorRingBufferSize())))); - config.setPipeCollectorMatcherCacheSize( + "pipe_extractor_assigner_disruptor_ring_buffer_size", + String.valueOf(config.getPipeExtractorAssignerDisruptorRingBufferSize())))); + config.setPipeExtractorMatcherCacheSize( Integer.parseInt( properties.getProperty( - "pipe_collector_matcher_cache_size", - String.valueOf(config.getPipeCollectorMatcherCacheSize())))); - config.setPipeCollectorPendingQueueCapacity( + "pipe_extractor_matcher_cache_size", + String.valueOf(config.getPipeExtractorMatcherCacheSize())))); + config.setPipeExtractorPendingQueueCapacity( Integer.parseInt( properties.getProperty( - "pipe_collector_pending_queue_capacity", - String.valueOf(config.getPipeCollectorPendingQueueCapacity())))); - config.setPipeCollectorPendingQueueTabletLimit( + "pipe_extractor_pending_queue_capacity", + String.valueOf(config.getPipeExtractorPendingQueueCapacity())))); + config.setPipeExtractorPendingQueueTabletLimit( Integer.parseInt( properties.getProperty( - "pipe_collector_pending_queue_tablet_limit", - String.valueOf(config.getPipeCollectorPendingQueueTabletLimit())))); + "pipe_extractor_pending_queue_tablet_limit", + String.valueOf(config.getPipeExtractorPendingQueueTabletLimit())))); config.setPipeConnectorReadFileBufferSize( Integer.parseInt( diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 365ea0b8003..2e54fc9c353 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -55,20 +55,20 @@ public class PipeConfig { /////////////////////////////// Collector /////////////////////////////// - public int getPipeCollectorAssignerDisruptorRingBufferSize() { - return COMMON_CONFIG.getPipeCollectorAssignerDisruptorRingBufferSize(); + public int getPipeExtractorAssignerDisruptorRingBufferSize() { + return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize(); } - public int getPipeCollectorMatcherCacheSize() { - return COMMON_CONFIG.getPipeCollectorMatcherCacheSize(); + public int getPipeExtractorMatcherCacheSize() { + return COMMON_CONFIG.getPipeExtractorMatcherCacheSize(); } - public int getPipeCollectorPendingQueueCapacity() { - return COMMON_CONFIG.getPipeCollectorPendingQueueCapacity(); + public int getPipeExtractorPendingQueueCapacity() { + return COMMON_CONFIG.getPipeExtractorPendingQueueCapacity(); } - public int getPipeCollectorPendingQueueTabletLimit() { - return COMMON_CONFIG.getPipeCollectorPendingQueueTabletLimit(); + public int getPipeExtractorPendingQueueTabletLimit() { + return COMMON_CONFIG.getPipeExtractorPendingQueueTabletLimit(); } /////////////////////////////// Connector /////////////////////////////// @@ -118,12 +118,12 @@ public class PipeConfig { getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()); LOGGER.info( - "PipeCollectorAssignerDisruptorRingBufferSize: {}", - getPipeCollectorAssignerDisruptorRingBufferSize()); - LOGGER.info("PipeCollectorMatcherCacheSize: {}", getPipeCollectorMatcherCacheSize()); - LOGGER.info("PipeCollectorPendingQueueCapacity: {}", getPipeCollectorPendingQueueCapacity()); + "PipeExtractorAssignerDisruptorRingBufferSize: {}", + getPipeExtractorAssignerDisruptorRingBufferSize()); + LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeExtractorMatcherCacheSize()); + LOGGER.info("PipeExtractorPendingQueueCapacity: {}", getPipeExtractorPendingQueueCapacity()); LOGGER.info( - "PipeCollectorPendingQueueTabletLimit: {}", getPipeCollectorPendingQueueTabletLimit()); + "PipeExtractorPendingQueueTabletLimit: {}", getPipeExtractorPendingQueueTabletLimit()); LOGGER.info("PipeConnectorReadFileBufferSize: {}", getPipeConnectorReadFileBufferSize()); LOGGER.info("PipeConnectorRetryIntervalMs: {}", getPipeConnectorRetryIntervalMs()); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 14d22f578c3..1bcbe17b4ad 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -19,7 +19,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin; -import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector; +import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; @@ -30,7 +30,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor public enum BuiltinPipePlugin { // collectors - IOTDB_COLLECTOR("iotdb_collector", IoTDBCollector.class), + IOTDB_EXTRACTOR("iotdb_extractor", IoTDBExtractor.class), // processors DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class), diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java similarity index 89% rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java index 8d3f0276126..9857f5dfd7d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBExtractor.java @@ -19,8 +19,8 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.collector; -import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -31,14 +31,14 @@ import org.apache.iotdb.pipe.api.event.Event; * imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the collector. */ -public class IoTDBCollector implements PipeCollector { +public class IoTDBExtractor implements PipeExtractor { @Override public void validate(PipeParameterValidator validator) throws Exception { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) + public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java index 304222a7407..a982df07a4d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java @@ -146,7 +146,7 @@ public class ColumnHeaderConstant { // column names for show pipe public static final String ID = "ID"; public static final String CREATION_TIME = "CreationTime"; - public static final String PIPE_COLLECTOR = "PipeCollector"; + public static final String PIPE_EXTRACTOR = "PipeExtractor"; public static final String PIPE_PROCESSOR = "PipeProcessor"; public static final String PIPE_CONNECTOR = "PipeConnector"; public static final String EXCEPTION_MESSAGE = "ExceptionMessage"; @@ -385,7 +385,7 @@ public class ColumnHeaderConstant { new ColumnHeader(ID, TSDataType.TEXT), new ColumnHeader(CREATION_TIME, TSDataType.TEXT), new ColumnHeader(STATE, TSDataType.TEXT), - new ColumnHeader(PIPE_COLLECTOR, TSDataType.TEXT), + new ColumnHeader(PIPE_EXTRACTOR, TSDataType.TEXT), new ColumnHeader(PIPE_PROCESSOR, TSDataType.TEXT), new ColumnHeader(PIPE_CONNECTOR, TSDataType.TEXT), new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java index dde5561e9de..a9a79015bbc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java @@ -68,7 +68,7 @@ public class ShowPipeTask implements IConfigTask { .getColumnBuilder(1) .writeBinary(new Binary(DateTimeUtils.convertLongToDate(tPipeInfo.getCreationTime()))); builder.getColumnBuilder(2).writeBinary(new Binary(tPipeInfo.getState())); - builder.getColumnBuilder(3).writeBinary(new Binary(tPipeInfo.getPipeCollector())); + builder.getColumnBuilder(3).writeBinary(new Binary(tPipeInfo.getPipeExtractor())); builder.getColumnBuilder(4).writeBinary(new Binary(tPipeInfo.getPipeProcessor())); builder.getColumnBuilder(5).writeBinary(new Binary(tPipeInfo.getPipeConnector())); builder.getColumnBuilder(6).writeBinary(new Binary(tPipeInfo.getExceptionMessage())); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 5a05cb43b53..7aa7c3d5449 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -25,11 +25,11 @@ import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; -import org.apache.iotdb.pipe.api.PipeCollector; import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -195,12 +195,12 @@ public class PipePluginAgent { } } - public PipeCollector reflectCollector(PipeParameters collectorParameters) { - return (PipeCollector) + public PipeExtractor reflectCollector(PipeParameters collectorParameters) { + return (PipeExtractor) reflect( collectorParameters.getStringOrDefault( - PipeCollectorConstant.COLLECTOR_KEY, - BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())); + PipeExtractorConstant.COLLECTOR_KEY, + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())); } public PipeProcessor reflectProcessor(PipeParameters processorParameters) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java similarity index 82% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java index cbe7fee083e..48a34fc923c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java @@ -21,16 +21,16 @@ package org.apache.iotdb.db.pipe.collector; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionCollector; -import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileCollector; +import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionExtractor; +import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileExtractor; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; -import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -42,25 +42,25 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_ENABLE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_FILE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_HYBRID; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_LOG; -public class IoTDBDataRegionCollector implements PipeCollector { +public class IoTDBDataRegionExtractor implements PipeExtractor { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionCollector.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class); private final AtomicBoolean hasBeenStarted; - private PipeHistoricalDataRegionCollector historicalCollector; - private PipeRealtimeDataRegionCollector realtimeCollector; + private PipeHistoricalDataRegionExtractor historicalCollector; + private PipeRealtimeDataRegionExtractor realtimeCollector; private int dataRegionId; - public IoTDBDataRegionCollector() { + public IoTDBDataRegionExtractor() { this.hasBeenStarted = new AtomicBoolean(false); } @@ -99,34 +99,34 @@ public class IoTDBDataRegionCollector implements PipeCollector { private void constructHistoricalCollector() { // enable historical collector by default - historicalCollector = new PipeHistoricalDataRegionTsFileCollector(); + historicalCollector = new PipeHistoricalDataRegionTsFileExtractor(); } private void constructRealtimeCollector(PipeParameters parameters) { // enable realtime collector by default if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) { - realtimeCollector = new PipeRealtimeDataRegionFakeCollector(); + realtimeCollector = new PipeRealtimeDataRegionFakeExtractor(); return; } // use hybrid mode by default if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) { - realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); + realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); return; } switch (parameters.getString(COLLECTOR_REALTIME_MODE)) { case COLLECTOR_REALTIME_MODE_FILE: - realtimeCollector = new PipeRealtimeDataRegionTsFileCollector(); + realtimeCollector = new PipeRealtimeDataRegionTsFileExtractor(); break; case COLLECTOR_REALTIME_MODE_LOG: - realtimeCollector = new PipeRealtimeDataRegionLogCollector(); + realtimeCollector = new PipeRealtimeDataRegionLogExtractor(); break; case COLLECTOR_REALTIME_MODE_HYBRID: - realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); + realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); break; default: - realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); + realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); LOGGER.warn( "Unsupported collector realtime mode: {}, create a hybrid collector.", parameters.getString(COLLECTOR_REALTIME_MODE)); @@ -134,7 +134,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { } @Override - public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) + public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception { dataRegionId = ((PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId(); @@ -163,7 +163,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { (dataRegion -> { dataRegion.writeLock( String.format( - "Pipe: starting %s", IoTDBDataRegionCollector.class.getName())); + "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName())); try { startHistoricalCollectorAndRealtimeCollector(exceptionHolder); } finally { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java index 08d5684851e..858ae933cb5 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.pipe.collector.historical; -import org.apache.iotdb.pipe.api.PipeCollector; +import org.apache.iotdb.pipe.api.PipeExtractor; -public interface PipeHistoricalDataRegionCollector extends PipeCollector { +public interface PipeHistoricalDataRegionExtractor extends PipeExtractor { boolean hasConsumedAll(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java similarity index 93% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java index 6ade5ca4ad8..2327e63b07a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.utils.DateTimeUtils; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -44,16 +44,16 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.stream.Collectors; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_END_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_START_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_KEY; -public class PipeHistoricalDataRegionTsFileCollector implements PipeHistoricalDataRegionCollector { +public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor { private static final Logger LOGGER = - LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class); + LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class); private PipeTaskMeta pipeTaskMeta; private ProgressIndex startIndex; @@ -76,7 +76,7 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeHistoricalDa @Override public void customize( - PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { + PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) { final PipeTaskCollectorRuntimeEnvironment environment = (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); @@ -203,7 +203,7 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeHistoricalDa pendingQueue.forEach( event -> event.increaseReferenceCount( - PipeHistoricalDataRegionTsFileCollector.class.getName())); + PipeHistoricalDataRegionTsFileExtractor.class.getName())); } finally { tsFileManager.readUnlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java similarity index 84% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java index 4e6e98c168e..aec400b6d5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java @@ -21,32 +21,32 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; -import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -public abstract class PipeRealtimeDataRegionCollector implements PipeCollector { +public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { protected String pattern; protected String dataRegionId; protected PipeTaskMeta pipeTaskMeta; - protected PipeRealtimeDataRegionCollector() {} + protected PipeRealtimeDataRegionExtractor() {} @Override public void validate(PipeParameterValidator validator) throws Exception {} @Override - public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) + public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception { pattern = parameters.getStringOrDefault( - PipeCollectorConstant.COLLECTOR_PATTERN_KEY, - PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); + PipeExtractorConstant.COLLECTOR_PATTERN_KEY, + PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); final PipeTaskCollectorRuntimeEnvironment environment = (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java similarity index 88% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java index 6051930e4e9..cffb273cee1 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java @@ -20,12 +20,12 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; -public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector { +public class PipeRealtimeDataRegionFakeExtractor extends PipeRealtimeDataRegionExtractor { @Override public void validate(PipeParameterValidator validator) { @@ -34,7 +34,7 @@ public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionC @Override public void customize( - PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { + PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) { // do nothing } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java similarity index 95% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java index eeca9a82dad..a78ad713feb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -32,16 +32,16 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector { +public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor { private static final Logger LOGGER = - LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class); + LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class); // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionHybridCollector() { + public PipeRealtimeDataRegionHybridExtractor() { this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @@ -113,7 +113,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio private boolean isApproachingCapacity() { return pendingQueue.size() - >= PipeConfig.getInstance().getPipeCollectorPendingQueueTabletLimit(); + >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit(); } @Override @@ -136,7 +136,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio eventToSupply.getClass(), this)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName()); + collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } @@ -157,7 +157,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio (state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state); if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) { - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) { + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { return event.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by @@ -188,7 +188,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio }); if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) { - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) { + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { return event.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java similarity index 93% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java index 2d02d4fba78..b50e48536c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -30,16 +30,16 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCollector { +public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionExtractor { private static final Logger LOGGER = - LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class); + LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class); // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionLogCollector() { + public PipeRealtimeDataRegionLogExtractor() { this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @@ -79,7 +79,7 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo while (collectEvent != null) { Event suppliedEvent = null; - if (collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName())) { + if (collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName())) { suppliedEvent = collectEvent.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by @@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName()); + collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java similarity index 92% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java rename to server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java index f43582a5541..2bc9abee7ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -30,16 +30,16 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegionCollector { +public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegionExtractor { private static final Logger LOGGER = - LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class); + LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class); // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionTsFileCollector() { + public PipeRealtimeDataRegionTsFileExtractor() { this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @@ -80,7 +80,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio Event suppliedEvent = null; if (collectEvent.increaseReferenceCount( - PipeRealtimeDataRegionTsFileCollector.class.getName())) { + PipeRealtimeDataRegionTsFileExtractor.class.getName())) { suppliedEvent = collectEvent.getEvent(); } else { // if the event's reference count can not be increased, it means the data represented by @@ -95,7 +95,7 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileCollector.class.getName()); + collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); if (suppliedEvent != null) { return suppliedEvent; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java index 885913a44cf..bc22132d4cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java @@ -49,7 +49,7 @@ public class DisruptorQueue<E> { public static class Builder<E> { private int ringBufferSize = - PipeConfig.getInstance().getPipeCollectorAssignerDisruptorRingBufferSize(); + PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(); private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; private ProducerType producerType = ProducerType.MULTI; private WaitStrategy waitStrategy = new BlockingWaitStrategy(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java index fe482147e04..116f63f3073 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.assigner; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher; import org.apache.iotdb.db.pipe.collector.realtime.matcher.PipeDataRegionMatcher; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; @@ -63,11 +63,11 @@ public class PipeDataRegionAssigner { event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName()); } - public void startAssignTo(PipeRealtimeDataRegionCollector collector) { + public void startAssignTo(PipeRealtimeDataRegionExtractor collector) { matcher.register(collector); } - public void stopAssignTo(PipeRealtimeDataRegionCollector collector) { + public void stopAssignTo(PipeRealtimeDataRegionExtractor collector) { matcher.deregister(collector); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java index f20bf4cc6dc..a4806c30774 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.epoch; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; public class TsFileEpoch { private final String filePath; - private final ConcurrentMap<PipeRealtimeDataRegionCollector, AtomicReference<State>> + private final ConcurrentMap<PipeRealtimeDataRegionExtractor, AtomicReference<State>> dataRegionCollector2State; public TsFileEpoch(String filePath) { @@ -36,14 +36,14 @@ public class TsFileEpoch { this.dataRegionCollector2State = new ConcurrentHashMap<>(); } - public TsFileEpoch.State getState(PipeRealtimeDataRegionCollector collector) { + public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor collector) { return dataRegionCollector2State .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY)) .get(); } public void migrateState( - PipeRealtimeDataRegionCollector collector, TsFileEpochStateMigrator visitor) { + PipeRealtimeDataRegionExtractor collector, TsFileEpochStateMigrator visitor) { dataRegionCollector2State .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY)) .getAndUpdate(visitor::migrate); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java index 6fd82618ff9..00758562073 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.listener; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.collector.realtime.assigner.PipeDataRegionAssigner; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEventFactory; import org.apache.iotdb.db.wal.utils.WALEntryHandler; @@ -53,7 +53,7 @@ public class PipeInsertionDataNodeListener { //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( - String dataRegionId, PipeRealtimeDataRegionCollector collector) { + String dataRegionId, PipeRealtimeDataRegionExtractor collector) { dataRegionId2Assigner .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner()) .startAssignTo(collector); @@ -67,7 +67,7 @@ public class PipeInsertionDataNodeListener { } public synchronized void stopListenAndAssign( - String dataRegionId, PipeRealtimeDataRegionCollector collector) { + String dataRegionId, PipeRealtimeDataRegionExtractor collector) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); if (assigner == null) { return; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java index 14df36e9267..0af3151ed9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.matcher; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -40,20 +40,20 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { private final ReentrantReadWriteLock lock; - private final Set<PipeRealtimeDataRegionCollector> collectors; - private final Cache<String, Set<PipeRealtimeDataRegionCollector>> deviceToCollectorsCache; + private final Set<PipeRealtimeDataRegionExtractor> collectors; + private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> deviceToCollectorsCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); this.collectors = new HashSet<>(); this.deviceToCollectorsCache = Caffeine.newBuilder() - .maximumSize(PipeConfig.getInstance().getPipeCollectorMatcherCacheSize()) + .maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize()) .build(); } @Override - public void register(PipeRealtimeDataRegionCollector collector) { + public void register(PipeRealtimeDataRegionExtractor collector) { lock.writeLock().lock(); try { collectors.add(collector); @@ -64,7 +64,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } @Override - public void deregister(PipeRealtimeDataRegionCollector collector) { + public void deregister(PipeRealtimeDataRegionExtractor collector) { lock.writeLock().lock(); try { collectors.remove(collector); @@ -85,8 +85,8 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } @Override - public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event) { - final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new HashSet<>(); + public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event) { + final Set<PipeRealtimeDataRegionExtractor> matchedCollectors = new HashSet<>(); lock.readLock().lock(); try { @@ -99,7 +99,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { final String[] measurements = entry.getValue(); // 1. try to get matched collectors from cache, if not success, match them by device - final Set<PipeRealtimeDataRegionCollector> collectorsFilteredByDevice = + final Set<PipeRealtimeDataRegionExtractor> collectorsFilteredByDevice = deviceToCollectorsCache.get(device, this::filterCollectorsByDevice); // this would not happen if (collectorsFilteredByDevice == null) { @@ -166,10 +166,10 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { return matchedCollectors; } - private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String device) { - final Set<PipeRealtimeDataRegionCollector> filteredCollectors = new HashSet<>(); + private Set<PipeRealtimeDataRegionExtractor> filterCollectorsByDevice(String device) { + final Set<PipeRealtimeDataRegionExtractor> filteredCollectors = new HashSet<>(); - for (PipeRealtimeDataRegionCollector collector : collectors) { + for (PipeRealtimeDataRegionExtractor collector : collectors) { String pattern = collector.getPattern(); if ( // for example, pattern is root.a.b and device is root.a.b.c diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java index 6e84cdf0ce2..44b544329c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.collector.realtime.matcher; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; import java.util.Set; @@ -30,10 +30,10 @@ public interface PipeDataRegionMatcher { * Register a collector. If the collector's pattern matches the event's schema info, the event * will be assigned to the collector. */ - void register(PipeRealtimeDataRegionCollector collector); + void register(PipeRealtimeDataRegionExtractor collector); /** Deregister a collector. */ - void deregister(PipeRealtimeDataRegionCollector collector); + void deregister(PipeRealtimeDataRegionExtractor collector); /** Get the number of registered collectors in this matcher. */ int getRegisterCount(); @@ -45,7 +45,7 @@ public interface PipeDataRegionMatcher { * @param event the event to be matched * @return the matched collectors */ - Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event); + Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event); /** Clear all the registered collectors and internal data structures. */ void clear(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java index a98bac82bdd..5c3f660f8e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.config.constant; -public class PipeCollectorConstant { +public class PipeExtractorConstant { public static final String COLLECTOR_KEY = "collector"; @@ -36,7 +36,7 @@ public class PipeCollectorConstant { public static final String COLLECTOR_REALTIME_MODE_FILE = "file"; public static final String COLLECTOR_REALTIME_MODE_LOG = "log"; - private PipeCollectorConstant() { + private PipeExtractorConstant() { throw new IllegalStateException("Utility class"); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java index 29810c46960..bd4999d68ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.pipe.config.plugin.configuraion; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; public class PipeTaskRuntimeConfiguration - implements PipeCollectorRuntimeConfiguration, + implements PipeExtractorRuntimeConfiguration, PipeProcessorRuntimeConfiguration, PipeConnectorRuntimeConfiguration { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 462ba9164f6..4f70f17674c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.pipe.api.event.Event; import java.util.concurrent.atomic.AtomicInteger; @@ -125,7 +125,7 @@ public abstract class EnrichedEvent implements Event { * @return the pattern */ public final String getPattern() { - return pattern == null ? PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; + return pattern == null ? PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; } public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 168400443c8..24f52d0ea87 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -47,7 +47,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { } public String getPattern() { - return pattern == null ? PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; + return pattern == null ? PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; } /////////////////////////// TabletInsertionEvent /////////////////////////// @@ -78,7 +78,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { final String notNullPattern = getPattern(); // if notNullPattern is "root", we don't need to convert, just return the original tablet - if (notNullPattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + if (notNullPattern.equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { return tablet; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java index 70d21e85b3a..18ea33be4ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.processor; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.PipeProcessor; @@ -54,7 +54,7 @@ public class PipeDoNothingProcessor implements PipeProcessor { final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent; if (enrichedEvent .getPattern() - .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + .equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { eventCollector.collect(tabletInsertionEvent); } else { tabletInsertionEvent @@ -86,7 +86,7 @@ public class PipeDoNothingProcessor implements PipeProcessor { if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent enrichedEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent; - if (enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE) + if (enrichedEvent.getPattern().equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE) && !enrichedEvent.hasTimeFilter()) { eventCollector.collect(tsFileInsertionEvent); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java index 0d676ac4992..807c7b0c758 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java @@ -23,19 +23,19 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionCollector; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionExtractor; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; -import org.apache.iotdb.pipe.api.PipeCollector; +import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; public class PipeTaskCollectorStage extends PipeTaskStage { - private final PipeCollector pipeCollector; + private final PipeExtractor pipeExtractor; public PipeTaskCollectorStage( String pipeName, @@ -43,27 +43,27 @@ public class PipeTaskCollectorStage extends PipeTaskStage { PipeParameters collectorParameters, TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta) { - pipeCollector = + pipeExtractor = collectorParameters .getStringOrDefault( - PipeCollectorConstant.COLLECTOR_KEY, - BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()) - .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()) - ? new IoTDBDataRegionCollector() + PipeExtractorConstant.COLLECTOR_KEY, + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + ? new IoTDBDataRegionExtractor() : PipeAgent.plugin().reflectCollector(collectorParameters); // validate and customize should be called before createSubtask. this allows collector exposing // exceptions in advance. try { // 1. validate collector parameters - pipeCollector.validate(new PipeParameterValidator(collectorParameters)); + pipeExtractor.validate(new PipeParameterValidator(collectorParameters)); // 2. customize collector final PipeTaskRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( new PipeTaskCollectorRuntimeEnvironment( pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta)); - pipeCollector.customize(collectorParameters, runtimeConfiguration); + pipeExtractor.customize(collectorParameters, runtimeConfiguration); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } @@ -77,7 +77,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage { @Override public void startSubtask() throws PipeException { try { - pipeCollector.start(); + pipeExtractor.start(); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } @@ -91,13 +91,13 @@ public class PipeTaskCollectorStage extends PipeTaskStage { @Override public void dropSubtask() throws PipeException { try { - pipeCollector.close(); + pipeExtractor.close(); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } } public EventSupplier getEventSupplier() { - return pipeCollector::supply; + return pipeExtractor::supply; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 60fbc3798df..fade5393d72 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -51,7 +51,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { * @param creationTime pipe creation time * @param pipeProcessorParameters used to create pipe processor * @param dataRegionId data region id - * @param pipeCollectorInputEventSupplier used to input events from pipe collector + * @param pipeExtractorInputEventSupplier used to input events from pipe extractor * @param pipeConnectorOutputPendingQueue used to output events to pipe connector */ public PipeTaskProcessorStage( @@ -59,7 +59,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { long creationTime, PipeParameters pipeProcessorParameters, TConsensusGroupId dataRegionId, - EventSupplier pipeCollectorInputEventSupplier, + EventSupplier pipeExtractorInputEventSupplier, BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) { final PipeProcessor pipeProcessor = pipeProcessorParameters @@ -90,7 +90,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { this.pipeProcessorSubtask = new PipeProcessorSubtask( taskId, - pipeCollectorInputEventSupplier, + pipeExtractorInputEventSupplier, pipeProcessor, pipeConnectorOutputEventCollector); } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java index e0977828d75..e84b8195956 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.pipe.collector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; @@ -49,7 +49,7 @@ public class CachedSchemaPatternMatcherTest { private CachedSchemaPatternMatcher matcher; private ExecutorService executorService; - private List<PipeRealtimeDataRegionCollector> collectorList; + private List<PipeRealtimeDataRegionExtractor> collectorList; @Before public void setUp() { @@ -65,12 +65,12 @@ public class CachedSchemaPatternMatcherTest { @Test public void testCachedMatcher() throws Exception { - PipeRealtimeDataRegionCollector databaseCollector = new PipeRealtimeDataRegionFakeCollector(); + PipeRealtimeDataRegionExtractor databaseCollector = new PipeRealtimeDataRegionFakeExtractor(); databaseCollector.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root"); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root"); } }), new PipeTaskRuntimeConfiguration(new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); @@ -79,20 +79,20 @@ public class CachedSchemaPatternMatcherTest { int deviceCollectorNum = 10; int seriesCollectorNum = 10; for (int i = 0; i < deviceCollectorNum; i++) { - PipeRealtimeDataRegionCollector deviceCollector = new PipeRealtimeDataRegionFakeCollector(); + PipeRealtimeDataRegionExtractor deviceCollector = new PipeRealtimeDataRegionFakeExtractor(); int finalI1 = i; deviceCollector.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1); } }), new PipeTaskRuntimeConfiguration( new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(deviceCollector); for (int j = 0; j < seriesCollectorNum; j++) { - PipeRealtimeDataRegionCollector seriesCollector = new PipeRealtimeDataRegionFakeCollector(); + PipeRealtimeDataRegionExtractor seriesCollector = new PipeRealtimeDataRegionFakeExtractor(); int finalI = i; int finalJ = j; seriesCollector.customize( @@ -100,7 +100,7 @@ public class CachedSchemaPatternMatcherTest { new HashMap<String, String>() { { put( - PipeCollectorConstant.COLLECTOR_PATTERN_KEY, + PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI + "." + finalJ); } }), @@ -147,9 +147,9 @@ public class CachedSchemaPatternMatcherTest { future.get(); } - public static class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector { + public static class PipeRealtimeDataRegionFakeExtractor extends PipeRealtimeDataRegionExtractor { - public PipeRealtimeDataRegionFakeCollector() {} + public PipeRealtimeDataRegionFakeExtractor() {} @Override public Event supply() { diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java index 1b12e5822f9..0ef1058c12a 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java @@ -25,10 +25,10 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor; import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; -import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.wal.utils.WALEntryHandler; @@ -102,20 +102,20 @@ public class PipeRealtimeCollectTest { public void testRealtimeCollectProcess() { // set up realtime collector - try (PipeRealtimeDataRegionHybridCollector collector1 = - new PipeRealtimeDataRegionHybridCollector(); - PipeRealtimeDataRegionHybridCollector collector2 = - new PipeRealtimeDataRegionHybridCollector(); - PipeRealtimeDataRegionHybridCollector collector3 = - new PipeRealtimeDataRegionHybridCollector(); - PipeRealtimeDataRegionHybridCollector collector4 = - new PipeRealtimeDataRegionHybridCollector()) { + try (PipeRealtimeDataRegionHybridExtractor collector1 = + new PipeRealtimeDataRegionHybridExtractor(); + PipeRealtimeDataRegionHybridExtractor collector2 = + new PipeRealtimeDataRegionHybridExtractor(); + PipeRealtimeDataRegionHybridExtractor collector3 = + new PipeRealtimeDataRegionHybridExtractor(); + PipeRealtimeDataRegionHybridExtractor collector4 = + new PipeRealtimeDataRegionHybridExtractor()) { collector1.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1); } }), new PipeTaskRuntimeConfiguration( @@ -125,7 +125,7 @@ public class PipeRealtimeCollectTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2); } }), new PipeTaskRuntimeConfiguration( @@ -135,7 +135,7 @@ public class PipeRealtimeCollectTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1); } }), new PipeTaskRuntimeConfiguration( @@ -145,15 +145,15 @@ public class PipeRealtimeCollectTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2); + put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2); } }), new PipeTaskRuntimeConfiguration( new PipeTaskCollectorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion2), null))); - PipeRealtimeDataRegionCollector[] collectors = - new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4}; + PipeRealtimeDataRegionExtractor[] collectors = + new PipeRealtimeDataRegionExtractor[] {collector1, collector2, collector3, collector4}; // start collector 0, 1 collectors[0].start(); @@ -291,7 +291,7 @@ public class PipeRealtimeCollectTest { } private Future<?> listen( - PipeRealtimeDataRegionCollector collector, Function<Event, Integer> weight, int expectNum) { + PipeRealtimeDataRegionExtractor collector, Function<Event, Integer> weight, int expectNum) { return listenerService.submit( () -> { int eventNum = 0;
