This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-rename-fallback
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d62e53a2b142ee0f308677efa575432a171b1cd2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Nov 2 11:28:52 2023 +0800

    Pipe: add stream & batch options for source.realtime.mode
---
 .../iotdb/db/pipe/config/constant/PipeExtractorConstant.java     | 2 ++
 .../apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java | 8 +++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index a1eead9e6f5..2c9a72d58d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -53,6 +53,8 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
   public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
   public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = "forced-log";
+  public static final String EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE = "stream";
+  public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = "batch";
 
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c68e3047404..d113b4b984f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -52,11 +52,13 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXT
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
@@ -117,7 +119,9 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
           EXTRACTOR_REALTIME_MODE_FILE_VALUE,
           EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
           EXTRACTOR_REALTIME_MODE_LOG_VALUE,
-          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE);
+          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
+          EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
     }
 
     constructHistoricalExtractor();
@@ -152,10 +156,12 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
 
     switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
       case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
+      case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
         break;
       case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
       case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
+      case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         break;
       case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
