This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c00a3adef [HUDI-7187] Fix integ test props to honor new streamer properties (#10866)
26c00a3adef is described below

commit 26c00a3adefff9217187ca0ab9a5b2a7c9e42199
Author: wombatu-kun <wombatu...@gmail.com>
AuthorDate: Sun Mar 31 11:16:01 2024 +0700

    [HUDI-7187] Fix integ test props to honor new streamer properties (#10866)
    
    Co-authored-by: Vova Kolmakov <kolmakov.vladi...@huawei-partners.com>
---
 .../TestKafkaConnectHdfsProvider.java              |  4 +-
 .../hudi/utilities/config/SourceTestConfig.java    | 15 +++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 54 ++++++++--------
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 48 +++++++-------
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |  4 +-
 .../TestHoodieMultiTableDeltaStreamer.java         | 14 ++--
 .../functional/TestHiveSchemaProvider.java         | 10 +--
 .../functional/TestJdbcbasedSchemaProvider.java    | 14 ++--
 .../schema/TestSchemaRegistryProvider.java         | 16 ++---
 .../utilities/sources/BaseTestKafkaSource.java     |  2 +-
 .../hudi/utilities/sources/TestAvroDFSSource.java  |  2 +-
 .../utilities/sources/TestAvroKafkaSource.java     | 12 ++--
 .../hudi/utilities/sources/TestCsvDFSSource.java   |  6 +-
 .../sources/TestGcsEventsHoodieIncrSource.java     | 18 +++---
 .../utilities/sources/TestHoodieIncrSource.java    |  4 +-
 .../hudi/utilities/sources/TestJdbcSource.java     | 74 +++++++++++-----------
 .../hudi/utilities/sources/TestJsonDFSSource.java  |  2 +-
 .../utilities/sources/TestJsonKafkaSource.java     |  6 +-
 .../sources/TestJsonKafkaSourcePostProcessor.java  |  2 +-
 .../utilities/sources/TestParquetDFSSource.java    |  2 +-
 .../utilities/sources/TestProtoKafkaSource.java    |  4 +-
 .../sources/TestS3EventsHoodieIncrSource.java      | 20 +++---
 .../utilities/sources/TestSqlFileBasedSource.java  |  4 +-
 .../hudi/utilities/sources/TestSqlSource.java      |  2 +-
 .../debezium/TestAbstractDebeziumSource.java       |  6 +-
 .../helpers/TestCloudObjectsSelectorCommon.java    | 18 +++---
 .../sources/helpers/TestKafkaOffsetGen.java        |  6 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  4 +-
 .../testutils/sources/AbstractBaseTestSource.java  | 24 ++++---
 .../sources/DistributedTestDataSource.java         | 11 ++--
 .../transform/TestSqlFileBasedTransformer.java     |  8 +--
 .../transform/TestSqlQueryBasedTransformer.java    |  2 +-
 .../streamer-config/dfs-source.properties          |  6 +-
 .../invalid_hive_sync_uber_config.properties       |  6 +-
 .../streamer-config/kafka-source.properties        |  6 +-
 .../short_trip_uber_config.properties              | 12 ++--
 .../streamer-config/sql-transformer.properties     |  2 +-
 .../streamer-config/uber_config.properties         | 10 +--
 38 files changed, 232 insertions(+), 228 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
index fb6f5d649cb..e90cfdb6856 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -62,7 +62,7 @@ public class TestKafkaConnectHdfsProvider extends 
HoodieCommonTestHarness {
     new File(topicPath + "/year=2016/month=05/day=02/"
         + "random_snappy_2" + BASE_FILE_EXTENSION).createNewFile();
     final TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", 
topicPath.toString());
+    props.put("hoodie.streamer.checkpoint.provider.path", 
topicPath.toString());
     final InitialCheckPointProvider provider = new 
KafkaConnectHdfsProvider(props);
     provider.init(HoodieTestUtils.getDefaultHadoopConf());
     assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
@@ -83,7 +83,7 @@ public class TestKafkaConnectHdfsProvider extends 
HoodieCommonTestHarness {
     new File(topicPath + "/year=2016/month=05/day=02/"
         + "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile();
     final TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", 
topicPath.toString());
+    props.put("hoodie.streamer.checkpoint.provider.path", 
topicPath.toString());
     final InitialCheckPointProvider provider = new 
KafkaConnectHdfsProvider(props);
     provider.init(HoodieTestUtils.getDefaultHadoopConf());
     assertThrows(HoodieException.class, provider::getCheckpoint);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
index 450d6e8dc3a..760e7ed7ff4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
@@ -21,29 +21,36 @@ package org.apache.hudi.utilities.config;
 
 import org.apache.hudi.common.config.ConfigProperty;
 
+import static 
org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
 /**
  * Configurations for Test Data Sources.
  */
 public class SourceTestConfig {
 
   public static final ConfigProperty<Integer> NUM_SOURCE_PARTITIONS_PROP = 
ConfigProperty
-      .key("hoodie.deltastreamer.source.test.num_partitions")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
       .defaultValue(10)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.test.num_partitions")
       .withDocumentation("Used by DistributedTestDataSource only. Number of 
partitions where each partitions generates test-data");
 
   public static final ConfigProperty<Integer> MAX_UNIQUE_RECORDS_PROP = 
ConfigProperty
-      .key("hoodie.deltastreamer.source.test.max_unique_records")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.max_unique_records")
       .defaultValue(Integer.MAX_VALUE)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.test.max_unique_records")
       .withDocumentation("Maximum number of unique records generated for the 
run");
 
   public static final ConfigProperty<Boolean> 
USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      
.key("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys")
+      .key(STREAMER_CONFIG_PREFIX + 
"source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .defaultValue(false)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .withDocumentation("If true, uses Rocks DB for storing datagen keys");
 
   public static final ConfigProperty<String> 
ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.datagen.rocksdb_base_dir")
       .noDefaultValue()
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.test.datagen.rocksdb_base_dir")
       .withDocumentation("Base Dir for storing datagen keys");
 
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 3c74388860e..2b2013d04cd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -204,8 +204,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
 
     // Source schema is the target schema of upstream table
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/target.avsc");
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    
downstreamProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file",
 dfsBasePath + "/target.avsc");
+    
downstreamProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath 
+ "/test-downstream-source.properties");
 
     // Properties used for testing invalid key generator
@@ -214,8 +214,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", 
"invalid");
     invalidProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
-    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    
invalidProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
+    
invalidProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_INVALID);
 
     // Properties used for testing inferring key generator for complex key 
generator
@@ -223,8 +223,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     inferKeygenProps.setProperty("include", "base.properties");
     inferKeygenProps.setProperty("hoodie.datasource.write.recordkey.field", 
"timestamp,_row_key");
     
inferKeygenProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    
inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
-    
inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    
inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
+    
inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(inferKeygenProps, dfs, 
dfsBasePath + "/" + PROPS_FILENAME_INFER_COMPLEX_KEYGEN);
 
     // Properties used for testing inferring key generator for non-partitioned 
key generator
@@ -240,8 +240,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + 
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
 
     TypedProperties invalidHiveSyncProps = new TypedProperties();
-    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
 "uber_db.dummy_table_uber");
-    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    
invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.tablesToBeIngested",
 "uber_db.dummy_table_uber");
+    
invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
   }
 
