This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
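In short, this commit migrates the hudi-utilities test suite from the legacy "hoodie.deltastreamer." property prefix to the new "hoodie.streamer." prefix. As the SourceTestConfig hunk in the diff below shows, each ConfigProperty keeps the old key resolvable via withAlternatives, so properties files written for either prefix keep working. A minimal sketch of that pattern, assuming only what is visible in the diff (the holder class name ExamplePropsConfig is illustrative, not part of the commit; the builder calls and the prefix constants from org.apache.hudi.common.util.ConfigUtils are taken directly from the SourceTestConfig change):

    import org.apache.hudi.common.config.ConfigProperty;

    import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
    import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;

    // Illustrative holder class, not part of the commit.
    public class ExamplePropsConfig {
      // The new "hoodie.streamer." key is primary; the legacy
      // "hoodie.deltastreamer." key stays resolvable as an alternative,
      // so configs using either prefix continue to work.
      public static final ConfigProperty<Integer> NUM_SOURCE_PARTITIONS_PROP = ConfigProperty
          .key(STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
          .defaultValue(10)
          .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
          .withDocumentation("Number of partitions used when generating test data");
    }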
The following commit(s) were added to refs/heads/master by this push:
     new 26c00a3adef  [HUDI-7187] Fix integ test props to honor new streamer properties (#10866)
26c00a3adef is described below

commit 26c00a3adefff9217187ca0ab9a5b2a7c9e42199
Author: wombatu-kun <wombatu...@gmail.com>
AuthorDate: Sun Mar 31 11:16:01 2024 +0700

    [HUDI-7187] Fix integ test props to honor new streamer properties (#10866)

    Co-authored-by: Vova Kolmakov <kolmakov.vladi...@huawei-partners.com>
---
 .../TestKafkaConnectHdfsProvider.java              |  4 +-
 .../hudi/utilities/config/SourceTestConfig.java    | 15 +++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 54 ++++++++--------
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 48 +++++++-------
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |  4 +-
 .../TestHoodieMultiTableDeltaStreamer.java         | 14 ++--
 .../functional/TestHiveSchemaProvider.java         | 10 +--
 .../functional/TestJdbcbasedSchemaProvider.java    | 14 ++--
 .../schema/TestSchemaRegistryProvider.java         | 16 ++---
 .../utilities/sources/BaseTestKafkaSource.java     |  2 +-
 .../hudi/utilities/sources/TestAvroDFSSource.java  |  2 +-
 .../utilities/sources/TestAvroKafkaSource.java     | 12 ++--
 .../hudi/utilities/sources/TestCsvDFSSource.java   |  6 +-
 .../sources/TestGcsEventsHoodieIncrSource.java     | 18 +++---
 .../utilities/sources/TestHoodieIncrSource.java    |  4 +-
 .../hudi/utilities/sources/TestJdbcSource.java     | 74 +++++++++++-----------
 .../hudi/utilities/sources/TestJsonDFSSource.java  |  2 +-
 .../utilities/sources/TestJsonKafkaSource.java     |  6 +-
 .../sources/TestJsonKafkaSourcePostProcessor.java  |  2 +-
 .../utilities/sources/TestParquetDFSSource.java    |  2 +-
 .../utilities/sources/TestProtoKafkaSource.java    |  4 +-
 .../sources/TestS3EventsHoodieIncrSource.java      | 20 +++---
 .../utilities/sources/TestSqlFileBasedSource.java  |  4 +-
 .../hudi/utilities/sources/TestSqlSource.java      |  2 +-
 .../debezium/TestAbstractDebeziumSource.java       |  6 +-
 .../helpers/TestCloudObjectsSelectorCommon.java    | 18 +++---
 .../sources/helpers/TestKafkaOffsetGen.java        |  6 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  4 +-
 .../testutils/sources/AbstractBaseTestSource.java  | 24 ++++---
 .../sources/DistributedTestDataSource.java         | 11 ++--
 .../transform/TestSqlFileBasedTransformer.java     |  8 +--
 .../transform/TestSqlQueryBasedTransformer.java    |  2 +-
 .../streamer-config/dfs-source.properties          |  6 +-
 .../invalid_hive_sync_uber_config.properties       |  6 +-
 .../streamer-config/kafka-source.properties        |  6 +-
 .../short_trip_uber_config.properties              | 12 ++--
 .../streamer-config/sql-transformer.properties     |  2 +-
 .../streamer-config/uber_config.properties         | 10 +--
 38 files changed, 232 insertions(+), 228 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
index fb6f5d649cb..e90cfdb6856 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -62,7 +62,7 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
 new File(topicPath + "/year=2016/month=05/day=02/"
     + "random_snappy_2" + BASE_FILE_EXTENSION).createNewFile();
 final TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
+ props.put("hoodie.streamer.checkpoint.provider.path", topicPath.toString());
 final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
 provider.init(HoodieTestUtils.getDefaultHadoopConf());
 assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
@@ -83,7 +83,7 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
 new File(topicPath + "/year=2016/month=05/day=02/"
     + "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile();
 final TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
+ props.put("hoodie.streamer.checkpoint.provider.path", topicPath.toString());
 final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
 provider.init(HoodieTestUtils.getDefaultHadoopConf());
 assertThrows(HoodieException.class, provider::getCheckpoint);

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
index 450d6e8dc3a..760e7ed7ff4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
@@ -21,29 +21,36 @@ package org.apache.hudi.utilities.config;

 import org.apache.hudi.common.config.ConfigProperty;

+import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
 /**
  * Configurations for Test Data Sources.
  */
 public class SourceTestConfig {

   public static final ConfigProperty<Integer> NUM_SOURCE_PARTITIONS_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.num_partitions")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
       .defaultValue(10)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
       .withDocumentation("Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data");

   public static final ConfigProperty<Integer> MAX_UNIQUE_RECORDS_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.max_unique_records")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.max_unique_records")
       .defaultValue(Integer.MAX_VALUE)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.max_unique_records")
       .withDocumentation("Maximum number of unique records generated for the run");

   public static final ConfigProperty<Boolean> USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .defaultValue(false)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .withDocumentation("If true, uses Rocks DB for storing datagen keys");

   public static final ConfigProperty<String> ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.datagen.rocksdb_base_dir")
       .noDefaultValue()
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.datagen.rocksdb_base_dir")
       .withDocumentation("Base Dir for storing datagen keys");
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 3c74388860e..2b2013d04cd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -204,8 +204,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");

 // Source schema is the target schema of upstream table
- downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
- downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ downstreamProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
+ downstreamProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties");

 // Properties used for testing invalid key generator
@@ -214,8 +214,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
 invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
- invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ invalidProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+ invalidProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);

 // Properties used for testing inferring key generator for complex key generator
@@ -223,8 +223,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 inferKeygenProps.setProperty("include", "base.properties");
 inferKeygenProps.setProperty("hoodie.datasource.write.recordkey.field", "timestamp,_row_key");
 inferKeygenProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
- inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+ inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 UtilitiesTestBase.Helpers.savePropsToDFS(inferKeygenProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_INFER_COMPLEX_KEYGEN);

 // Properties used for testing inferring key generator for non-partitioned key generator
@@ -240,8 +240,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);

 TypedProperties invalidHiveSyncProps = new TypedProperties();
- invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
- invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+ invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
+ invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
 UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 }
@@ -251,8 +251,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
 props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
- props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+ props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");

 // Hive Configs
 props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
@@ -266,9 +266,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) {
 props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
- props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
- props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
- props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
+ props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+ props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
+ props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
 }

 protected static void populateAllCommonProps(TypedProperties props, String dfsBasePath, String brokerAddress) {
@@ -279,10 +279,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 protected static void populateCommonProps(TypedProperties props, String dfsBasePath) {
 props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
- props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
- props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
- props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
- props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
+ props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+ props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+ props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
+ props.setProperty("hoodie.streamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
 }

 protected static void populateCommonKafkaProps(TypedProperties props, String brokerAddress) {
@@ -291,7 +291,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 props.setProperty("auto.offset.reset", "earliest");
 props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents", String.valueOf(5000));
 }

 protected static void populateCommonHiveProps(TypedProperties props) {
@@ -384,12 +384,12 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
 if (useSchemaProvider) {
- parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + sourceSchemaFile);
+ parquetProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + sourceSchemaFile);
 if (hasTransformer) {
- parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/" + targetSchemaFile);
+ parquetProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/" + targetSchemaFile);
 }
 }
- parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+ parquetProps.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
 if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
 parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
 }
@@ -405,11 +405,11 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 props.setProperty("hoodie.embed.timeline.server", "false");
 props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 props.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
- props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
+ props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents", String.valueOf(5000));
 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName());
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents",
     maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
     String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -617,10 +617,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 }
 List<String> cfgs = new ArrayList<>();
 cfgs.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + "=true");
- cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
- cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
+ cfgs.add("hoodie.streamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
+ cfgs.add("hoodie.streamer.source.hoodieincr.path=" + srcBasePath);
 // No partition
- cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
+ cfgs.add("hoodie.streamer.source.hoodieincr.partition.fields=datestr");
 cfg.configs = cfgs;
 return cfg;
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 2a2c4dafb1e..34486a07ab8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -376,7 +376,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
 TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
- props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
+ props.put("hoodie.streamer.checkpoint.provider.path", bootstrapPath);
 cfg.initialCheckpointProvider = checkpointProviderClass;
 // create regular kafka connect hdfs dirs
 fs.mkdirs(new Path(bootstrapPath));
@@ -568,8 +568,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
     Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
 addRecordMerger(recordType, cfg.configs);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
 cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
 if (!useSchemaPostProcessor) {
 cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -582,8 +582,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
     Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
 addRecordMerger(recordType, cfg.configs);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
 cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
 if (!useSchemaPostProcessor) {
 cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -607,9 +607,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
     Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
 addRecordMerger(recordType, cfg.configs);
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
 if (useUserProvidedSchema) {
- cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
+ cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
 }
 if (!useSchemaPostProcessor) {
 cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -1822,12 +1822,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
 if (useSchemaProvider) {
- orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + "source.avsc");
+ orcProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + "source.avsc");
 if (transformerClassNames != null) {
- orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/" + "target.avsc");
+ orcProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/" + "target.avsc");
 }
 }
- orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+ orcProps.setProperty("hoodie.streamer.source.dfs.root", ORC_SOURCE_ROOT);
 UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" + PROPS_FILENAME_TEST_ORC);

 String tableBasePath = basePath + "/test_orc_source_table" + testNum;
@@ -1852,11 +1852,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 props.setProperty("hoodie.embed.timeline.server", "false");
 props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
- props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
- props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
- props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
- props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
- props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
+ props.setProperty("hoodie.streamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
+ props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+ props.setProperty("hoodie.streamer.source.kafka.checkpoint.type", kafkaCheckpointType);
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
+ props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
 props.setProperty("auto.offset.reset", autoResetValue);
 if (extraProps != null && !extraProps.isEmpty()) {
 extraProps.forEach(props::setProperty);
@@ -2255,22 +2255,22 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
 csvProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
 if (useSchemaProvider) {
- csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
+ csvProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
 if (hasTransformer) {
- csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
+ csvProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
 }
 }
- csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+ csvProps.setProperty("hoodie.streamer.source.dfs.root", sourceRoot);

 if (sep != ',') {
 if (sep == '\t') {
- csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+ csvProps.setProperty("hoodie.streamer.csv.sep", "\\t");
 } else {
- csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
+ csvProps.setProperty("hoodie.streamer.csv.sep", Character.toString(sep));
 }
 }
 if (hasHeader) {
- csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
+ csvProps.setProperty("hoodie.streamer.csv.header", Boolean.toString(hasHeader));
 }
 UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, fs, basePath + "/" + PROPS_FILENAME_TEST_CSV);
@@ -2391,7 +2391,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
 sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", "select * from test_sql_table");
+ sqlSourceProps.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
 UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
@@ -2465,7 +2465,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 HoodieDeltaStreamer.Config downstreamCfg =
     TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
- downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
+ downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=1");
 new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
 insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
@@ -2481,7 +2481,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1);
 downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key() + "=true");
 //Adding this conf to make testing easier :)
- downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
+ downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=10");
 downstreamCfg.operation = WriteOperationType.UPSERT;
 new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
 new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 2745edef584..635b57c9fa6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -320,8 +320,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT
 props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
 props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
- props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
+ props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
 props.setProperty("include", "base.properties");
 props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 26ea61e31fe..0c5de863436 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -178,16 +178,16 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
 HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
 List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
 TypedProperties properties = executionContexts.get(1).getProperties();
- properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
- properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
+ properties.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
+ properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
 properties.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
- properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
+ properties.setProperty("hoodie.streamer.source.kafka.topic", topicName2);
 executionContexts.get(1).setProperties(properties);
 TypedProperties properties1 = executionContexts.get(0).getProperties();
- properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_short_trip_uber.avsc");
- properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_short_trip_uber.avsc");
+ properties1.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_short_trip_uber.avsc");
+ properties1.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_short_trip_uber.avsc");
 properties1.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
- properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
+ properties1.setProperty("hoodie.streamer.source.kafka.topic", topicName1);
 executionContexts.get(0).setProperties(properties1);
 String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
 String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
@@ -288,7 +288,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
 props.setProperty("include", "base.properties");
 props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
 props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
- props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+ props.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
 return props;
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
index e2ae67aae23..75e812acf37 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -55,8 +55,8 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
 @BeforeAll
 public static void init() {
 Pair<String, String> dbAndTableName = paresDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
 }

 @Disabled
@@ -84,8 +84,8 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
 public void testTargetSchema() throws Exception {
 try {
 Pair<String, String> dbAndTableName = paresDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
+ PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
+ PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
 createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
 createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
 Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
@@ -105,7 +105,7 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
 @Test
 public void testNotExistTable() {
 String wrongName = "wrong_schema_tab";
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", wrongName);
 Assertions.assertThrows(NoSuchTableException.class, () -> {
 try {
 UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
index 05a623f0e09..82588429db5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
@@ -51,13 +51,13 @@ public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarnes
 @BeforeAll
 public static void init() {
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", JDBC_URL);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", JDBC_DRIVER);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", JDBC_USER);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password", JDBC_PASS);
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable", "triprec");
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout", "0");
- PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.connection.url", JDBC_URL);
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.driver.type", JDBC_DRIVER);
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.username", JDBC_USER);
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.password", JDBC_PASS);
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.dbtable", "triprec");
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.timeout", "0");
+ PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.nullable", "false");
 }

 @Test

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 397e72a0ec4..88f67723c85 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -64,10 +64,10 @@ class TestSchemaRegistryProvider {
 private static TypedProperties getProps() {
 return new TypedProperties() {
 {
- put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
- put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
- put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
- put("hoodie.deltastreamer.source.kafka.topic", "foo");
+ put("hoodie.streamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
+ put("hoodie.streamer.schemaprovider.registry.urlSuffix", "-value");
+ put("hoodie.streamer.schemaprovider.registry.url", "http://foo:bar@localhost");
+ put("hoodie.streamer.source.kafka.topic", "foo");
 }
 };
 }
@@ -102,8 +102,8 @@ class TestSchemaRegistryProvider {
 @Test
 public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
 TypedProperties props = getProps();
- props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
- props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
+ props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+ props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
 SchemaRegistryProvider spyUnderTest = getUnderTest(props);
 Schema actual = spyUnderTest.getSourceSchema();
 assertNotNull(actual);
@@ -114,8 +114,8 @@ class TestSchemaRegistryProvider {
 @Test
 public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
 TypedProperties props = getProps();
- props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
- props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
+ props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+ props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
 SchemaRegistryProvider spyUnderTest = getUnderTest(props);
 Schema actual = spyUnderTest.getTargetSchema();
 assertNotNull(actual);

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..c5fc7bfaafa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -169,7 +169,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
 testUtils.createTopic(topic, 2);
 TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
 SourceFormatAdapter kafkaSource = createSource(props);
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents", "500");
 /* 1. maxEventsFromKafkaSourceProp set to more than generated insert records

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 1cda910b707..5ccf9ad2b29 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -41,7 +41,7 @@ public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
 @Override
 protected Source prepareDFSSource() {
 TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+ props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
 try {
 return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
 } catch (IOException e) {

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 558181f4258..497757ab378 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -97,11 +97,11 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
 protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
 TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+ props.setProperty("hoodie.streamer.source.kafka.topic", topic);
 props.setProperty("bootstrap.servers", testUtils.brokerAddress());
 props.setProperty("auto.offset.reset", resetStrategy);
 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents",
     maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
     String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -160,8 +160,8 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     "test", dataGen.generateGenericRecord());
 JavaRDD<ConsumerRecord<Object, Object>> rdd = jsc().parallelize(Arrays.asList(recordConsumerRecord));
 TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.source.kafka.topic", "test");
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+ props.put("hoodie.streamer.source.kafka.topic", "test");
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
 SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
     UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
@@ -191,11 +191,11 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
 final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
 TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
 SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
     UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
- props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
+ props.put("hoodie.streamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
 int numPartitions = 2;
 int numMessages = 30;
 testUtils.createTopic(topic,numPartitions);

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 8eaa1d95b23..6a2bbcd0136 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -48,9 +48,9 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
 @Override
 public Source prepareDFSSource() {
 TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
- props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true));
- props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
+ props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
+ props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true));
+ props.setProperty("hoodie.streamer.csv.sep", "\t");
 return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index c1844c7a2a1..3b018473dc4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -114,8 +114,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
 String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
 TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
- props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
 this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
 MockitoAnnotations.initMocks(this);
 }
@@ -263,14 +263,14 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
 TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+ typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
 //1. snapshot query, read all records
 readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
 //2. incremental query, as commit is present in timeline
 readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
 //3. snapshot query with source limit less than first commit size
 readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
- typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
+ typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
 //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
 readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
 }
@@ -316,7 +316,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
     Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
 TypedProperties typedProperties = setProps(missingCheckpointStrategy);
- typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+ typedProperties.put("hoodie.streamer.source.hoodieincr.file.format", "json");
 readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
 }
@@ -388,10 +388,10 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
 Properties properties = new Properties();
 //String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
- //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
- properties.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
- properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
- properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+ //properties.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+ properties.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+ properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
+ properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
     missingCheckpointStrategy.name());
 properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
 return new TypedProperties(properties);

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b9e20fb3a19..3d9f3362a15 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -337,8 +337,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) {
 Properties properties = new Properties();
- properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
- properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
+ properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
+ properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
 // TODO: [HUDI-7081] get rid of this
 properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
 snapshotCheckPointImplClassOpt.map(className ->

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index dcd12ac7c8e..ade781e6c8b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -77,11 +77,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @BeforeEach
 public void setup() throws Exception {
 super.setup();
- PROPS.setProperty("hoodie.deltastreamer.jdbc.url", JDBC_URL);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", JDBC_DRIVER);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.user", JDBC_USER);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.password", JDBC_PASS);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
+ PROPS.setProperty("hoodie.streamer.jdbc.url", JDBC_URL);
+ PROPS.setProperty("hoodie.streamer.jdbc.driver.class", JDBC_DRIVER);
+ PROPS.setProperty("hoodie.streamer.jdbc.user", JDBC_USER);
+ PROPS.setProperty("hoodie.streamer.jdbc.password", JDBC_PASS);
+ PROPS.setProperty("hoodie.streamer.jdbc.table.name", "triprec");
 connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS);
 }
@@ -93,8 +93,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testSingleCommit() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 int numRecords = 100;
@@ -116,8 +116,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testInsertAndUpdate() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 final String commitTime = "000";
@@ -150,8 +150,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testTwoCommits() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 // Add 10 records with commit time "000"
@@ -178,8 +178,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testIncrementalFetchWithCommitTime() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 // Add 10 records with commit time "000"
@@ -204,8 +204,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testIncrementalFetchWithNoMatchingRows() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 // Add 10 records with commit time "000"
@@ -226,8 +226,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
 try {
 // Add 100 records with commit time "000"
@@ -257,8 +257,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
 try {
 // Add 100 records with commit time "000"
@@ -284,8 +284,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 @Test
 public void testIncrementalFetchFallbackToFullFetchWhenError() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert");
 try {
 // Add 10 records with commit time "000"
@@ -299,14 +299,14 @@ public class TestJdbcSource extends UtilitiesTestBase {
 // Add 10 records with commit time "001"
 insert("001", 10, connection, DATA_GENERATOR, PROPS);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "dummy_col");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "dummy_col");
 assertThrows(HoodieException.class, () -> {
 // Start incremental scan with a dummy column that does not exist.
 // This will throw an exception as the default behavior is to not fallback to full fetch.
 runSource(Option.of(batch.getCheckpointForNextBatch()), -1);
 });
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.fallback.to.full.fetch", "true");
 // Start incremental scan with a dummy column that does not exist.
 // This will fallback to full fetch mode but still throw an exception checkpointing will fail.
@@ -321,7 +321,7 @@ public class TestJdbcSource extends UtilitiesTestBase { @Test public void testFullFetchWithCommitTime() { - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); try { // Add 10 records with commit time "000" @@ -345,8 +345,8 @@ public class TestJdbcSource extends UtilitiesTestBase { @Test public void testFullFetchWithCheckpoint() { - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); - PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); + PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "last_insert"); try { // Add 10 records with commit time "000" @@ -360,7 +360,7 @@ public class TestJdbcSource extends UtilitiesTestBase { // Get max of incremental column Column incrementalColumn = rowDataset - .col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name")); + .col(PROPS.getString("hoodie.streamer.jdbc.table.incr.column.name")); final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first() .getString(0); @@ -382,10 +382,10 @@ public class TestJdbcSource extends UtilitiesTestBase { // Write secret string to fs in a file writeSecretToFs(); // Remove secret string from props - PROPS.remove("hoodie.deltastreamer.jdbc.password"); + PROPS.remove("hoodie.streamer.jdbc.password"); // Set property to read secret from fs file - PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", "file:///tmp/hudi/config/secret"); - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.setProperty("hoodie.streamer.jdbc.password.file", "file:///tmp/hudi/config/secret"); + PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); // Add 10 records with commit time 000 clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); @@ -401,8 +401,8 @@ public class TestJdbcSource extends UtilitiesTestBase { // Write secret string to fs in a file writeSecretToFs(); // Remove secret string from props - PROPS.remove("hoodie.deltastreamer.jdbc.password"); - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.remove("hoodie.streamer.jdbc.password"); + PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); // Add 10 records with commit time 000 clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); runSource(Option.empty(), 10); @@ -411,9 +411,9 @@ public class TestJdbcSource extends UtilitiesTestBase { @Test public void testSourceWithExtraOptions() { - PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", "10"); - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); - PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name"); + PROPS.setProperty("hoodie.streamer.jdbc.extra.options.fetchsize", "10"); + PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); + PROPS.remove("hoodie.streamer.jdbc.table.incr.column.name"); try { // Add 20 records with commit time 000 clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS); @@ -426,8 +426,8 @@ public class TestJdbcSource extends UtilitiesTestBase { @Test public void testSourceWithStorageLevel() { - PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE"); - PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.setProperty("hoodie.streamer.jdbc.storage.level", "NONE"); + 
PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false"); try { // Add 10 records with commit time 000 clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java index fde10b2d9a5..24a341fe9c3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -44,7 +44,7 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase { @Override public Source prepareDFSSource() { TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); return new JsonDFSSource(props, jsc, sparkSession, schemaProvider); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 6b24f57a50d..4b615c50ee1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -82,7 +82,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { public void init() throws Exception { String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath(); TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); schemaProvider = new FilebasedSchemaProvider(props, jsc()); } @@ -93,11 +93,11 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { static TypedProperties createPropsForJsonKafkaSource(String brokerAddress, String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic); + props.setProperty("hoodie.streamer.source.kafka.topic", topic); props.setProperty("bootstrap.servers", brokerAddress); props.setProperty("auto.offset.reset", resetStrategy); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", + props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue())); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java index b6bc3480e3d..1f1a4e2b5c1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java @@ -80,7 +80,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH public void init() throws Exception { String schemaFilePath = Objects.requireNonNull(TestJsonKafkaSource.SCHEMA_FILE_URL).toURI().getPath(); TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); schemaProvider = new FilebasedSchemaProvider(props, jsc()); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java index 44489037e82..159ababcf47 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java @@ -43,7 +43,7 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase { @Override public Source prepareDFSSource() { TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java index b56d87c9263..f9679211144 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java @@ -75,11 +75,11 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource { protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic); + props.setProperty("hoodie.streamer.source.kafka.topic", topic); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty("auto.offset.reset", resetStrategy); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", + props.setProperty("hoodie.streamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue())); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 90fbeb3bb35..a9dd11c5544 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -105,8 +105,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne metaClient = getHoodieMetaClient(hadoopConf(), basePath()); String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath(); TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); - props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc)); } @@ -186,10 +186,10 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { Properties properties = new Properties(); - properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath()); - properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", + properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath()); + properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name()); - properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", "json"); + properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json"); return new TypedProperties(properties); } @@ -354,7 +354,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne setMockQueryRunner(inputDs); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); - typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties); @@ -388,7 +388,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); - typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties); } @@ -420,7 +420,7 @@ public class 
TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); - typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties); @@ -457,14 +457,14 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) .thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); - typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); //1. snapshot query, read all records readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties); //2. incremental query, as commit is present in timeline readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties); //3. snapshot query with source limit less than first commit size readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties); - typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to"); + typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to"); //4. As snapshotQuery will return 1 -> same would be returned as nextCheckpoint (dataset is empty due to ignore prefix). 
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java index 89769954d38..ee488e38c6a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java @@ -51,8 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestSqlFileBasedSource extends UtilitiesTestBase { private final boolean useFlattenedSchema = false; - private final String sqlFileSourceConfig = "hoodie.deltastreamer.source.sql.file"; - private final String sqlFileSourceConfigEmitChkPointConf = "hoodie.deltastreamer.source.sql.checkpoint.emit"; + private final String sqlFileSourceConfig = "hoodie.streamer.source.sql.file"; + private final String sqlFileSourceConfigEmitChkPointConf = "hoodie.streamer.source.sql.checkpoint.emit"; protected FilebasedSchemaProvider schemaProvider; protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); private String dfsRoot; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 64578f3bae3..a738003a3fc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -50,7 +50,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class TestSqlSource extends UtilitiesTestBase { private final boolean useFlattenedSchema = false; - private final String sqlSourceConfig = "hoodie.deltastreamer.source.sql.sql.query"; + private final String sqlSourceConfig = "hoodie.streamer.source.sql.sql.query"; protected FilebasedSchemaProvider schemaProvider; protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); private String dfsRoot; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java index c9f46144e96..a57383c43b2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java @@ -86,12 +86,12 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase { private TypedProperties createPropsForJsonSource() { TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.kafka.topic", testTopicName); + props.setProperty("hoodie.streamer.source.kafka.topic", testTopicName); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty("auto.offset.reset", "earliest"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url", "localhost"); - props.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class", StringDeserializer.class.getName()); + props.setProperty("hoodie.streamer.schemaprovider.registry.url", "localhost"); + props.setProperty("hoodie.streamer.source.kafka.value.deserializer.class", 
StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); return props; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index b97e2fa80a0..79f15975cb5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -69,7 +69,7 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); TypedProperties properties = new TypedProperties(); - properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state"); + properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state"); Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); Assertions.assertTrue(result.isPresent()); Assertions.assertEquals(1, result.get().count()); @@ -82,9 +82,9 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness TypedProperties props = new TypedProperties(); TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc"); String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath(); - props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); - props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); - props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state"); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); + props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state"); List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc))); Assertions.assertTrue(result.isPresent()); @@ -97,8 +97,8 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness public void partitionKeyNotPresentInPath() { List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); TypedProperties properties = new TypedProperties(); - properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "false"); - properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown"); + properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format", "false"); + properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "unknown"); Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); 
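For context on the partition.fields.from.path assertions in these tests: the values come from hive-style key=value segments embedded in the object path (country=US/state=CA in the fixture above). A rough sketch of that extraction; the helper below is hypothetical, not Hudi's implementation:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionPathDemo {
  // Hypothetical helper: pulls hive-style key=value segments out of an object path.
  static Map<String, String> partitionValues(String path, List<String> fields) {
    Map<String, String> out = new LinkedHashMap<>();
    for (String seg : path.split("/")) {
      int eq = seg.indexOf('=');
      if (eq > 0 && fields.contains(seg.substring(0, eq))) {
        out.put(seg.substring(0, eq), seg.substring(eq + 1));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Prints {country=US, state=CA} for the test fixture path used above.
    System.out.println(partitionValues(
        "data/partitioned/country=US/state=CA/data.json",
        Arrays.asList("country", "state")));
  }
}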
Assertions.assertTrue(result.isPresent()); Assertions.assertEquals(1, result.get().count()); @@ -111,9 +111,9 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness TypedProperties props = new TypedProperties(); TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc"); String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath(); - props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath); - props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); - props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state"); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); + props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state"); // Setting this config so that dataset repartition happens inside `loadAsDataset` props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1"); List<CloudObjectMetadata> input = Arrays.asList( diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index d3031729e6e..fc3ab90a036 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -65,9 +65,9 @@ public class TestKafkaOffsetGen { private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) { TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType); + props.put("hoodie.streamer.source.kafka.checkpoint.type", kafkaCheckpointType); props.put("auto.offset.reset", autoOffsetReset); - props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName); + props.put("hoodie.streamer.source.kafka.topic", testTopicName); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty("key.deserializer", StringDeserializer.class.getName()); props.setProperty("value.deserializer", StringDeserializer.class.getName()); @@ -250,7 +250,7 @@ public class TestKafkaOffsetGen { testUtils.createTopic(testTopicName, 1); boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props)); assertTrue(topicExists); - props.put("hoodie.deltastreamer.source.kafka.topic", "random"); + props.put("hoodie.streamer.source.kafka.topic", "random"); kafkaOffsetGen = new KafkaOffsetGen(props); topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props)); assertFalse(topicExists); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index a949335a21a..a200f3a5151 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -450,14 +450,14 @@ public class UtilitiesTestBase { public static TypedProperties setupSchemaOnDFS(String scope, String filename) throws IOException { UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + 
filename, fs, basePath + "/" + filename); TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + filename); + props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + filename); return props; } public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String scope, String filename) throws IOException { UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + filename, fs, basePath + "/" + filename); TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + filename); + props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + filename); return props; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java index 56d435ddf0f..08e73d36bc0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.collection.RocksDBBasedMap; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.config.SourceTestConfig; @@ -63,11 +64,10 @@ public abstract class AbstractBaseTestSource extends AvroSource { public static void initDataGen(TypedProperties props, int partition) { try { - boolean useRocksForTestDataGenKeys = props.getBoolean(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(), - SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.defaultValue()); - String baseStoreDir = props.getString(SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS.key(), + boolean useRocksForTestDataGenKeys = ConfigUtils.getBooleanWithAltKeys(props, SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); + String baseStoreDir = ConfigUtils.getStringWithAltKeys(props, SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition; - LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir); + LOG.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}", useRocksForTestDataGenKeys, baseStoreDir); dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>())); } catch (IOException e) { @@ -106,18 +106,17 @@ public abstract class AbstractBaseTestSource extends AvroSource { protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime, int partition) { - int maxUniqueKeys = - props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue()); + int maxUniqueKeys = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP); HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition); // generate `sourceLimit` number of upserts each time. 
int numExistingKeys = dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - LOG.info("NumExistingKeys=" + numExistingKeys); + LOG.info("NumExistingKeys={}", numExistingKeys); int numUpdates = Math.min(numExistingKeys, sourceLimit / 2); int numInserts = sourceLimit - numUpdates; - LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates); + LOG.info("Before adjustments => numInserts={}, numUpdates={}", numInserts, numUpdates); boolean reachedMax = false; if (numInserts + numExistingKeys > maxUniqueKeys) { @@ -134,17 +133,16 @@ public abstract class AbstractBaseTestSource extends AvroSource { Stream<GenericRecord> deleteStream = Stream.empty(); Stream<GenericRecord> updateStream; long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() - + ", Free Memory=" + Runtime.getRuntime().freeMemory()); + LOG.info("Before DataGen. Memory Usage={}, Total Memory={}, Free Memory={}", memoryUsage1, Runtime.getRuntime().totalMemory(), + Runtime.getRuntime().freeMemory()); if (!reachedMax && numUpdates >= 50) { - LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords=" - + maxUniqueKeys); + LOG.info("After adjustments => NumInserts={}, NumUpdates={}, NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50), maxUniqueKeys); // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(AbstractBaseTestSource::toGenericRecord); updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .map(AbstractBaseTestSource::toGenericRecord); } else { - LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys); + LOG.info("After adjustments => NumInserts={}, NumUpdates={}, maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys); updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .map(AbstractBaseTestSource::toGenericRecord); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java index 4bcbdbbe874..808a8efb8a4 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.testutils.sources; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.config.SourceTestConfig; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -46,15 +47,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource { public DistributedTestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - 
this.numTestSourcePartitions = - props.getInteger(SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.key(), SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.defaultValue()); + this.numTestSourcePartitions = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP); } @Override protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) { int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0); String instantTime = String.format("%05d", nextCommitNum); - LOG.info("Source Limit is set to " + sourceLimit); + LOG.info("Source Limit is set to {}", sourceLimit); // No new data. if (sourceLimit <= 0) { @@ -65,15 +65,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource { newProps.putAll(props); // Set the maxUniqueRecords per partition for TestDataSource - int maxUniqueRecords = - props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue()); + int maxUniqueRecords = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP); String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions)); newProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), maxUniqueRecordsPerPartition); int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()), numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> { - LOG.info("Initializing source with newProps=" + newProps); + LOG.info("Initializing source with newProps={}", newProps); if (!dataGeneratorMap.containsKey(p)) { initDataGen(newProps, p); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java index 1b0cc7f52a6..ea2ce8ed86f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java @@ -87,7 +87,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { public void testSqlFileBasedTransformerIncorrectConfig() { // Test if the class throws hoodie IO exception correctly when given an incorrect config. props.setProperty( - "hoodie.deltastreamer.transformer.sql.file", + "hoodie.streamer.transformer.sql.file", UtilitiesTestBase.basePath + "/non-exist-sql-file.sql"); assertThrows( HoodieTransformException.class, @@ -103,7 +103,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { // Test if the SQL file based transformer works as expected for the invalid SQL statements. props.setProperty( - "hoodie.deltastreamer.transformer.sql.file", + "hoodie.streamer.transformer.sql.file", UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql"); assertThrows( ParseException.class, @@ -119,7 +119,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { // Test if the SQL file based transformer works as expected for the empty SQL statements. 
props.setProperty( - "hoodie.deltastreamer.transformer.sql.file", + "hoodie.streamer.transformer.sql.file", UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql"); Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props); String[] actualRows = emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]); @@ -136,7 +136,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { // Test if the SQL file based transformer works as expected for the correct input. props.setProperty( - "hoodie.deltastreamer.transformer.sql.file", + "hoodie.streamer.transformer.sql.file", UtilitiesTestBase.basePath + "/sql-file-transformer.sql"); Dataset<Row> transformedRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java index 6f05dc1b184..e9f6f9e4fd3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java @@ -78,7 +78,7 @@ public class TestSqlQueryBasedTransformer { + "from\n" + "\t<SRC>"; TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.transformer.sql", transSql); + props.put("hoodie.streamer.transformer.sql", transSql); // transform SqlQueryBasedTransformer transformer = new SqlQueryBasedTransformer(); diff --git a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties index 3a5edb2b6f2..35beefab7b2 100644 --- a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties +++ b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties @@ -20,8 +20,8 @@ include=base.properties hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.partitionpath.field=driver # Schema provider props (change to absolute path based on your installation) -hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc -hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc +hoodie.streamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc +hoodie.streamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc # DFS Source -hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input +hoodie.streamer.source.dfs.root=file:///tmp/hoodie-dfs-input diff --git a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties index 5c569c5d0a0..248de399272 100644 --- a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties +++ b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties @@ -18,6 +18,6 @@ include=base.properties hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.partitionpath.field=created_at -hoodie.deltastreamer.source.kafka.topic=test_topic 
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd \ No newline at end of file +hoodie.streamer.source.kafka.topic=test_topic +hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties index e256b8c77fb..87edb1a1df7 100644 --- a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties +++ b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties @@ -20,10 +20,10 @@ include=base.properties hoodie.datasource.write.recordkey.field=impressionid hoodie.datasource.write.partitionpath.field=userid # schema provider configs -hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest +hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest # Kafka Source -#hoodie.deltastreamer.source.kafka.topic=uber_trips -hoodie.deltastreamer.source.kafka.topic=impressions +#hoodie.streamer.source.kafka.topic=uber_trips +hoodie.streamer.source.kafka.topic=impressions #Kafka props bootstrap.servers=localhost:9092 auto.offset.reset=earliest diff --git a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties index d415e19eb20..b74f5a080f3 100644 --- a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties +++ b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties @@ -18,11 +18,11 @@ include=base.properties hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.partitionpath.field=created_at -hoodie.deltastreamer.source.kafka.topic=topic2 -hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S +hoodie.streamer.source.kafka.topic=topic2 +hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator -hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/ -hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest -hoodie.deltastreamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer +hoodie.streamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/ +hoodie.streamer.schemaprovider.registry.urlSuffix=-value/versions/latest +hoodie.streamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer diff --git a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties index 9172337d038..9bfbd889de9 100644 --- a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. 
### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a +hoodie.streamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a diff --git a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties index f5b079265d4..a8e278249e8 100644 --- a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties +++ b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties @@ -18,10 +18,10 @@ include=base.properties hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.partitionpath.field=created_at -hoodie.deltastreamer.source.kafka.topic=topic1 -hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S +hoodie.streamer.source.kafka.topic=topic1 +hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.datasource.hive_sync.database=uber_hive_db hoodie.datasource.hive_sync.table=uber_hive_dummy_table -hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest -hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest \ No newline at end of file +hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest +hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest \ No newline at end of file
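For reference on the renamed transformer keys above: the SQL handed to hoodie.streamer.transformer.sql (or loaded from a file via hoodie.streamer.transformer.sql.file) uses <SRC> as a stand-in for the source dataset's registered view, as exercised by TestSqlQueryBasedTransformer. A minimal sketch, assuming an active SparkSession/JavaSparkContext and an input Dataset<Row> supplied by the caller; the query itself is illustrative:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SqlTransformerDemo {
  // jsc, spark and inputRows are assumed to be provided by the caller's context.
  static Dataset<Row> transform(JavaSparkContext jsc, SparkSession spark, Dataset<Row> inputRows) {
    TypedProperties props = new TypedProperties();
    // <SRC> is substituted with the source view name at runtime.
    props.put("hoodie.streamer.transformer.sql",
        "SELECT a.*, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a");
    return new SqlQueryBasedTransformer().apply(jsc, spark, inputRows, props);
  }
}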
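On the Kafka side, kafka-source.properties and uber_config.properties above now carry their topic and schema-registry settings under hoodie.streamer.*. A minimal sketch of assembling the equivalent consumer properties in test code, mirroring createPropsForJsonKafkaSource above; the broker address, topic, and maxEvents values are placeholders:

import java.util.UUID;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaSourcePropsDemo {
  static TypedProperties kafkaSourceProps() {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.streamer.source.kafka.topic", "impressions");
    // Caps how many events one streamer round reads from Kafka.
    props.setProperty("hoodie.streamer.kafka.source.maxEvents", "5000");
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("auto.offset.reset", "earliest");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    return props;
  }
}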