This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rename-collector-to-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4f60b69d27e2257baf98b0f9759ad9780119acc2 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jun 23 23:14:40 2023 +0800 pipe rename: collector -> extractor --- .../response/pipe/task/PipeTableResp.java | 2 +- .../persistence/pipe/PipePluginInfo.java | 2 +- .../commons/pipe/task/meta/PipeStaticMeta.java | 2 +- .../db/engine/storagegroup/TsFileProcessor.java | 2 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 6 +- ...torConstant.java => PipeExtractorConstant.java} | 22 ++-- ...va => PipeTaskExtractorRuntimeEnvironment.java} | 4 +- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 4 +- ...imeCollectEvent.java => PipeRealtimeEvent.java} | 18 +-- ...tFactory.java => PipeRealtimeEventFactory.java} | 10 +- .../IoTDBDataRegionExtractor.java | 138 ++++++++++----------- .../PipeHistoricalDataRegionExtractor.java | 2 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 52 ++++---- .../realtime/PipeRealtimeDataRegionExtractor.java | 20 +-- .../PipeRealtimeDataRegionFakeExtractor.java | 8 +- .../PipeRealtimeDataRegionHybridExtractor.java | 32 ++--- .../PipeRealtimeDataRegionLogExtractor.java | 16 +-- .../PipeRealtimeDataRegionTsFileExtractor.java | 16 +-- .../realtime/assigner/DisruptorQueue.java | 2 +- .../realtime/assigner/PipeDataRegionAssigner.java | 42 +++---- .../realtime/epoch/TsFileEpoch.java | 24 ++-- .../realtime/epoch/TsFileEpochManager.java | 12 +- .../realtime/epoch/TsFileEpochStateMigrator.java | 2 +- .../listener/PipeInsertionDataNodeListener.java | 51 ++++---- .../matcher/CachedSchemaPatternMatcher.java | 100 +++++++-------- .../realtime/matcher/PipeDataRegionMatcher.java | 28 ++--- .../db/pipe/processor/PipeDoNothingProcessor.java | 4 +- .../org/apache/iotdb/db/pipe/task/PipeTask.java | 14 +-- .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 18 +-- .../db/pipe/task/connection/EventSupplier.java | 4 +- ...ectorStage.java => PipeTaskExtractorStage.java} | 30 ++--- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 +- .../CachedSchemaPatternMatcherTest.java | 34 ++--- .../PipeRealtimeCollectTest.java | 26 ++-- 35 files changed, 375 insertions(+), 376 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index 2b5bb5e6894..d5edeb49674 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -116,7 +116,7 @@ public class PipeTableResp implements DataSet { staticMeta.getPipeName(), staticMeta.getCreationTime(), runtimeMeta.getStatus().get().name(), - staticMeta.getCollectorParameters().toString(), + staticMeta.getExtractorParameters().toString(), staticMeta.getProcessorParameters().toString(), staticMeta.getConnectorParameters().toString(), exceptionMessageBuilder.toString())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 7a24618d6a7..4c6466cdf40 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -126,7 +126,7 @@ public class PipePluginInfo implements SnapshotProcessor { new PipeParameters(createPipeRequest.getCollectorAttributes()); final String collectorPluginName = collectorParameters.getStringOrDefault( - PipeExtractorConstant.COLLECTOR_KEY, IOTDB_EXTRACTOR.getPipePluginName()); + PipeExtractorConstant.EXTRACTOR_KEY, IOTDB_EXTRACTOR.getPipePluginName()); if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) { final String exceptionMessage = String.format( diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java index 09a43c4ef76..1024b866dda 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java @@ -63,7 +63,7 @@ public class PipeStaticMeta { return creationTime; } - public PipeParameters getCollectorParameters() { + public PipeParameters getExtractorParameters() { return collectorParameters; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 26b32a1be61..8d499872195 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -53,7 +53,7 @@ import org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; -import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; +import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.rescon.MemTableManager; import org.apache.iotdb.db.rescon.PrimitiveArrayManager; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 7aa7c3d5449..0074e795134 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -195,11 +195,11 @@ public class PipePluginAgent { } } - public PipeExtractor reflectCollector(PipeParameters collectorParameters) { + public PipeExtractor reflectExtractor(PipeParameters extractorParameters) { return (PipeExtractor) reflect( - collectorParameters.getStringOrDefault( - PipeExtractorConstant.COLLECTOR_KEY, + extractorParameters.getStringOrDefault( + PipeExtractorConstant.EXTRACTOR_KEY, BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java similarity index 59% rename from server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java index 5c3f660f8e6..6b003860352 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java @@ -21,20 +21,20 @@ package org.apache.iotdb.db.pipe.config.constant; public class PipeExtractorConstant { - public static final String COLLECTOR_KEY = "collector"; + public static final String EXTRACTOR_KEY = "extractor"; - public static final String COLLECTOR_PATTERN_KEY = "collector.pattern"; - public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root"; + public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern"; + public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root"; - public static final String COLLECTOR_HISTORY_ENABLE_KEY = "collector.history.enable"; - public static final String COLLECTOR_HISTORY_START_TIME = "collector.history.start-time"; - public static final String COLLECTOR_HISTORY_END_TIME = "collector.history.end-time"; + public static final String EXTRACTOR_HISTORY_ENABLE_KEY = "extractor.history.enable"; + public static final String EXTRACTOR_HISTORY_START_TIME = "extractor.history.start-time"; + public static final String EXTRACTOR_HISTORY_END_TIME = "extractor.history.end-time"; - public static final String COLLECTOR_REALTIME_ENABLE = "collector.realtime.enable"; - public static final String COLLECTOR_REALTIME_MODE = "collector.realtime.mode"; - public static final String COLLECTOR_REALTIME_MODE_HYBRID = "hybrid"; - public static final String COLLECTOR_REALTIME_MODE_FILE = "file"; - public static final String COLLECTOR_REALTIME_MODE_LOG = "log"; + public static final String EXTRACTOR_REALTIME_ENABLE = "extractor.realtime.enable"; + public static final String EXTRACTOR_REALTIME_MODE = "extractor.realtime.mode"; + public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid"; + public static final String EXTRACTOR_REALTIME_MODE_FILE = "file"; + public static final String EXTRACTOR_REALTIME_MODE_LOG = "log"; private PipeExtractorConstant() { throw new IllegalStateException("Utility class"); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java similarity index 92% rename from server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java index 530c977b9eb..a726637d5b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java @@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.config.plugin.env; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -public class PipeTaskCollectorRuntimeEnvironment extends PipeTaskRuntimeEnvironment { +public class PipeTaskExtractorRuntimeEnvironment extends PipeTaskRuntimeEnvironment { private final int regionId; private final PipeTaskMeta pipeTaskMeta; - public PipeTaskCollectorRuntimeEnvironment( + public PipeTaskExtractorRuntimeEnvironment( String pipeName, long creationTime, int regionId, PipeTaskMeta pipeTaskMeta) { super(pipeName, creationTime); this.regionId = regionId; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 4f70f17674c..ddd0c78877c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -125,7 +125,7 @@ public abstract class EnrichedEvent implements Event { * @return the pattern */ public final String getPattern() { - return pattern == null ? PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; + return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern; } public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 24f52d0ea87..b769be62237 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -47,7 +47,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { } public String getPattern() { - return pattern == null ? PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern; + return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern; } /////////////////////////// TabletInsertionEvent /////////////////////////// @@ -78,7 +78,7 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { final String notNullPattern = getPattern(); // if notNullPattern is "root", we don't need to convert, just return the original tablet - if (notNullPattern.equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + if (notNullPattern.equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE)) { return tablet; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java similarity index 90% rename from server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index cb84d0bc11a..e29e979618a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -21,30 +21,30 @@ package org.apache.iotdb.db.pipe.event.realtime; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.pipe.api.event.Event; import java.util.Map; /** * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with the information of - * TsFileEpoch and schema info. It only exists in the realtime event collector. + * TsFileEpoch and schema info. It only exists in the realtime event extractor. */ -public class PipeRealtimeCollectEvent extends EnrichedEvent { +public class PipeRealtimeEvent extends EnrichedEvent { private final EnrichedEvent event; private final TsFileEpoch tsFileEpoch; private Map<String, String[]> device2Measurements; - public PipeRealtimeCollectEvent( + public PipeRealtimeEvent( EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements, String pattern) { // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeCollectEvent - // is only used in the realtime event collector, which does not need to report the progress + // is only used in the realtime event extractor, which does not need to report the progress // of the event, so the pipeTaskMeta is always null. super(null, pattern); @@ -53,14 +53,14 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent { this.device2Measurements = device2Measurements; } - public PipeRealtimeCollectEvent( + public PipeRealtimeEvent( EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements, PipeTaskMeta pipeTaskMeta, String pattern) { // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeCollectEvent - // is only used in the realtime event collector, which does not need to report the progress + // is only used in the realtime event extractor, which does not need to report the progress // of the event, so the pipeTaskMeta is always null. super(pipeTaskMeta, pattern); @@ -119,9 +119,9 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent { } @Override - public PipeRealtimeCollectEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipeTaskMeta pipeTaskMeta, String pattern) { - return new PipeRealtimeCollectEvent( + return new PipeRealtimeEvent( event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, pattern), this.tsFileEpoch, this.device2Measurements, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java similarity index 85% rename from server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java rename to server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 2b2be4abc21..247a8b17daf 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEventFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -21,21 +21,21 @@ package org.apache.iotdb.db.pipe.event.realtime; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; -import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpochManager; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpochManager; import org.apache.iotdb.db.wal.utils.WALEntryHandler; -public class PipeRealtimeCollectEventFactory { +public class PipeRealtimeEventFactory { private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager(); - public static PipeRealtimeCollectEvent createCollectEvent(TsFileResource resource) { + public static PipeRealtimeEvent createCollectEvent(TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent( new PipeTsFileInsertionEvent(resource), resource); } - public static PipeRealtimeCollectEvent createCollectEvent( + public static PipeRealtimeEvent createCollectEvent( WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( new PipeInsertNodeTabletInsertionEvent( @@ -44,7 +44,7 @@ public class PipeRealtimeCollectEventFactory { resource); } - private PipeRealtimeCollectEventFactory() { + private PipeRealtimeEventFactory() { // factory class, do not instantiate } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java similarity index 58% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index 48a34fc923c..d909e9b9493 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -17,18 +17,18 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector; +package org.apache.iotdb.db.pipe.extractor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileExtractor; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_ENABLE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_FILE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_HYBRID; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_REALTIME_MODE_LOG; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG; public class IoTDBDataRegionExtractor implements PipeExtractor { @@ -55,8 +55,8 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { private final AtomicBoolean hasBeenStarted; - private PipeHistoricalDataRegionExtractor historicalCollector; - private PipeRealtimeDataRegionExtractor realtimeCollector; + private PipeHistoricalDataRegionExtractor historicalExtractor; + private PipeRealtimeDataRegionExtractor realtimeExtractor; private int dataRegionId; @@ -66,70 +66,70 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { @Override public void validate(PipeParameterValidator validator) throws Exception { - // validate collector.history.enable and collector.realtime.enable + // validate extractor.history.enable and extractor.realtime.enable validator .validateAttributeValueRange( - COLLECTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validateAttributeValueRange( - COLLECTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validate( args -> (boolean) args[0] || (boolean) args[1], String.format( "Should not set both %s and %s to false.", - COLLECTOR_HISTORY_ENABLE_KEY, COLLECTOR_REALTIME_ENABLE), - validator.getParameters().getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true), - validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)); + EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE), + validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true), + validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)); - // validate collector.realtime.mode - if (validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) { + // validate extractor.realtime.mode + if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) { validator.validateAttributeValueRange( - COLLECTOR_REALTIME_MODE, + EXTRACTOR_REALTIME_MODE, true, - COLLECTOR_REALTIME_MODE_HYBRID, - COLLECTOR_REALTIME_MODE_FILE, - COLLECTOR_REALTIME_MODE_LOG); + EXTRACTOR_REALTIME_MODE_HYBRID, + EXTRACTOR_REALTIME_MODE_FILE, + EXTRACTOR_REALTIME_MODE_LOG); } - constructHistoricalCollector(); - constructRealtimeCollector(validator.getParameters()); + constructHistoricalExtractor(); + constructRealtimeExtractor(validator.getParameters()); - historicalCollector.validate(validator); - realtimeCollector.validate(validator); + historicalExtractor.validate(validator); + realtimeExtractor.validate(validator); } - private void constructHistoricalCollector() { - // enable historical collector by default - historicalCollector = new PipeHistoricalDataRegionTsFileExtractor(); + private void constructHistoricalExtractor() { + // enable historical extractor by default + historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); } - private void constructRealtimeCollector(PipeParameters parameters) { - // enable realtime collector by default - if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) { - realtimeCollector = new PipeRealtimeDataRegionFakeExtractor(); + private void constructRealtimeExtractor(PipeParameters parameters) { + // enable realtime extractor by default + if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) { + realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor(); return; } // use hybrid mode by default - if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) { - realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); + if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) { + realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); return; } - switch (parameters.getString(COLLECTOR_REALTIME_MODE)) { - case COLLECTOR_REALTIME_MODE_FILE: - realtimeCollector = new PipeRealtimeDataRegionTsFileExtractor(); + switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) { + case EXTRACTOR_REALTIME_MODE_FILE: + realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); break; - case COLLECTOR_REALTIME_MODE_LOG: - realtimeCollector = new PipeRealtimeDataRegionLogExtractor(); + case EXTRACTOR_REALTIME_MODE_LOG: + realtimeExtractor = new PipeRealtimeDataRegionLogExtractor(); break; - case COLLECTOR_REALTIME_MODE_HYBRID: - realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); + case EXTRACTOR_REALTIME_MODE_HYBRID: + realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); break; default: - realtimeCollector = new PipeRealtimeDataRegionHybridExtractor(); + realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); LOGGER.warn( - "Unsupported collector realtime mode: {}, create a hybrid collector.", - parameters.getString(COLLECTOR_REALTIME_MODE)); + "Unsupported extractor realtime mode: {}, create a hybrid extractor.", + parameters.getString(EXTRACTOR_REALTIME_MODE)); } } @@ -137,10 +137,10 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception { dataRegionId = - ((PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId(); + ((PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId(); - historicalCollector.customize(parameters, configuration); - realtimeCollector.customize(parameters, configuration); + historicalExtractor.customize(parameters, configuration); + realtimeExtractor.customize(parameters, configuration); } @Override @@ -153,7 +153,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null); final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId); while (true) { - // try to start collectors in the data region ... + // try to start extractors in the data region ... // first try to run if data region exists, then try to run if data region does not exist. // both conditions fail is not common, which means the data region is created during the // runIfPresent and runIfAbsent operations. in this case, we need to retry. @@ -165,7 +165,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { String.format( "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName())); try { - startHistoricalCollectorAndRealtimeCollector(exceptionHolder); + startHistoricalExtractorAndRealtimeExtractor(exceptionHolder); } finally { dataRegion.writeUnlock(); } @@ -173,7 +173,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { || StorageEngine.getInstance() .runIfAbsent( dataRegionIdObject, - () -> startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) { + () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) { rethrowExceptionIfAny(exceptionHolder); return; } @@ -181,37 +181,37 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { } } - private void startHistoricalCollectorAndRealtimeCollector( + private void startHistoricalExtractorAndRealtimeExtractor( AtomicReference<Exception> exceptionHolder) { try { - historicalCollector.start(); - realtimeCollector.start(); + historicalExtractor.start(); + realtimeExtractor.start(); } catch (Exception e) { exceptionHolder.set(e); LOGGER.warn( String.format( - "Start historical collector %s and realtime collector %s error.", - historicalCollector, realtimeCollector), + "Start historical extractor %s and realtime extractor %s error.", + historicalExtractor, realtimeExtractor), e); } } private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) { if (exceptionHolder.get() != null) { - throw new PipeException("failed to start collectors.", exceptionHolder.get()); + throw new PipeException("failed to start extractors.", exceptionHolder.get()); } } @Override public Event supply() throws Exception { - return historicalCollector.hasConsumedAll() - ? realtimeCollector.supply() - : historicalCollector.supply(); + return historicalExtractor.hasConsumedAll() + ? realtimeExtractor.supply() + : historicalExtractor.supply(); } @Override public void close() throws Exception { - historicalCollector.close(); - realtimeCollector.close(); + historicalExtractor.close(); + realtimeExtractor.close(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java index 858ae933cb5..4cba6de3f01 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.historical; +package org.apache.iotdb.db.pipe.extractor.historical; import org.apache.iotdb.pipe.api.PipeExtractor; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java similarity index 84% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 2327e63b07a..c59ba2a0ec3 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.historical; +package org.apache.iotdb.db.pipe.extractor.historical; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -44,11 +44,11 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.stream.Collectors; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_END_TIME; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_HISTORY_START_TIME; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.COLLECTOR_PATTERN_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY; public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor { @@ -77,41 +77,41 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa @Override public void customize( PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) { - final PipeTaskCollectorRuntimeEnvironment environment = - (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + final PipeTaskExtractorRuntimeEnvironment environment = + (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment(); pipeTaskMeta = environment.getPipeTaskMeta(); startIndex = environment.getPipeTaskMeta().getProgressIndex(); dataRegionId = environment.getRegionId(); - pattern = parameters.getStringOrDefault(COLLECTOR_PATTERN_KEY, COLLECTOR_PATTERN_DEFAULT_VALUE); + pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE); - // user may set the COLLECTOR_HISTORY_START_TIME and COLLECTOR_HISTORY_END_TIME without + // user may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without // enabling the historical data collection, which may affect the realtime data collection. - final boolean isHistoricalCollectorEnabledByUser = - parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true); + final boolean isHistoricalExtractorEnabledByUser = + parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true); historicalDataCollectionStartTime = - isHistoricalCollectorEnabledByUser && parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME) + isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME) ? DateTimeUtils.convertDatetimeStrToLong( - parameters.getString(COLLECTOR_HISTORY_START_TIME), ZoneId.systemDefault()) + parameters.getString(EXTRACTOR_HISTORY_START_TIME), ZoneId.systemDefault()) : Long.MIN_VALUE; historicalDataCollectionEndTime = - isHistoricalCollectorEnabledByUser && parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME) + isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME) ? DateTimeUtils.convertDatetimeStrToLong( - parameters.getString(COLLECTOR_HISTORY_END_TIME), ZoneId.systemDefault()) + parameters.getString(EXTRACTOR_HISTORY_END_TIME), ZoneId.systemDefault()) : Long.MAX_VALUE; - // enable historical collector by default + // enable historical extractor by default historicalDataCollectionTimeLowerBound = - parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true) + parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true) ? Long.MIN_VALUE // We define the realtime data as the data generated after the creation time // of the pipe from user's perspective. But we still need to use - // PipeHistoricalDataRegionCollector to collect the realtime data generated between the + // PipeHistoricalDataRegionExtractor to extract the realtime data generated between the // creation time of the pipe and the time when the pipe starts, because those data - // can not be listened by PipeRealtimeDataRegionCollector, and should be collected by - // PipeHistoricalDataRegionCollector from implementation perspective. + // can not be listened by PipeRealtimeDataRegionExtractor, and should be collected by + // PipeHistoricalDataRegionExtractor from implementation perspective. : environment.getCreationTime(); // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode. @@ -143,7 +143,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa return; } - dataRegion.writeLock("Pipe: create historical TsFile collector"); + dataRegion.writeLock("Pipe: create historical TsFile extractor"); try { dataRegion.syncCloseAllWorkingTsFileProcessors(); } finally { @@ -160,7 +160,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa return; } - dataRegion.writeLock("Pipe: start to collect historical TsFile"); + dataRegion.writeLock("Pipe: start to extract historical TsFile"); try { dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -224,10 +224,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa } catch (IOException e) { LOGGER.warn( String.format( - "failed to get the generation time of TsFile %s, collect it anyway", + "failed to get the generation time of TsFile %s, extract it anyway", resource.getTsFilePath()), e); - // If failed to get the generation time of the TsFile, we will collect the data in the TsFile + // If failed to get the generation time of the TsFile, we will extract the data in the TsFile // anyway. return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java similarity index 81% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java index aec400b6d5d..aa0346aace9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime; +package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -45,11 +45,11 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { throws Exception { pattern = parameters.getStringOrDefault( - PipeExtractorConstant.COLLECTOR_PATTERN_KEY, - PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); + PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, + PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); - final PipeTaskCollectorRuntimeEnvironment environment = - (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + final PipeTaskExtractorRuntimeEnvironment environment = + (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment(); dataRegionId = String.valueOf(environment.getRegionId()); pipeTaskMeta = environment.getPipeTaskMeta(); } @@ -65,7 +65,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { } /** @param event the event from the storage engine */ - public abstract void collect(PipeRealtimeCollectEvent event); + public abstract void extract(PipeRealtimeEvent event); public abstract boolean isNeedListenToTsFile(); @@ -81,7 +81,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { @Override public String toString() { - return "PipeRealtimeDataRegionCollector{" + return "PipeRealtimeDataRegionExtractor{" + "pattern='" + pattern + '\'' diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java similarity index 88% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java index cffb273cee1..0f04ba475c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionFakeExtractor.java @@ -17,9 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime; +package org.apache.iotdb.db.pipe.extractor.realtime; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -49,7 +49,7 @@ public class PipeRealtimeDataRegionFakeExtractor extends PipeRealtimeDataRegionE } @Override - public void collect(PipeRealtimeCollectEvent event) { + public void extract(PipeRealtimeEvent event) { // do nothing } @@ -70,6 +70,6 @@ public class PipeRealtimeDataRegionFakeExtractor extends PipeRealtimeDataRegionE @Override public String toString() { - return "PipeRealtimeDataRegionFakeCollector{}"; + return "PipeRealtimeDataRegionFakeExtractor{}"; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index a78ad713feb..06a0135a4ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime; +package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -37,7 +37,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class); - // This queue is used to store pending events collected by the method collect(). The method + // This queue is used to store pending events collected by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } @Override - public void collect(PipeRealtimeCollectEvent event) { + public void extract(PipeRealtimeEvent event) { final Event eventToCollect = event.getEvent(); if (eventToCollect instanceof TabletInsertionEvent) { @@ -56,7 +56,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } else { throw new UnsupportedOperationException( String.format( - "Unsupported event type %s for Hybrid Realtime Collector %s", + "Unsupported event type %s for hybrid realtime extractor %s", eventToCollect.getClass(), this)); } } @@ -71,10 +71,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return true; } - private void collectTabletInsertion(PipeRealtimeCollectEvent event) { + private void collectTabletInsertion(PipeRealtimeEvent event) { if (isApproachingCapacity()) { event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - // if the pending queue is approaching capacity, we should not collect any more tablet events. + // if the pending queue is approaching capacity, we should not extract any more tablet events. // all the data represented by the tablet events should be carried by the following tsfile // event. return; @@ -83,7 +83,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE) && !pendingQueue.offer(event)) { LOGGER.warn( - "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard tablet event {}, current state {}", + "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard tablet event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -92,7 +92,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } } - private void collectTsFileInsertion(PipeRealtimeCollectEvent event) { + private void collectTsFileInsertion(PipeRealtimeEvent event) { event .getTsFileEpoch() .migrateState( @@ -102,7 +102,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( - "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard TsFile event {}, current state {}", + "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} has reached capacity, discard TsFile event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -118,7 +118,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio @Override public Event supply() { - PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); while (collectEvent != null) { Event suppliedEvent; @@ -132,7 +132,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } else { throw new UnsupportedOperationException( String.format( - "Unsupported event type %s for Hybrid Realtime Collector %s to supply.", + "Unsupported event type %s for hybrid realtime extractor %s to supply.", eventToSupply.getClass(), this)); } @@ -141,14 +141,14 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return suppliedEvent; } - collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. return null; } - private Event supplyTabletInsertion(PipeRealtimeCollectEvent event) { + private Event supplyTabletInsertion(PipeRealtimeEvent event) { event .getTsFileEpoch() .migrateState( @@ -172,7 +172,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return null; } - private Event supplyTsFileInsertion(PipeRealtimeCollectEvent event) { + private Event supplyTsFileInsertion(PipeRealtimeEvent event) { event .getTsFileEpoch() .migrateState( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java similarity index 86% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index b50e48536c2..641150826ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -17,12 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime; +package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class); - // This queue is used to store pending events collected by the method collect(). The method + // This queue is used to store pending events collected by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -44,7 +44,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx } @Override - public void collect(PipeRealtimeCollectEvent event) { + public void extract(PipeRealtimeEvent event) { event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET); if (!(event.getEvent() instanceof TabletInsertionEvent)) { @@ -53,7 +53,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx if (!pendingQueue.offer(event)) { LOGGER.warn( - "collect: pending queue of PipeRealtimeDataRegionLogCollector {} has reached capacity, discard tablet event {}, current state {}", + "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} has reached capacity, discard tablet event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -74,7 +74,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx @Override public Event supply() { - PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); while (collectEvent != null) { Event suppliedEvent = null; @@ -99,7 +99,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx return suppliedEvent; } - collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java similarity index 86% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index 2bc9abee7ad..da90c60e53f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -17,12 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime; +package org.apache.iotdb.db.pipe.extractor.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -35,7 +35,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileExtractor.class); - // This queue is used to store pending events collected by the method collect(). The method + // This queue is used to store pending events collected by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; @@ -44,7 +44,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio } @Override - public void collect(PipeRealtimeCollectEvent event) { + public void extract(PipeRealtimeEvent event) { event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); if (!(event.getEvent() instanceof TsFileInsertionEvent)) { @@ -53,7 +53,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( - "collect: pending queue of PipeRealtimeDataRegionTsFileCollector {} has reached capacity, discard TsFile event {}, current state {}", + "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} has reached capacity, discard TsFile event {}, current state {}", this, event, event.getTsFileEpoch().getState(this)); @@ -74,7 +74,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio @Override public Event supply() { - PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + PipeRealtimeEvent collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); while (collectEvent != null) { Event suppliedEvent = null; @@ -100,7 +100,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio return suppliedEvent; } - collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll(); + collectEvent = (PipeRealtimeEvent) pendingQueue.poll(); } // means the pending queue is empty. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java similarity index 98% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java index bc22132d4cd..0b335bf11e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/DisruptorQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.assigner; +package org.apache.iotdb.db.pipe.extractor.realtime.assigner; import org.apache.iotdb.commons.pipe.config.PipeConfig; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java similarity index 60% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java index 116f63f3073..d5ca1a10b90 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/assigner/PipeDataRegionAssigner.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java @@ -17,61 +17,61 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.assigner; +package org.apache.iotdb.db.pipe.extractor.realtime.assigner; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher; -import org.apache.iotdb.db.pipe.collector.realtime.matcher.PipeDataRegionMatcher; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher; +import org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher; import com.lmax.disruptor.dsl.ProducerType; public class PipeDataRegionAssigner { - /** The matcher is used to match the event with the collector based on the pattern. */ + /** The matcher is used to match the event with the extractor based on the pattern. */ private final PipeDataRegionMatcher matcher; - /** The disruptor is used to assign the event to the collector. */ - private final DisruptorQueue<PipeRealtimeCollectEvent> disruptor; + /** The disruptor is used to assign the event to the extractor. */ + private final DisruptorQueue<PipeRealtimeEvent> disruptor; public PipeDataRegionAssigner() { this.matcher = new CachedSchemaPatternMatcher(); this.disruptor = - new DisruptorQueue.Builder<PipeRealtimeCollectEvent>() + new DisruptorQueue.Builder<PipeRealtimeEvent>() .setProducerType(ProducerType.SINGLE) - .addEventHandler(this::assignToCollector) + .addEventHandler(this::assignToExtractor) .build(); } - public void publishToAssign(PipeRealtimeCollectEvent event) { + public void publishToAssign(PipeRealtimeEvent event) { event.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); disruptor.publish(event); } - public void assignToCollector(PipeRealtimeCollectEvent event, long sequence, boolean endOfBatch) { + public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) { matcher .match(event) .forEach( - collector -> { - final PipeRealtimeCollectEvent copiedEvent = + extractor -> { + final PipeRealtimeEvent copiedEvent = event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - collector.getPipeTaskMeta(), collector.getPattern()); + extractor.getPipeTaskMeta(), extractor.getPattern()); copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); - collector.collect(copiedEvent); + extractor.extract(copiedEvent); }); event.gcSchemaInfo(); event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName()); } - public void startAssignTo(PipeRealtimeDataRegionExtractor collector) { - matcher.register(collector); + public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) { + matcher.register(extractor); } - public void stopAssignTo(PipeRealtimeDataRegionExtractor collector) { - matcher.deregister(collector); + public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) { + matcher.deregister(extractor); } - public boolean notMoreCollectorNeededToBeAssigned() { + public boolean notMoreExtractorNeededToBeAssigned() { return matcher.getRegisterCount() == 0; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java similarity index 73% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java index a4806c30774..55a500bb1ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpoch.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java @@ -17,9 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.epoch; +package org.apache.iotdb.db.pipe.extractor.realtime.epoch; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,23 +29,23 @@ public class TsFileEpoch { private final String filePath; private final ConcurrentMap<PipeRealtimeDataRegionExtractor, AtomicReference<State>> - dataRegionCollector2State; + dataRegionExtractor2State; public TsFileEpoch(String filePath) { this.filePath = filePath; - this.dataRegionCollector2State = new ConcurrentHashMap<>(); + this.dataRegionExtractor2State = new ConcurrentHashMap<>(); } - public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor collector) { - return dataRegionCollector2State - .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY)) + public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) { + return dataRegionExtractor2State + .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY)) .get(); } public void migrateState( - PipeRealtimeDataRegionExtractor collector, TsFileEpochStateMigrator visitor) { - dataRegionCollector2State - .computeIfAbsent(collector, o -> new AtomicReference<>(State.EMPTY)) + PipeRealtimeDataRegionExtractor extractor, TsFileEpochStateMigrator visitor) { + dataRegionExtractor2State + .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY)) .getAndUpdate(visitor::migrate); } @@ -55,8 +55,8 @@ public class TsFileEpoch { + "filePath='" + filePath + '\'' - + ", dataRegionCollector2State=" - + dataRegionCollector2State + + ", dataRegionExtractor2State=" + + dataRegionExtractor2State + '}'; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java index dbecc2f2a72..ebf57441fdc 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.epoch; +package org.apache.iotdb.db.pipe.extractor.realtime.epoch; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,7 @@ public class TsFileEpochManager { private final Map<String, TsFileEpoch> filePath2Epoch = new HashMap<>(); - public PipeRealtimeCollectEvent bindPipeTsFileInsertionEvent( + public PipeRealtimeEvent bindPipeTsFileInsertionEvent( PipeTsFileInsertionEvent event, TsFileResource resource) { final String filePath = resource.getTsFilePath(); @@ -53,7 +53,7 @@ public class TsFileEpochManager { return new TsFileEpoch(path); }); - return new PipeRealtimeCollectEvent( + return new PipeRealtimeEvent( event, filePath2Epoch.remove(filePath), resource.getDevices().stream() @@ -61,9 +61,9 @@ public class TsFileEpochManager { event.getPattern()); } - public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent( + public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) { - return new PipeRealtimeCollectEvent( + return new PipeRealtimeEvent( event, filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new), Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()), diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java index b95d3349b47..b4b6ada0ba9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochStateMigrator.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochStateMigrator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.epoch; +package org.apache.iotdb.db.pipe.extractor.realtime.epoch; @FunctionalInterface public interface TsFileEpochStateMigrator { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java similarity index 73% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java index 00758562073..50dea414a78 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/listener/PipeInsertionDataNodeListener.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java @@ -17,14 +17,14 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.listener; +package org.apache.iotdb.db.pipe.extractor.realtime.listener; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.assigner.PipeDataRegionAssigner; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEventFactory; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner; import org.apache.iotdb.db.wal.utils.WALEntryHandler; import java.util.concurrent.ConcurrentHashMap; @@ -40,49 +40,49 @@ import java.util.concurrent.atomic.AtomicInteger; * * <p>All events collected by this listener will be first published to different * PipeEventDataRegionAssigners (identified by data region id), and then PipeEventDataRegionAssigner - * will filter events and assign them to different PipeRealtimeEventDataRegionCollectors. + * will filter events and assign them to different PipeRealtimeEventDataRegionExtractors. */ public class PipeInsertionDataNodeListener { private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner = new ConcurrentHashMap<>(); - private final AtomicInteger listenToTsFileCollectorCount = new AtomicInteger(0); - private final AtomicInteger listenToInsertNodeCollectorCount = new AtomicInteger(0); + private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); + private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0); //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( - String dataRegionId, PipeRealtimeDataRegionExtractor collector) { + String dataRegionId, PipeRealtimeDataRegionExtractor extractor) { dataRegionId2Assigner .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner()) - .startAssignTo(collector); + .startAssignTo(extractor); - if (collector.isNeedListenToTsFile()) { - listenToTsFileCollectorCount.incrementAndGet(); + if (extractor.isNeedListenToTsFile()) { + listenToTsFileExtractorCount.incrementAndGet(); } - if (collector.isNeedListenToInsertNode()) { - listenToInsertNodeCollectorCount.incrementAndGet(); + if (extractor.isNeedListenToInsertNode()) { + listenToInsertNodeExtractorCount.incrementAndGet(); } } public synchronized void stopListenAndAssign( - String dataRegionId, PipeRealtimeDataRegionExtractor collector) { + String dataRegionId, PipeRealtimeDataRegionExtractor extractor) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); if (assigner == null) { return; } - assigner.stopAssignTo(collector); + assigner.stopAssignTo(extractor); - if (collector.isNeedListenToTsFile()) { - listenToTsFileCollectorCount.decrementAndGet(); + if (extractor.isNeedListenToTsFile()) { + listenToTsFileExtractorCount.decrementAndGet(); } - if (collector.isNeedListenToInsertNode()) { - listenToInsertNodeCollectorCount.decrementAndGet(); + if (extractor.isNeedListenToInsertNode()) { + listenToInsertNodeExtractorCount.decrementAndGet(); } - if (assigner.notMoreCollectorNeededToBeAssigned()) { + if (assigner.notMoreExtractorNeededToBeAssigned()) { // the removed assigner will is the same as the one referenced by the variable `assigner` dataRegionId2Assigner.remove(dataRegionId); // this will help to release the memory occupied by the assigner @@ -93,9 +93,9 @@ public class PipeInsertionDataNodeListener { //////////////////////////// listen to events //////////////////////////// public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) { - // wo don't judge whether listenToTsFileCollectorCount.get() == 0 here, because + // wo don't judge whether listenToTsFileExtractorCount.get() == 0 here, because // when using SimpleProgressIndex, the tsfile event needs to be assigned to the - // collector even if listenToTsFileCollectorCount.get() == 0 to record the progress + // extractor even if listenToTsFileExtractorCount.get() == 0 to record the progress PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource); @@ -106,7 +106,7 @@ public class PipeInsertionDataNodeListener { return; } - assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource)); + assigner.publishToAssign(PipeRealtimeEventFactory.createCollectEvent(tsFileResource)); } public void listenToInsertNode( @@ -114,7 +114,7 @@ public class PipeInsertionDataNodeListener { WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource tsFileResource) { - if (listenToInsertNodeCollectorCount.get() == 0) { + if (listenToInsertNodeExtractorCount.get() == 0) { return; } @@ -126,8 +126,7 @@ public class PipeInsertionDataNodeListener { } assigner.publishToAssign( - PipeRealtimeCollectEventFactory.createCollectEvent( - walEntryHandler, insertNode, tsFileResource)); + PipeRealtimeEventFactory.createCollectEvent(walEntryHandler, insertNode, tsFileResource)); } /////////////////////////////// singleton /////////////////////////////// diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java similarity index 67% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java index 0af3151ed9a..bb046ee972b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/CachedSchemaPatternMatcher.java @@ -17,11 +17,11 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.matcher; +package org.apache.iotdb.db.pipe.extractor.realtime.matcher; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import com.github.benmanes.caffeine.cache.Cache; @@ -40,35 +40,35 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { private final ReentrantReadWriteLock lock; - private final Set<PipeRealtimeDataRegionExtractor> collectors; - private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> deviceToCollectorsCache; + private final Set<PipeRealtimeDataRegionExtractor> extractors; + private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> deviceToExtractorsCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); - this.collectors = new HashSet<>(); - this.deviceToCollectorsCache = + this.extractors = new HashSet<>(); + this.deviceToExtractorsCache = Caffeine.newBuilder() .maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize()) .build(); } @Override - public void register(PipeRealtimeDataRegionExtractor collector) { + public void register(PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { - collectors.add(collector); - deviceToCollectorsCache.invalidateAll(); + extractors.add(extractor); + deviceToExtractorsCache.invalidateAll(); } finally { lock.writeLock().unlock(); } } @Override - public void deregister(PipeRealtimeDataRegionExtractor collector) { + public void deregister(PipeRealtimeDataRegionExtractor extractor) { lock.writeLock().lock(); try { - collectors.remove(collector); - deviceToCollectorsCache.invalidateAll(); + extractors.remove(extractor); + deviceToExtractorsCache.invalidateAll(); } finally { lock.writeLock().unlock(); } @@ -78,75 +78,75 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { public int getRegisterCount() { lock.readLock().lock(); try { - return collectors.size(); + return extractors.size(); } finally { lock.readLock().unlock(); } } @Override - public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event) { - final Set<PipeRealtimeDataRegionExtractor> matchedCollectors = new HashSet<>(); + public Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event) { + final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new HashSet<>(); lock.readLock().lock(); try { - if (collectors.isEmpty()) { - return matchedCollectors; + if (extractors.isEmpty()) { + return matchedExtractors; } for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) { final String device = entry.getKey(); final String[] measurements = entry.getValue(); - // 1. try to get matched collectors from cache, if not success, match them by device - final Set<PipeRealtimeDataRegionExtractor> collectorsFilteredByDevice = - deviceToCollectorsCache.get(device, this::filterCollectorsByDevice); + // 1. try to get matched extractors from cache, if not success, match them by device + final Set<PipeRealtimeDataRegionExtractor> extractorsFilteredByDevice = + deviceToExtractorsCache.get(device, this::filterExtractorsByDevice); // this would not happen - if (collectorsFilteredByDevice == null) { + if (extractorsFilteredByDevice == null) { LOGGER.warn("Match result NPE when handle device {}", device); continue; } - // 2. filter matched candidate collectors by measurements + // 2. filter matched candidate extractors by measurements if (measurements.length == 0) { - // `measurements` is empty (only in case of tsfile event). match all collectors. + // `measurements` is empty (only in case of tsfile event). match all extractors. // // case 1: for example, pattern is root.a.b, device is root.a.b.c, measurement can be any. - // in this case, the collector can be matched without checking the measurements. + // in this case, the extractor can be matched without checking the measurements. // // case 2: for example, pattern is root.a.b.c, device is root.a.b. - // in this situation, the collector can not be matched in some cases, but we can not know - // all the measurements of the device in an efficient way, so we ASSUME that the collector + // in this situation, the extractor can not be matched in some cases, but we can not know + // all the measurements of the device in an efficient way, so we ASSUME that the extractor // can be matched. this is a trade-off between efficiency and accuracy. for most user's // usage, this is acceptable, which may result in some unnecessary data processing and // transmission, but will not result in data loss. - matchedCollectors.addAll(collectorsFilteredByDevice); + matchedExtractors.addAll(extractorsFilteredByDevice); } else { - // `measurements` is not empty (only in case of tablet event). match collectors by + // `measurements` is not empty (only in case of tablet event). match extractors by // measurements. - collectorsFilteredByDevice.forEach( - collector -> { - final String pattern = collector.getPattern(); + extractorsFilteredByDevice.forEach( + extractor -> { + final String pattern = extractor.getPattern(); // case 1: for example, pattern is root.a.b and device is root.a.b.c - // in this case, the collector can be matched without checking the measurements + // in this case, the extractor can be matched without checking the measurements if (pattern.length() <= device.length()) { - matchedCollectors.add(collector); + matchedExtractors.add(extractor); } // case 2: for example, pattern is root.a.b.c and device is root.a.b // in this case, we need to check the full path else { for (String measurement : measurements) { // for example, pattern is root.a.b.c, device is root.a.b and measurement is c - // in this case, the collector can be matched. other cases are not matched. + // in this case, the extractor can be matched. other cases are not matched. // please note that there should be a . between device and measurement. if ( // low cost check comes first pattern.length() == device.length() + measurement.length() + 1 // high cost check comes later && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { - matchedCollectors.add(collector); - // there would be no more matched collectors because the measurements are + matchedExtractors.add(extractor); + // there would be no more matched extractors because the measurements are // unique break; } @@ -155,7 +155,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { }); } - if (matchedCollectors.size() == collectors.size()) { + if (matchedExtractors.size() == extractors.size()) { break; } } @@ -163,36 +163,36 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { lock.readLock().unlock(); } - return matchedCollectors; + return matchedExtractors; } - private Set<PipeRealtimeDataRegionExtractor> filterCollectorsByDevice(String device) { - final Set<PipeRealtimeDataRegionExtractor> filteredCollectors = new HashSet<>(); + private Set<PipeRealtimeDataRegionExtractor> filterExtractorsByDevice(String device) { + final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new HashSet<>(); - for (PipeRealtimeDataRegionExtractor collector : collectors) { - String pattern = collector.getPattern(); + for (PipeRealtimeDataRegionExtractor extractor : extractors) { + String pattern = extractor.getPattern(); if ( // for example, pattern is root.a.b and device is root.a.b.c - // in this case, the collector can be matched without checking the measurements + // in this case, the extractor can be matched without checking the measurements (pattern.length() <= device.length() && device.startsWith(pattern)) // for example, pattern is root.a.b.c and device is root.a.b - // in this case, the collector can be selected as candidate, but the measurements should + // in this case, the extractor can be selected as candidate, but the measurements should // be checked further || (pattern.length() > device.length() && pattern.startsWith(device))) { - filteredCollectors.add(collector); + filteredExtractors.add(extractor); } } - return filteredCollectors; + return filteredExtractors; } @Override public void clear() { lock.writeLock().lock(); try { - collectors.clear(); - deviceToCollectorsCache.invalidateAll(); - deviceToCollectorsCache.cleanUp(); + extractors.clear(); + deviceToExtractorsCache.invalidateAll(); + deviceToExtractorsCache.cleanUp(); } finally { lock.writeLock().unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java similarity index 57% rename from server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java rename to server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java index 44b544329c2..8b145c2fac7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/PipeDataRegionMatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/matcher/PipeDataRegionMatcher.java @@ -17,36 +17,36 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector.realtime.matcher; +package org.apache.iotdb.db.pipe.extractor.realtime.matcher; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import java.util.Set; public interface PipeDataRegionMatcher { /** - * Register a collector. If the collector's pattern matches the event's schema info, the event - * will be assigned to the collector. + * Register a extractor. If the extractor's pattern matches the event's schema info, the event + * will be assigned to the extractor. */ - void register(PipeRealtimeDataRegionExtractor collector); + void register(PipeRealtimeDataRegionExtractor extractor); - /** Deregister a collector. */ - void deregister(PipeRealtimeDataRegionExtractor collector); + /** Deregister a extractor. */ + void deregister(PipeRealtimeDataRegionExtractor extractor); - /** Get the number of registered collectors in this matcher. */ + /** Get the number of registered extractors in this matcher. */ int getRegisterCount(); /** - * Match the event's schema info with the registered collectors' patterns. If the event's schema - * info matches the pattern of a collector, the collector will be returned. + * Match the event's schema info with the registered extractors' patterns. If the event's schema + * info matches the pattern of a extractor, the extractor will be returned. * * @param event the event to be matched - * @return the matched collectors + * @return the matched extractors */ - Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeCollectEvent event); + Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event); - /** Clear all the registered collectors and internal data structures. */ + /** Clear all the registered extractors and internal data structures. */ void clear(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java index 18ea33be4ff..7de66c55ca9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java @@ -54,7 +54,7 @@ public class PipeDoNothingProcessor implements PipeProcessor { final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent; if (enrichedEvent .getPattern() - .equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + .equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE)) { eventCollector.collect(tabletInsertionEvent); } else { tabletInsertionEvent @@ -86,7 +86,7 @@ public class PipeDoNothingProcessor implements PipeProcessor { if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent enrichedEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent; - if (enrichedEvent.getPattern().equals(PipeExtractorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE) + if (enrichedEvent.getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE) && !enrichedEvent.hasTimeFilter()) { eventCollector.collect(tsFileInsertionEvent); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java index 887b31a6e3a..3639a4c60da 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java @@ -27,44 +27,44 @@ public class PipeTask { private final String pipeName; private final TConsensusGroupId dataRegionId; - private final PipeTaskStage collectorStage; + private final PipeTaskStage extractorStage; private final PipeTaskStage processorStage; private final PipeTaskStage connectorStage; PipeTask( String pipeName, TConsensusGroupId dataRegionId, - PipeTaskStage collectorStage, + PipeTaskStage extractorStage, PipeTaskStage processorStage, PipeTaskStage connectorStage) { this.pipeName = pipeName; this.dataRegionId = dataRegionId; - this.collectorStage = collectorStage; + this.extractorStage = extractorStage; this.processorStage = processorStage; this.connectorStage = connectorStage; } public void create() { - collectorStage.create(); + extractorStage.create(); processorStage.create(); connectorStage.create(); } public void drop() { - collectorStage.drop(); + extractorStage.drop(); processorStage.drop(); connectorStage.drop(); } public void start() { - collectorStage.start(); + extractorStage.start(); processorStage.start(); connectorStage.start(); } public void stop() { - collectorStage.stop(); + extractorStage.stop(); processorStage.stop(); connectorStage.stop(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java index 5a9e40ccfb0..47353d89ecf 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java @@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.task; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage; import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage; +import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage; import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage; public class PipeTaskBuilder { @@ -40,14 +40,14 @@ public class PipeTaskBuilder { } public PipeTask build() { - // event flow: collector -> processor -> connector + // event flow: extractor -> processor -> connector - // we first build the collector and connector, then build the processor. - final PipeTaskCollectorStage collectorStage = - new PipeTaskCollectorStage( + // we first build the extractor and connector, then build the processor. + final PipeTaskExtractorStage extractorStage = + new PipeTaskExtractorStage( pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime(), - pipeStaticMeta.getCollectorParameters(), + pipeStaticMeta.getExtractorParameters(), dataRegionId, pipeTaskMeta); @@ -57,17 +57,17 @@ public class PipeTaskBuilder { pipeStaticMeta.getCreationTime(), pipeStaticMeta.getConnectorParameters()); - // the processor connects the collector and connector. + // the processor connects the extractor and connector. final PipeTaskProcessorStage processorStage = new PipeTaskProcessorStage( pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime(), pipeStaticMeta.getProcessorParameters(), dataRegionId, - collectorStage.getEventSupplier(), + extractorStage.getEventSupplier(), connectorStage.getPipeConnectorPendingQueue()); return new PipeTask( - pipeStaticMeta.getPipeName(), dataRegionId, collectorStage, processorStage, connectorStage); + pipeStaticMeta.getPipeName(), dataRegionId, extractorStage, processorStage, connectorStage); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java index c00d294be00..74efb7a2122 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java @@ -25,8 +25,8 @@ import org.apache.iotdb.pipe.api.event.Event; public interface EventSupplier { /** - * @return the event to be supplied. the event may be null if the collector has no more events at - * the moment, but the collector is still running for more events. + * @return the event to be supplied. the event may be null if the extractor has no more events at + * the moment, but the extractor is still running for more events. * @throws Exception if the supplier fails to supply the event. */ @SuppressWarnings("squid:S00112") // Exception is thrown by the interface diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java similarity index 79% rename from server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java index 807c7b0c758..c2bf2ea2704 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java @@ -23,47 +23,47 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; -public class PipeTaskCollectorStage extends PipeTaskStage { +public class PipeTaskExtractorStage extends PipeTaskStage { private final PipeExtractor pipeExtractor; - public PipeTaskCollectorStage( + public PipeTaskExtractorStage( String pipeName, long creationTime, - PipeParameters collectorParameters, + PipeParameters extractorParameters, TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta) { pipeExtractor = - collectorParameters + extractorParameters .getStringOrDefault( - PipeExtractorConstant.COLLECTOR_KEY, + PipeExtractorConstant.EXTRACTOR_KEY, BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) ? new IoTDBDataRegionExtractor() - : PipeAgent.plugin().reflectCollector(collectorParameters); + : PipeAgent.plugin().reflectExtractor(extractorParameters); - // validate and customize should be called before createSubtask. this allows collector exposing + // validate and customize should be called before createSubtask. this allows extractor exposing // exceptions in advance. try { - // 1. validate collector parameters - pipeExtractor.validate(new PipeParameterValidator(collectorParameters)); + // 1. validate extractor parameters + pipeExtractor.validate(new PipeParameterValidator(extractorParameters)); - // 2. customize collector + // 2. customize extractor final PipeTaskRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment( + new PipeTaskExtractorRuntimeEnvironment( pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta)); - pipeExtractor.customize(collectorParameters, runtimeConfiguration); + pipeExtractor.customize(extractorParameters, runtimeConfiguration); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } @@ -85,7 +85,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage { @Override public void stopSubtask() throws PipeException { - // collector continuously collects data, so do nothing in stop + // extractor continuously collects data, so do nothing in stop } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index fade5393d72..1e4d4e8a515 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -70,7 +70,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { ? new PipeDoNothingProcessor() : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters); - // validate and customize should be called before createSubtask. this allows collector exposing + // validate and customize should be called before createSubtask. this allows extractor exposing // exceptions in advance. try { // 1. validate processor parameters diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java similarity index 84% rename from server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java index e84b8195956..f1aa273bfaa 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java @@ -17,14 +17,14 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector; +package org.apache.iotdb.db.pipe.extractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher; import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; -import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -70,10 +70,10 @@ public class CachedSchemaPatternMatcherTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root"); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root"); } }), - new PipeTaskRuntimeConfiguration(new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); + new PipeTaskRuntimeConfiguration(new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(databaseCollector); int deviceCollectorNum = 10; @@ -85,11 +85,11 @@ public class CachedSchemaPatternMatcherTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + finalI1); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); + new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(deviceCollector); for (int j = 0; j < seriesCollectorNum; j++) { PipeRealtimeDataRegionExtractor seriesCollector = new PipeRealtimeDataRegionFakeExtractor(); @@ -100,12 +100,12 @@ public class CachedSchemaPatternMatcherTest { new HashMap<String, String>() { { put( - PipeExtractorConstant.COLLECTOR_PATTERN_KEY, + PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + finalI + "." + finalJ); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); + new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(seriesCollector); } } @@ -126,16 +126,16 @@ public class CachedSchemaPatternMatcherTest { long totalTime = 0; for (int i = 0; i < epochNum; i++) { for (int j = 0; j < deviceNum; j++) { - PipeRealtimeCollectEvent event = - new PipeRealtimeCollectEvent( + PipeRealtimeEvent event = + new PipeRealtimeEvent( null, null, Collections.singletonMap("root." + i, measurements), "root"); long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(collector -> collector.collect(event)); + matcher.match(event).forEach(collector -> collector.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } - PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, null, deviceMap, "root"); + PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, "root"); long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(collector -> collector.collect(event)); + matcher.match(event).forEach(collector -> collector.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount()); @@ -157,7 +157,7 @@ public class CachedSchemaPatternMatcherTest { } @Override - public void collect(PipeRealtimeCollectEvent event) { + public void extract(PipeRealtimeEvent event) { final boolean[] match = {false}; event .getSchemaInfo() diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java similarity index 93% rename from server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java index 0ef1058c12a..a30854cc95d 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.collector; +package org.apache.iotdb.db.pipe.extractor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; @@ -25,12 +25,12 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridExtractor; -import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor; +import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.wal.utils.WALEntryHandler; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; @@ -115,41 +115,41 @@ public class PipeRealtimeCollectTest { new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment( + new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion1), null))); collector2.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment( + new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion1), null))); collector3.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern1); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment( + new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion2), null))); collector4.customize( new PipeParameters( new HashMap<String, String>() { { - put(PipeExtractorConstant.COLLECTOR_PATTERN_KEY, pattern2); + put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2); } }), new PipeTaskRuntimeConfiguration( - new PipeTaskCollectorRuntimeEnvironment( + new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion2), null))); PipeRealtimeDataRegionExtractor[] collectors =