@@ -251,8 +251,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
 
     // Hive Configs
     props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), 
HiveTestService.HS2_JDBC_URL);
@@ -266,9 +266,9 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
   protected static void 
populateInvalidTableConfigFilePathProps(TypedProperties props, String 
dfsBasePath) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"uber_db.dummy_table_uber");
-    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
+    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+    props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", 
"uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
   }
 
   protected static void populateAllCommonProps(TypedProperties props, String 
dfsBasePath, String brokerAddress) {
@@ -279,10 +279,10 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
   protected static void populateCommonProps(TypedProperties props, String 
dfsBasePath) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
-    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
-    
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
+    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+    props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
+    
props.setProperty("hoodie.streamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
   }
 
   protected static void populateCommonKafkaProps(TypedProperties props, String 
brokerAddress) {
@@ -291,7 +291,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     props.setProperty("auto.offset.reset", "earliest");
     props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", 
String.valueOf(5000));
   }
 
   protected static void populateCommonHiveProps(TypedProperties props) {
@@ -384,12 +384,12 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
     if (useSchemaProvider) {
-      
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/" + sourceSchemaFile);
+      
parquetProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
-        
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/" + targetSchemaFile);
+        
parquetProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/" + targetSchemaFile);
       }
     }
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
+    parquetProps.setProperty("hoodie.streamer.source.dfs.root", 
parquetSourceRoot);
     if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
       
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, 
emptyBatchParam);
     }
@@ -405,11 +405,11 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
+    props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", 
String.valueOf(5000));
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     
props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(),  
ByteArrayDeserializer.class.getName());
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) :
             
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
@@ -617,10 +617,10 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       }
       List<String> cfgs = new ArrayList<>();
       cfgs.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + 
"=true");
-      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" 
+ addReadLatestOnMissingCkpt);
-      cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
+      
cfgs.add("hoodie.streamer.source.hoodieincr.read_latest_on_missing_ckpt=" + 
addReadLatestOnMissingCkpt);
+      cfgs.add("hoodie.streamer.source.hoodieincr.path=" + srcBasePath);
       // No partition
-      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
+      cfgs.add("hoodie.streamer.source.hoodieincr.partition.fields=datestr");
       cfg.configs = cfgs;
       return cfg;
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 2a2c4dafb1e..34486a07ab8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -376,7 +376,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = 
TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
     TypedProperties props =
         new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + 
PROPS_FILENAME_TEST_SOURCE)).getProps();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
+    props.put("hoodie.streamer.checkpoint.provider.path", bootstrapPath);
     cfg.initialCheckpointProvider = checkpointProviderClass;
     // create regular kafka connect hdfs dirs
     fs.mkdirs(new Path(bootstrapPath));
@@ -568,8 +568,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
@@ -582,8 +582,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
@@ -607,9 +607,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source.avsc");
     if (useUserProvidedSchema) {
-      
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
+      cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
     }
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
@@ -1822,12 +1822,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     orcProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     orcProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
     if (useSchemaProvider) {
-      
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + "source.avsc");
+      
orcProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/" + "source.avsc");
       if (transformerClassNames != null) {
-        
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/" + "target.avsc");
+        
orcProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/" + "target.avsc");
       }
     }
