This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d2174acd6b Revert "[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)" (#10734)
9d2174acd6b is described below

commit 9d2174acd6ba02ce580da74167ec6551bd3978be
Author: Vinish Reddy <vinishreddygunne...@gmail.com>
AuthorDate: Fri Feb 23 08:13:56 2024 +0530

    Revert "[HUDI-7416] Add interface for StreamProfile to be used in 
StreamSync for reading and writing data (#10687)" (#10734)
    
    This reverts commit 93cd25fded8b0225ddfc54a49cc40fc5e4ad740c.
---
 .../org/apache/hudi/utilities/UtilHelpers.java     | 19 --------
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  4 +-
 .../hudi/utilities/sources/AvroKafkaSource.java    | 10 ++--
 .../hudi/utilities/sources/JsonKafkaSource.java    | 13 ++----
 .../apache/hudi/utilities/sources/KafkaSource.java | 39 +++++-----------
 .../hudi/utilities/sources/ProtoKafkaSource.java   | 13 ++----
 .../org/apache/hudi/utilities/sources/Source.java  | 11 +----
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 39 ++++++++--------
 .../utilities/streamer/DefaultStreamContext.java   | 48 -------------------
 .../hudi/utilities/streamer/HoodieStreamer.java    | 21 +++------
 .../hudi/utilities/streamer/SourceProfile.java     | 54 ----------------------
 .../utilities/streamer/SourceProfileSupplier.java  | 34 --------------
 .../hudi/utilities/streamer/StreamContext.java     | 44 ------------------
 .../apache/hudi/utilities/streamer/StreamSync.java | 10 ++--
 .../utilities/sources/BaseTestKafkaSource.java     | 51 --------------------
 .../utilities/sources/TestJsonKafkaSource.java     | 17 ++-----
 .../utilities/sources/TestProtoKafkaSource.java    |  3 +-
 17 files changed, 60 insertions(+), 370 deletions(-)
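
In short, this revert drops the StreamContext / SourceProfileSupplier indirection and restores the constructors that take a SchemaProvider directly. Below is a minimal sketch of the restored call pattern, using JsonKafkaSource as the example; the class and variable names in the sketch are illustrative and not part of this commit, while the constructor signatures match the ones restored in the diff.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
    import org.apache.hudi.utilities.schema.SchemaProvider;
    import org.apache.hudi.utilities.sources.JsonKafkaSource;
    import org.apache.hudi.utilities.sources.Source;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class KafkaSourceConstructionSketch {
      static Source buildJsonKafkaSource(TypedProperties props, JavaSparkContext jsc,
                                         SparkSession spark, SchemaProvider schemaProvider,
                                         HoodieIngestionMetrics metrics) {
        // Restored by this revert: the SchemaProvider is passed directly to the source.
        return new JsonKafkaSource(props, jsc, spark, schemaProvider, metrics);
        // Removed by this revert (shown for comparison):
        // return new JsonKafkaSource(props, jsc, spark, metrics,
        //     new DefaultStreamContext(schemaProvider, Option.empty()));
      }
    }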

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 18af52d334e..18e92a8463c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,7 +66,6 @@ import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import 
org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import 
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.StreamContext;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
@@ -157,24 +156,6 @@ public class UtilHelpers {
     }
   }
 
-  public static Source createSource(String sourceClass, TypedProperties cfg, 
JavaSparkContext jssc,
-                                    SparkSession sparkSession, 
HoodieIngestionMetrics metrics, StreamContext streamContext)
-      throws IOException {
-    try {
-      try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class,
-                HoodieIngestionMetrics.class, streamContext.getClass()},
-            cfg, jssc, sparkSession, metrics, streamContext);
-      } catch (HoodieException e) {
-        return createSource(sourceClass, cfg, jssc, sparkSession, 
streamContext.getSchemaProvider(), metrics);
-      }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
-    }
-  }
-
   public static JsonKafkaSourcePostProcessor 
createJsonKafkaSourcePostProcessor(String postProcessorClassNames, 
TypedProperties props) throws IOException {
     if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
       return null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4002d1579bb..c794db32510 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -22,9 +22,7 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.streamer.StreamSync;
 
@@ -51,6 +49,6 @@ public class DeltaSync extends StreamSync {
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
                    TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, 
onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, 
Option.empty()));
+    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, 
conf, onInitializingHoodieWriteClient);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 46095590430..2bf92280faf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,8 +27,6 @@ import 
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +69,9 @@ public class AvroKafkaSource extends 
KafkaSource<GenericRecord> {
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics 
metrics) {
-    this(props, sparkContext, sparkSession, metrics, new 
DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider,
 props, sparkContext), Option.empty()));
-  }
-
-  public AvroKafkaSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, 
StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, 
streamContext);
+    super(props, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, 
sparkContext),
+        SourceType.AVRO, metrics);
     this.originalSchemaProvider = schemaProvider;
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, 
StringDeserializer.class.getName());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 0a609dde720..eb67abfee3a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
@@ -28,8 +27,6 @@ import 
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import 
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -47,10 +44,10 @@ import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Read json kafka data.
@@ -59,11 +56,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics 
metrics) {
-    this(properties, sparkContext, sparkSession, metrics, new 
DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider,
 properties, sparkContext), Option.empty()));
-  }
-
-  public JsonKafkaSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, 
StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, 
streamContext);
+    super(properties, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, 
properties, sparkContext),
+        SourceType.JSON, metrics);
     properties.put("key.deserializer", StringDeserializer.class.getName());
     properties.put("value.deserializer", StringDeserializer.class.getName());
     this.offsetGen = new KafkaOffsetGen(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 52a6a1217cc..bb26d579582 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -26,8 +26,6 @@ import 
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -52,9 +50,9 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   protected final boolean shouldAddOffsets;
 
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession,
-                        SourceType sourceType, HoodieIngestionMetrics metrics, 
StreamContext streamContext) {
-    super(props, sparkContext, sparkSession, sourceType, streamContext);
-    this.schemaProvider = streamContext.getSchemaProvider();
+                        SchemaProvider schemaProvider, SourceType sourceType, 
HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     this.shouldAddOffsets = 
KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   }
@@ -62,34 +60,21 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   @Override
   protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> 
lastCheckpointStr, long sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && 
sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = 
sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions 
from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), 
offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
sourceLimit, metrics);
+      OffsetRange[] offsetRanges = 
offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      long totalNewMsgs = 
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());
+      if (totalNewMsgs <= 0) {
+        
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
0);
+        return new InputBatch<>(Option.empty(), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
       }
-      return toInputBatch(offsetRanges);
+      
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
totalNewMsgs);
+      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+      return new InputBatch<>(Option.of(newDataRDD), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + 
e.getMessage());
     }
   }
 
-  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
-    long totalNewMsgs = 
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());
-    if (totalNewMsgs <= 0) {
-      
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
0);
-      return new InputBatch<>(Option.empty(), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-    }
-    
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
totalNewMsgs);
-    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-  }
-
   abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 208e591c8f1..67927480454 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,15 +19,12 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.google.protobuf.Message;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -54,13 +51,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
 
   private final String className;
 
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
-                          SchemaProvider schemaProvider, 
HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new 
DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, 
StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, 
streamContext);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
+                          SparkSession sparkSession, SchemaProvider 
schemaProvider, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, 
metrics);
     checkRequiredConfigProperties(props, Collections.singletonList(
         ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index dfb07c718a0..cbc0722056b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -25,9 +25,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.callback.SourceCommitCallback;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -47,7 +44,6 @@ public abstract class Source<T> implements 
SourceCommitCallback, Serializable {
   protected transient TypedProperties props;
   protected transient JavaSparkContext sparkContext;
   protected transient SparkSession sparkSession;
-  protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
   private transient SchemaProvider overriddenSchemaProvider;
 
   private final SourceType sourceType;
@@ -59,16 +55,11 @@ public abstract class Source<T> implements 
SourceCommitCallback, Serializable {
 
   protected Source(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession,
       SchemaProvider schemaProvider, SourceType sourceType) {
-    this(props, sparkContext, sparkSession, sourceType, new 
DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  protected Source(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
     this.props = props;
     this.sparkContext = sparkContext;
     this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
+    this.overriddenSchemaProvider = schemaProvider;
     this.sourceType = sourceType;
-    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
   }
 
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 32df651d556..d5faec3595e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -241,24 +241,7 @@ public class KafkaOffsetGen {
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, 
long sourceLimit, HoodieIngestionMetrics metrics) {
-    // Come up with final set of OffsetRanges to read (account for new 
partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, 
KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
 
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " 
+ maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    long minPartitions = getLongWithAltKeys(props, 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
-    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, 
metrics);
-  }
-
-  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, 
long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;
@@ -296,9 +279,29 @@ public class KafkaOffsetGen {
       // Obtain the latest offsets.
       toOffsets = consumer.endOffsets(topicPartitions);
     }
+
+    // Come up with final set of OffsetRanges to read (account for new 
partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, 
KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
+
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " 
+ maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    // TODO(HUDI-4625) remove
+    if (numEvents < toOffsets.size()) {
+      throw new HoodieException("sourceLimit should not be less than the 
number of kafka partitions");
+    }
+
+    long minPartitions = getLongWithAltKeys(props, 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, 
numEvents, minPartitions);
   }
-  
+
   /**
    * Fetch partition infos for given topic.
    *
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
deleted file mode 100644
index f8dabeb89c9..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The default implementation for the StreamContext interface,
- * composes SchemaProvider and SourceProfileSupplier currently,
- * can be extended for other arguments in the future.
- */
-public class DefaultStreamContext implements StreamContext {
-
-  private final SchemaProvider schemaProvider;
-  private final Option<SourceProfileSupplier> sourceProfileSupplier;
-
-  public DefaultStreamContext(SchemaProvider schemaProvider, 
Option<SourceProfileSupplier> sourceProfileSupplier) {
-    this.schemaProvider = schemaProvider;
-    this.sourceProfileSupplier = sourceProfileSupplier;
-  }
-
-  @Override
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-
-  @Override
-  public Option<SourceProfileSupplier> getSourceProfileSupplier() {
-    return sourceProfileSupplier;
-  }
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index ef31cc34ab5..8ecc937c5e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -143,12 +143,8 @@ public class HoodieStreamer implements Serializable {
     this(cfg, jssc, fs, conf, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
-    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
-  }
-
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf,
-                        Option<TypedProperties> propsOverride, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                        Option<TypedProperties> propsOverride) throws 
IOException {
     this.properties = combineProperties(cfg, propsOverride, 
jssc.hadoopConfiguration());
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
@@ -162,7 +158,7 @@ public class HoodieStreamer implements Serializable {
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, 
this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new 
HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, 
sparkEngineContext, fs, conf, Option.ofNullable(this.properties), 
sourceProfileSupplier));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, 
sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
   }
 
   private static TypedProperties combineProperties(Config cfg, 
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -660,7 +656,7 @@ public class HoodieStreamer implements Serializable {
     private final Option<ConfigurationHotUpdateStrategy> 
configurationHotUpdateStrategyOpt;
 
     public StreamSyncService(Config cfg, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> properties, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                             Option<TypedProperties> properties) throws 
IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
@@ -712,18 +708,13 @@ public class HoodieStreamer implements Serializable {
           UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, 
hoodieSparkContext.jsc()),
           props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
-      streamSync = new StreamSync(cfg, sparkSession, props, 
hoodieSparkContext, fs, conf, this::onInitializingWriteClient, new 
DefaultStreamContext(schemaProvider, sourceProfileSupplier));
+      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, 
hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
 
     }
 
     public StreamSyncService(HoodieStreamer.Config cfg, 
HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf)
         throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
-    }
-
-    public StreamSyncService(HoodieStreamer.Config cfg, 
HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, 
Option<TypedProperties> properties)
-            throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -737,7 +728,7 @@ public class HoodieStreamer implements Serializable {
       if (streamSync != null) {
         streamSync.close();
       }
-      streamSync = new StreamSync(cfg, sparkSession, props, 
hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient, new 
DefaultStreamContext(schemaProvider, Option.empty()));
+      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, 
hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
     }
 
     @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
deleted file mode 100644
index d830cf5dee3..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * A profile containing details about how the next input batch in StreamSync 
should be consumed and written.
- * For eg: KafkaSourceProfile contains number of events to consume in this 
sync round.
- * S3SourceProfile contains the list of files to consume in this sync round.
- * HudiIncrementalSourceProfile contains the beginInstant and endInstant 
commit times to consume in this sync round etc.
- *
- * @param <T> The type for source context, varies based on sourceType as 
described above.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfile<T> {
-
-  /**
-   * @return The maxBytes that will be consumed from the source in this sync 
round.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  long getMaxSourceBytes();
-
-  /**
-   * @return The number of output partitions required in source RDD.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  int getSourcePartitions();
-
-  /**
-   * @return The source specific context based on sourceType as described 
above.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  T getSourceSpecificContext();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
deleted file mode 100644
index 34bfb8dff94..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * Supplier for SourceProfile
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfileSupplier {
-  @SuppressWarnings("rawtypes")
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  SourceProfile getSourceProfile();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
deleted file mode 100644
index bfe337ee3f2..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The context required to sync one batch of data to hoodie table using 
StreamSync.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface StreamContext {
-
-  /**
-   * The schema provider used for reading data from source and also writing to 
hoodie table.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  SchemaProvider getSchemaProvider();
-
-  /**
-   * An optional stream profile supplying details regarding how the next input 
batch in StreamSync should be consumed and written.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  Option<SourceProfileSupplier> getSourceProfileSupplier();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 4b83ff92b7b..f87bf083854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -259,19 +259,19 @@ public class StreamSync implements Serializable, 
Closeable {
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem 
fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
-    this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, 
conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, 
Option.empty()));
+    this(cfg, sparkSession, schemaProvider, props, new 
HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
   }
 
-  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
                     TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fs, Configuration conf,
-                    Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient, StreamContext streamContext) throws 
IOException {
+                    Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
+    this.userProvidedSchemaProvider = schemaProvider;
     this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = 
KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new 
TypedProperties(props));
@@ -285,7 +285,7 @@ public class StreamSync implements Serializable, Closeable {
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     refreshTimeline();
-    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
     this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
     Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? 
Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..b5cbf2738f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -28,8 +28,6 @@ import 
org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -54,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Generic tests for all {@link KafkaSource} to ensure all implementations 
properly handle offsets, fetch limits, failure modes, etc.
@@ -63,7 +60,6 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
 
   protected final HoodieIngestionMetrics metrics = 
mock(HoodieIngestionMetrics.class);
-  protected final Option<SourceProfileSupplier> sourceProfile = 
Option.of(mock(SourceProfileSupplier.class));
 
   protected SchemaProvider schemaProvider;
   protected KafkaTestUtils testUtils;
@@ -281,51 +277,4 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
             + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed.",
         t.getMessage());
   }
-
-  @Test
-  public void testKafkaSourceWithOffsetsFromSourceProfile() {
-    // topic setup.
-    final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-
-    when(sourceProfile.get().getSourceProfile()).thenReturn(new 
TestSourceProfile(Long.MAX_VALUE, 4, 500));
-    SourceFormatAdapter kafkaSource = createSource(props);
-
-    // Test for empty data.
-    assertEquals(Option.empty(), 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch());
-
-    // Publish messages and assert source has picked up all messages in 
offsetRanges supplied by input batch profile.
-    sendMessagesToKafka(topic, 1000, 2);
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
-    assertEquals(500, fetch1.getBatch().get().count());
-  }
-
-  static class TestSourceProfile implements SourceProfile<Long> {
-
-    private final long maxSourceBytes;
-    private final int sourcePartitions;
-    private final long numEvents;
-
-    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long 
numEvents) {
-      this.maxSourceBytes = maxSourceBytes;
-      this.sourcePartitions = sourcePartitions;
-      this.numEvents = numEvents;
-    }
-
-    @Override
-    public long getMaxSourceBytes() {
-      return maxSourceBytes;
-    }
-
-    @Override
-    public int getSourcePartitions() {
-      return sourcePartitions;
-    }
-
-    @Override
-    public Long getSourceSpecificContext() {
-      return numEvents;
-    }
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 59a85a06e9c..14ffd31582a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,16 +23,14 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.ErrorEvent;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 
@@ -62,10 +60,10 @@ import scala.Tuple2;
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
 import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET;
-import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
@@ -106,7 +104,7 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
 
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
-    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), 
metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
+    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics));
   }
 
   // test whether empty messages can be filtered
@@ -358,13 +356,4 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     dfWithOffsetInfo.unpersist();
     dfWithOffsetInfoAndNullKafkaKey.unpersist();
   }
-
-  @Test
-  public void testCreateSource() throws IOException {
-    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-    Source jsonKafkaSource = 
UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, jsc(), 
spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
-    assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index b56d87c9263..52376f89741 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.test.proto.Nested;
 import org.apache.hudi.utilities.test.proto.Sample;
@@ -90,7 +89,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource 
{
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
     this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
-    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), 
metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
+    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
     return new SourceFormatAdapter(protoKafkaSource);
   }
 