-    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
ORC_SOURCE_ROOT);
+    orcProps.setProperty("hoodie.streamer.source.dfs.root", ORC_SOURCE_ROOT);
     UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" + 
PROPS_FILENAME_TEST_ORC);
 
     String tableBasePath = basePath + "/test_orc_source_table" + testNum;
@@ -1852,11 +1852,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", 
JSON_KAFKA_SOURCE_ROOT);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
-    props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source_uber.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/target_uber.avsc");
+    props.setProperty("hoodie.streamer.source.dfs.root", 
JSON_KAFKA_SOURCE_ROOT);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.streamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/source_uber.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/target_uber.avsc");
     props.setProperty("auto.offset.reset", autoResetValue);
     if (extraProps != null && !extraProps.isEmpty()) {
       extraProps.forEach(props::setProperty);
@@ -2255,22 +2255,22 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     csvProps.setProperty("hoodie.datasource.write.recordkey.field", 
recordKeyField);
     csvProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
     if (useSchemaProvider) {
-      
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source-flattened.avsc");
+      
csvProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/source-flattened.avsc");
       if (hasTransformer) {
-        
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/target-flattened.avsc");
+        
csvProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/target-flattened.avsc");
       }
     }
-    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+    csvProps.setProperty("hoodie.streamer.source.dfs.root", sourceRoot);
 
     if (sep != ',') {
       if (sep == '\t') {
-        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+        csvProps.setProperty("hoodie.streamer.csv.sep", "\\t");
       } else {
-        csvProps.setProperty("hoodie.deltastreamer.csv.sep", 
Character.toString(sep));
+        csvProps.setProperty("hoodie.streamer.csv.sep", 
Character.toString(sep));
       }
     }
     if (hasHeader) {
-      csvProps.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(hasHeader));
+      csvProps.setProperty("hoodie.streamer.csv.header", 
Boolean.toString(hasHeader));
     }
 
     UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, fs, basePath + "/" + 
PROPS_FILENAME_TEST_CSV);
@@ -2391,7 +2391,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
     sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", 
"select * from test_sql_table");
+    sqlSourceProps.setProperty("hoodie.streamer.source.sql.sql.query", "select 
* from test_sql_table");
 
     UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + 
"/" + PROPS_FILENAME_TEST_SQL_SOURCE);
 
@@ -2465,7 +2465,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath,
             WriteOperationType.BULK_INSERT, true, null);
-    
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
+    
downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=1");
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
@@ -2481,7 +2481,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1);
     
downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key()
 + "=true");
     //Adding this conf to make testing easier :)
-    
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
+    
downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=10");
     downstreamCfg.operation = WriteOperationType.UPSERT;
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 2745edef584..635b57c9fa6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -320,8 +320,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
HoodieDeltaStreamerT
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/target.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/source.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/target.avsc");
 
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.write.concurrency.mode", 
"optimistic_concurrency_control");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 26ea61e31fe..0c5de863436 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -178,16 +178,16 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
-    
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/source_uber.avsc");
-    
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/target_uber.avsc");
+    
properties.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/source_uber.avsc");
+    
properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/target_uber.avsc");
     properties.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
-    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName2);
+    properties.setProperty("hoodie.streamer.source.kafka.topic", topicName2);
     executionContexts.get(1).setProperties(properties);
     TypedProperties properties1 = executionContexts.get(0).getProperties();
-    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/source_short_trip_uber.avsc");
-    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/target_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/source_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
basePath + "/target_short_trip_uber.avsc");
     properties1.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
-    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName1);
+    properties1.setProperty("hoodie.streamer.source.kafka.topic", topicName1);
     executionContexts.get(0).setProperties(properties1);
     String targetBasePath1 = 
executionContexts.get(0).getConfig().targetBasePath;
     String targetBasePath2 = 
executionContexts.get(1).getConfig().targetBasePath;
@@ -288,7 +288,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
     return props;
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
index e2ae67aae23..75e812acf37 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -55,8 +55,8 @@ public class TestHiveSchemaProvider extends 
SparkClientFunctionalTestHarnessWith
   @BeforeAll
   public static void init() {
     Pair<String, String> dbAndTableName = 
paresDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database",
 dbAndTableName.getLeft());
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table",
 dbAndTableName.getRight());
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.database", 
dbAndTableName.getLeft());
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", 
dbAndTableName.getRight());
   }
 
   @Disabled
@@ -84,8 +84,8 @@ public class TestHiveSchemaProvider extends 
SparkClientFunctionalTestHarnessWith
   public void testTargetSchema() throws Exception {
     try {
       Pair<String, String> dbAndTableName = 
paresDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
-      
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database",
 dbAndTableName.getLeft());
-      
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table",
 dbAndTableName.getRight());
+      
PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.database", 
dbAndTableName.getLeft());
+      
PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.table", 
dbAndTableName.getRight());
       createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
       createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
       Schema targetSchema = 
UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, 
jsc()).getTargetSchema();
@@ -105,7 +105,7 @@ public class TestHiveSchemaProvider extends 
SparkClientFunctionalTestHarnessWith
   @Test
   public void testNotExistTable() {
     String wrongName = "wrong_schema_tab";
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table",
 wrongName);
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", 
wrongName);
     Assertions.assertThrows(NoSuchTableException.class, () -> {
       try {
         UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), 
PROPS, jsc()).getSourceSchema();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
index 05a623f0e09..82588429db5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
@@ -51,13 +51,13 @@ public class TestJdbcbasedSchemaProvider extends 
SparkClientFunctionalTestHarnes
 
   @BeforeAll
   public static void init() {
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url",
 JDBC_URL);
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type",
 JDBC_DRIVER);
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username",
 JDBC_USER);
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password",
 JDBC_PASS);
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable",
 "triprec");
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout",
 "0");
-    
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable",
 "false");
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.connection.url",
 JDBC_URL);
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.driver.type",
 JDBC_DRIVER);
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.username", 
JDBC_USER);
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.password", 
JDBC_PASS);
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.dbtable", 
"triprec");
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.timeout", 
"0");
+    
PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.nullable", 
"false");
   }
 
   @Test
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 397e72a0ec4..88f67723c85 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -64,10 +64,10 @@ class TestSchemaRegistryProvider {
   private static TypedProperties getProps() {
     return new TypedProperties() {
       {
-        put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
-        put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", 
"-value");
-        put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
-        put("hoodie.deltastreamer.source.kafka.topic", "foo");
+        put("hoodie.streamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
+        put("hoodie.streamer.schemaprovider.registry.urlSuffix", "-value");
+        put("hoodie.streamer.schemaprovider.registry.url", "http://foo:bar@localhost");
+        put("hoodie.streamer.source.kafka.topic", "foo");
       }
     };
   }
@@ -102,8 +102,8 @@ class TestSchemaRegistryProvider {
   @Test
   public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws 
IOException {
     TypedProperties props = getProps();
-    props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
-    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
+    props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getSourceSchema();
     assertNotNull(actual);
@@ -114,8 +114,8 @@ class TestSchemaRegistryProvider {
   @Test
   public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws 
IOException {
     TypedProperties props = getProps();
-    props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
-    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
+    props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getTargetSchema();
     assertNotNull(actual);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..c5fc7bfaafa 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -169,7 +169,7 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
     testUtils.createTopic(topic, 2);
     TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, 
"earliest");
     SourceFormatAdapter kafkaSource = createSource(props);
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", "500");
 
     /*
      1. maxEventsFromKafkaSourceProp set to more than generated insert records
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 1cda910b707..5ccf9ad2b29 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -41,7 +41,7 @@ public class TestAvroDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   protected Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     try {
       return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
     } catch (IOException e) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 558181f4258..497757ab378 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -97,11 +97,11 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
 
   protected TypedProperties createPropsForKafkaSource(String topic, Long 
maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) :
             
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
@@ -160,8 +160,8 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
         "test", dataGen.generateGenericRecord());
     JavaRDD<ConsumerRecord<Object, Object>> rdd = 
jsc().parallelize(Arrays.asList(recordConsumerRecord));
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.source.kafka.topic", "test");
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
SCHEMA_PATH);
+    props.put("hoodie.streamer.source.kafka.topic", "test");
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
SCHEMA_PATH);
     SchemaProvider schemaProvider = 
UtilHelpers.wrapSchemaProviderWithPostProcessor(
         
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), 
props, jsc()), props, jsc(), new ArrayList<>());
 
@@ -191,11 +191,11 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
     final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
 
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
SCHEMA_PATH);
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
SCHEMA_PATH);
     SchemaProvider schemaProvider = 
UtilHelpers.wrapSchemaProviderWithPostProcessor(
         
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), 
props, jsc()), props, jsc(), new ArrayList<>());
 
-    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", 
ByteArrayDeserializer.class.getName());
+    props.put("hoodie.streamer.source.kafka.value.deserializer.class", 
ByteArrayDeserializer.class.getName());
     int numPartitions = 2;
     int numMessages = 30;
     testUtils.createTopic(topic,numPartitions);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 8eaa1d95b23..6a2bbcd0136 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -48,9 +48,9 @@ public class TestCsvDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
-    props.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(true));
-    props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true));
+    props.setProperty("hoodie.streamer.csv.sep", "\t");
     return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index c1844c7a2a1..3b018473dc4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -114,8 +114,8 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
     String schemaFilePath = 
TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
-    props.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
     this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
     MockitoAnnotations.initMocks(this);
   }
@@ -263,14 +263,14 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
     setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix",
 "path/to/skip");
+    
typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix",
 "path/to/skip");
     //1. snapshot query, read all records
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, 
typedProperties);
     //2. incremental query, as commit is present in timeline
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, 
exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, 
typedProperties);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix",
 "path/to");
+    
typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix",
 "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as 
nextCheckpoint (dataset is empty due to ignore prefix).
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, 
typedProperties);
   }
@@ -316,7 +316,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy,
                              Option<String> checkpointToPull, long 
sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", 
"json");
+    typedProperties.put("hoodie.streamer.source.hoodieincr.file.format", 
"json");
     readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, 
expectedCheckpoint, typedProperties);
   }
 
@@ -388,10 +388,10 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy) {
     Properties properties = new Properties();
     //String schemaFilePath = 
TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
-    //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
-    properties.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
-    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+    //properties.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    properties.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.path", 
basePath());
+    
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
         missingCheckpointStrategy.name());
     properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
     return new TypedProperties(properties);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b9e20fb3a19..3d9f3362a15 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -337,8 +337,8 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
                              String expectedCheckpoint, Option<String> 
snapshotCheckPointImplClassOpt) {
 
     Properties properties = new Properties();
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
-    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
 missingCheckpointStrategy.name());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.path", 
basePath());
+    
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
 missingCheckpointStrategy.name());
     // TODO: [HUDI-7081] get rid of this
     properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
     snapshotCheckPointImplClassOpt.map(className ->
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index dcd12ac7c8e..ade781e6c8b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -77,11 +77,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.url", JDBC_URL);
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", JDBC_DRIVER);
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.user", JDBC_USER);
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.password", JDBC_PASS);
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
+    PROPS.setProperty("hoodie.streamer.jdbc.url", JDBC_URL);
+    PROPS.setProperty("hoodie.streamer.jdbc.driver.class", JDBC_DRIVER);
+    PROPS.setProperty("hoodie.streamer.jdbc.user", JDBC_USER);
+    PROPS.setProperty("hoodie.streamer.jdbc.password", JDBC_PASS);
+    PROPS.setProperty("hoodie.streamer.jdbc.table.name", "triprec");
     connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS);
   }
 
@@ -93,8 +93,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testSingleCommit() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       int numRecords = 100;
@@ -116,8 +116,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testInsertAndUpdate() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       final String commitTime = "000";
@@ -150,8 +150,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testTwoCommits() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       // Add 10 records with commit time "000"
@@ -178,8 +178,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testIncrementalFetchWithCommitTime() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       // Add 10 records with commit time "000"
@@ -204,8 +204,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testIncrementalFetchWithNoMatchingRows() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       // Add 10 records with commit time "000"
@@ -226,8 +226,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"id");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
 
     try {
       // Add 100 records with commit time "000"
@@ -257,8 +257,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"id");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
 
     try {
       // Add 100 records with commit time "000"
@@ -284,8 +284,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testIncrementalFetchFallbackToFullFetchWhenError() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       // Add 10 records with commit time "000"
@@ -299,14 +299,14 @@ public class TestJdbcSource extends UtilitiesTestBase {
       // Add 10 records with commit time "001"
       insert("001", 10, connection, DATA_GENERATOR, PROPS);
 
-      PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"dummy_col");
+      PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"dummy_col");
       assertThrows(HoodieException.class, () -> {
         // Start incremental scan with a dummy column that does not exist.
         // This will throw an exception as the default behavior is to not 
fallback to full fetch.
         runSource(Option.of(batch.getCheckpointForNextBatch()), -1);
       });
 
-      
PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", 
"true");
+      PROPS.setProperty("hoodie.streamer.jdbc.incr.fallback.to.full.fetch", 
"true");
 
       // Start incremental scan with a dummy column that does not exist.
       // This will fallback to full fetch mode but still throw an exception 
checkpointing will fail.
@@ -321,7 +321,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testFullFetchWithCommitTime() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
 
     try {
       // Add 10 records with commit time "000"
@@ -345,8 +345,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testFullFetchWithCheckpoint() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", 
"last_insert");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
+    PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", 
"last_insert");
 
     try {
       // Add 10 records with commit time "000"
@@ -360,7 +360,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
       // Get max of incremental column
       Column incrementalColumn = rowDataset
-          
.col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name"));
+          .col(PROPS.getString("hoodie.streamer.jdbc.table.incr.column.name"));
       final String max = 
rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
           .getString(0);
 
@@ -382,10 +382,10 @@ public class TestJdbcSource extends UtilitiesTestBase {
       // Write secret string to fs in a file
       writeSecretToFs();
       // Remove secret string from props
-      PROPS.remove("hoodie.deltastreamer.jdbc.password");
+      PROPS.remove("hoodie.streamer.jdbc.password");
       // Set property to read secret from fs file
-      PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", 
"file:///tmp/hudi/config/secret");
-      PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+      PROPS.setProperty("hoodie.streamer.jdbc.password.file", 
"file:///tmp/hudi/config/secret");
+      PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
       // Add 10 records with commit time 000
       clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
       Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
@@ -401,8 +401,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
       // Write secret string to fs in a file
       writeSecretToFs();
       // Remove secret string from props
-      PROPS.remove("hoodie.deltastreamer.jdbc.password");
-      PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+      PROPS.remove("hoodie.streamer.jdbc.password");
+      PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
       // Add 10 records with commit time 000
       clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
       runSource(Option.empty(), 10);
@@ -411,9 +411,9 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testSourceWithExtraOptions() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", 
"10");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
-    PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name");
+    PROPS.setProperty("hoodie.streamer.jdbc.extra.options.fetchsize", "10");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
+    PROPS.remove("hoodie.streamer.jdbc.table.incr.column.name");
     try {
       // Add 20 records with commit time 000
       clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS);
@@ -426,8 +426,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   @Test
   public void testSourceWithStorageLevel() {
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE");
-    PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+    PROPS.setProperty("hoodie.streamer.jdbc.storage.level", "NONE");
+    PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
     try {
       // Add 10 records with commit time 000
       clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index fde10b2d9a5..24a341fe9c3 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -44,7 +44,7 @@ public class TestJsonDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 6b24f57a50d..4b615c50ee1 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -82,7 +82,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
   public void init() throws Exception {
     String schemaFilePath = 
Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath();
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
     schemaProvider = new FilebasedSchemaProvider(props, jsc());
   }
 
@@ -93,11 +93,11 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
 
   static TypedProperties createPropsForJsonKafkaSource(String brokerAddress, 
String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", brokerAddress);
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) :
             
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index b6bc3480e3d..1f1a4e2b5c1 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -80,7 +80,7 @@ public class TestJsonKafkaSourcePostProcessor extends 
SparkClientFunctionalTestH
   public void init() throws Exception {
     String schemaFilePath = 
Objects.requireNonNull(TestJsonKafkaSource.SCHEMA_FILE_URL).toURI().getPath();
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
     schemaProvider = new FilebasedSchemaProvider(props, jsc());
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 44489037e82..159ababcf47 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -43,7 +43,7 @@ public class TestParquetDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index b56d87c9263..f9679211144 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -75,11 +75,11 @@ public class TestProtoKafkaSource extends 
BaseTestKafkaSource {
 
   protected TypedProperties createPropsForKafkaSource(String topic, Long 
maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) :
             
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 90fbeb3bb35..a9dd11c5544 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -105,8 +105,8 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
     String schemaFilePath = 
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
-    props.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
     this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
   }
 
@@ -186,10 +186,10 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
 
   private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy) {
     Properties properties = new Properties();
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
-    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+    properties.setProperty("hoodie.streamer.source.hoodieincr.path", 
basePath());
+    
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
         missingCheckpointStrategy.name());
-    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", 
"json");
+    properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", 
"json");
     return new TypedProperties(properties);
   }
 
@@ -354,7 +354,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
 
     setMockQueryRunner(inputDs);
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix",
 "path/to/skip");
+    
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", 
typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 
1000L, "2", typedProperties);
@@ -388,7 +388,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix",
 "path/to/skip");
+    
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, 
"2#path/to/file4.json", typedProperties);
   }
@@ -420,7 +420,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix",
 "path/to/skip");
+    
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 
50L, "3#path/to/file4.json", typedProperties);
 
@@ -457,14 +457,14 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix",
 "path/to/skip");
+    
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
     //1. snapshot query, read all records
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, 
typedProperties);
     //2. incremental query, as commit is present in timeline
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, 
exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, 
typedProperties);
-    
typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix",
 "path/to");
+    
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to");
     //4. As snapshotQuery will return 1 -> same would be return as 
nextCheckpoint (dataset is empty due to ignore prefix).
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, 
typedProperties);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
index 89769954d38..ee488e38c6a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
@@ -51,8 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestSqlFileBasedSource extends UtilitiesTestBase {
 
   private final boolean useFlattenedSchema = false;
-  private final String sqlFileSourceConfig = 
"hoodie.deltastreamer.source.sql.file";
-  private final String sqlFileSourceConfigEmitChkPointConf = 
"hoodie.deltastreamer.source.sql.checkpoint.emit";
+  private final String sqlFileSourceConfig = "hoodie.streamer.source.sql.file";
+  private final String sqlFileSourceConfigEmitChkPointConf = 
"hoodie.streamer.source.sql.checkpoint.emit";
   protected FilebasedSchemaProvider schemaProvider;
   protected HoodieTestDataGenerator dataGenerator = new 
HoodieTestDataGenerator();
   private String dfsRoot;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 64578f3bae3..a738003a3fc 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -50,7 +50,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class TestSqlSource extends UtilitiesTestBase {
 
   private final boolean useFlattenedSchema = false;
-  private final String sqlSourceConfig = 
"hoodie.deltastreamer.source.sql.sql.query";
+  private final String sqlSourceConfig = 
"hoodie.streamer.source.sql.sql.query";
   protected FilebasedSchemaProvider schemaProvider;
   protected HoodieTestDataGenerator dataGenerator = new 
HoodieTestDataGenerator();
   private String dfsRoot;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index c9f46144e96..a57383c43b2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -86,12 +86,12 @@ public abstract class TestAbstractDebeziumSource extends 
UtilitiesTestBase {
 
   private TypedProperties createPropsForJsonSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", 
testTopicName);
+    props.setProperty("hoodie.streamer.source.kafka.topic", testTopicName);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", "earliest");
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url", 
"localhost");
-    
props.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class", 
StringDeserializer.class.getName());
+    props.setProperty("hoodie.streamer.schemaprovider.registry.url", 
"localhost");
+    props.setProperty("hoodie.streamer.source.kafka.value.deserializer.class", 
StringDeserializer.class.getName());
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
 
     return props;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index b97e2fa80a0..79f15975cb5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -69,7 +69,7 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
 
     TypedProperties properties = new TypedProperties();
-    
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
 "country,state");
+    
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"country,state");
     Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, 
"json");
     Assertions.assertTrue(result.isPresent());
     Assertions.assertEquals(1, result.get().count());
@@ -82,9 +82,9 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     TypedProperties props = new TypedProperties();
     
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
     String schemaFilePath = 
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
-    props.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
-    
props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", 
"country,state");
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"country,state");
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
     Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)));
     Assertions.assertTrue(result.isPresent());
@@ -97,8 +97,8 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
   public void partitionKeyNotPresentInPath() {
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
     TypedProperties properties = new TypedProperties();
-    
properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format",
 "false");
-    
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
 "unknown");
+    
properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format",
 "false");
+    
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"unknown");
     Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, 
"json");
     Assertions.assertTrue(result.isPresent());
     Assertions.assertEquals(1, result.get().count());
@@ -111,9 +111,9 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     TypedProperties props = new TypedProperties();
     
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
     String schemaFilePath = 
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
-    props.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
-    
props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", 
"country,state");
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"country,state");
     // Setting this config so that dataset repartition happens inside 
`loadAsDataset`
     props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
     List<CloudObjectMetadata> input = Arrays.asList(
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index d3031729e6e..fc3ab90a036 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -65,9 +65,9 @@ public class TestKafkaOffsetGen {
 
   private TypedProperties getConsumerConfigs(String autoOffsetReset, String 
kafkaCheckpointType) {
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
+    props.put("hoodie.streamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
     props.put("auto.offset.reset", autoOffsetReset);
-    props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName);
+    props.put("hoodie.streamer.source.kafka.topic", testTopicName);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
     props.setProperty("value.deserializer", 
StringDeserializer.class.getName());
@@ -250,7 +250,7 @@ public class TestKafkaOffsetGen {
     testUtils.createTopic(testTopicName, 1);
     boolean topicExists = kafkaOffsetGen.checkTopicExists(new 
KafkaConsumer(props));
     assertTrue(topicExists);
-    props.put("hoodie.deltastreamer.source.kafka.topic", "random");
+    props.put("hoodie.streamer.source.kafka.topic", "random");
     kafkaOffsetGen = new KafkaOffsetGen(props);
     topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
     assertFalse(topicExists);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index a949335a21a..a200f3a5151 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -450,14 +450,14 @@ public class UtilitiesTestBase {
     public static TypedProperties setupSchemaOnDFS(String scope, String 
filename) throws IOException {
       UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, fs, basePath 
+ "/" + filename);
       TypedProperties props = new TypedProperties();
-      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
+      props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
       return props;
     }
 
     public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String 
scope, String filename) throws IOException {
       UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + 
filename, fs, basePath + "/" + filename);
       TypedProperties props = new TypedProperties();
-      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
+      props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
       return props;
     }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index 56d435ddf0f..08e73d36bc0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.collection.RocksDBBasedMap;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.config.SourceTestConfig;
@@ -63,11 +64,10 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
 
   public static void initDataGen(TypedProperties props, int partition) {
     try {
-      boolean useRocksForTestDataGenKeys = 
props.getBoolean(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(),
-          SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.defaultValue());
-      String baseStoreDir = 
props.getString(SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS.key(),
+      boolean useRocksForTestDataGenKeys = 
ConfigUtils.getBooleanWithAltKeys(props, 
SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
+      String baseStoreDir = ConfigUtils.getStringWithAltKeys(props, 
SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
           File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + 
partition;
-      LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", 
BaseStoreDir=" + baseStoreDir);
+      LOG.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}", 
useRocksForTestDataGenKeys, baseStoreDir);
       dataGeneratorMap.put(partition, new 
HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
           useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : 
new HashMap<>()));
     } catch (IOException e) {
@@ -106,18 +106,17 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
 
   protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, 
int sourceLimit, String instantTime,
       int partition) {
-    int maxUniqueKeys =
-        props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue());
+    int maxUniqueKeys = ConfigUtils.getIntWithAltKeys(props, 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
 
     HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
 
     // generate `sourceLimit` number of upserts each time.
     int numExistingKeys = 
dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-    LOG.info("NumExistingKeys=" + numExistingKeys);
+    LOG.info("NumExistingKeys={}", numExistingKeys);
 
     int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
     int numInserts = sourceLimit - numUpdates;
-    LOG.info("Before adjustments => numInserts=" + numInserts + ", 
numUpdates=" + numUpdates);
+    LOG.info("Before adjustments => numInserts={}, numUpdates={}", numInserts, 
numUpdates);
     boolean reachedMax = false;
 
     if (numInserts + numExistingKeys > maxUniqueKeys) {
@@ -134,17 +133,16 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
     Stream<GenericRecord> deleteStream = Stream.empty();
     Stream<GenericRecord> updateStream;
     long memoryUsage1 = Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();
-    LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total 
Memory=" + Runtime.getRuntime().totalMemory()
-        + ", Free Memory=" + Runtime.getRuntime().freeMemory());
+    LOG.info("Before DataGen. Memory Usage={}, Total Memory={}, Free 
Memory={}", memoryUsage1, Runtime.getRuntime().totalMemory(),
+        Runtime.getRuntime().freeMemory());
     if (!reachedMax && numUpdates >= 50) {
-      LOG.info("After adjustments => NumInserts=" + numInserts + ", 
NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
-          + maxUniqueKeys);
+      LOG.info("After adjustments => NumInserts={}, NumUpdates={}, 
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50), 
maxUniqueKeys);
       // if we generate update followed by deletes -> some keys in update 
batch might be picked up for deletes. Hence generating delete batch followed by 
updates
       deleteStream = 
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 
50).map(AbstractBaseTestSource::toGenericRecord);
       updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
           .map(AbstractBaseTestSource::toGenericRecord);
     } else {
-      LOG.info("After adjustments => NumInserts=" + numInserts + ", 
NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+      LOG.info("After adjustments => NumInserts={}, NumUpdates={}, 
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
       updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
           .map(AbstractBaseTestSource::toGenericRecord);
     }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
index 4bcbdbbe874..808a8efb8a4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.config.SourceTestConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -46,15 +47,14 @@ public class DistributedTestDataSource extends 
AbstractBaseTestSource {
   public DistributedTestDataSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.numTestSourcePartitions =
-        props.getInteger(SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.key(), 
SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.defaultValue());
+    this.numTestSourcePartitions = ConfigUtils.getIntWithAltKeys(props, 
SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP);
   }
 
   @Override
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> 
lastCkptStr, long sourceLimit) {
     int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 
1).orElse(0);
     String instantTime = String.format("%05d", nextCommitNum);
-    LOG.info("Source Limit is set to " + sourceLimit);
+    LOG.info("Source Limit is set to {}", sourceLimit);
 
     // No new data.
     if (sourceLimit <= 0) {
@@ -65,15 +65,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
     newProps.putAll(props);
 
     // Set the maxUniqueRecords per partition for TestDataSource
-    int maxUniqueRecords =
-        props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue());
+    int maxUniqueRecords = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
     String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
     newProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), maxUniqueRecordsPerPartition);
     int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
     JavaRDD<GenericRecord> avroRDD =
         sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()),
             numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
-              LOG.info("Initializing source with newProps=" + newProps);
+              LOG.info("Initializing source with newProps={}", newProps);
               if (!dataGeneratorMap.containsKey(p)) {
                 initDataGen(newProps, p);
               }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 1b0cc7f52a6..ea2ce8ed86f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -87,7 +87,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
   public void testSqlFileBasedTransformerIncorrectConfig() {
     // Test if the class throws hoodie IO exception correctly when given a incorrect config.
     props.setProperty(
-        "hoodie.deltastreamer.transformer.sql.file",
+        "hoodie.streamer.transformer.sql.file",
         UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
     assertThrows(
         HoodieTransformException.class,
@@ -103,7 +103,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
 
     // Test if the SQL file based transformer works as expected for the invalid SQL statements.
     props.setProperty(
-        "hoodie.deltastreamer.transformer.sql.file",
+        "hoodie.streamer.transformer.sql.file",
         UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
     assertThrows(
         ParseException.class,
@@ -119,7 +119,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
 
     // Test if the SQL file based transformer works as expected for the empty SQL statements.
     props.setProperty(
-        "hoodie.deltastreamer.transformer.sql.file",
+        "hoodie.streamer.transformer.sql.file",
         UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
     Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
     String[] actualRows = emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
@@ -136,7 +136,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
 
     // Test if the SQL file based transformer works as expected for the correct input.
     props.setProperty(
-        "hoodie.deltastreamer.transformer.sql.file",
+        "hoodie.streamer.transformer.sql.file",
         UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
     Dataset<Row> transformedRow =
         sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
index 6f05dc1b184..e9f6f9e4fd3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
@@ -78,7 +78,7 @@ public class TestSqlQueryBasedTransformer {
         + "from\n"
         + "\t<SRC>";
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.transformer.sql", transSql);
+    props.put("hoodie.streamer.transformer.sql", transSql);
 
     // transform
     SqlQueryBasedTransformer transformer = new SqlQueryBasedTransformer();
diff --git a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
index 3a5edb2b6f2..35beefab7b2 100644
--- a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
@@ -20,8 +20,8 @@ include=base.properties
 hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.partitionpath.field=driver
 # Schema provider props (change to absolute path based on your installation)
-hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
-hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc
+hoodie.streamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
+hoodie.streamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc
 # DFS Source
-hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input
+hoodie.streamer.source.dfs.root=file:///tmp/hoodie-dfs-input
 
diff --git a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
index 5c569c5d0a0..248de399272 100644
--- a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
@@ -18,6 +18,6 @@
 include=base.properties
 hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=test_topic
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd
\ No newline at end of file
+hoodie.streamer.source.kafka.topic=test_topic
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
index e256b8c77fb..87edb1a1df7 100644
--- a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
@@ -20,10 +20,10 @@ include=base.properties
 hoodie.datasource.write.recordkey.field=impressionid
 hoodie.datasource.write.partitionpath.field=userid
 # schema provider configs
-hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
+hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
 # Kafka Source
-#hoodie.deltastreamer.source.kafka.topic=uber_trips
-hoodie.deltastreamer.source.kafka.topic=impressions
+#hoodie.streamer.source.kafka.topic=uber_trips
+hoodie.streamer.source.kafka.topic=impressions
 #Kafka props
 bootstrap.servers=localhost:9092
 auto.offset.reset=earliest
diff --git a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
index d415e19eb20..b74f5a080f3 100644
--- a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
@@ -18,11 +18,11 @@
 include=base.properties
 hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=topic2
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.streamer.source.kafka.topic=topic2
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
 hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator
-hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
-hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
-hoodie.deltastreamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer
+hoodie.streamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
+hoodie.streamer.schemaprovider.registry.urlSuffix=-value/versions/latest
+hoodie.streamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer
diff --git a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
index 9172337d038..9bfbd889de9 100644
--- a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.streamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
index f5b079265d4..a8e278249e8 100644
--- a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
@@ -18,10 +18,10 @@
 include=base.properties
 hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=topic1
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.streamer.source.kafka.topic=topic1
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.database=uber_hive_db
 hoodie.datasource.hive_sync.table=uber_hive_dummy_table
-hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
-hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest
\ No newline at end of file
+hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
+hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest
\ No newline at end of file

Reply via email to