This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9d2174acd6b Revert "[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)" (#10734)
9d2174acd6b is described below

commit 9d2174acd6ba02ce580da74167ec6551bd3978be
Author: Vinish Reddy <vinishreddygunne...@gmail.com>
AuthorDate: Fri Feb 23 08:13:56 2024 +0530

    Revert "[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)" (#10734)
    
    This reverts commit 93cd25fded8b0225ddfc54a49cc40fc5e4ad740c.
---
 .../org/apache/hudi/utilities/UtilHelpers.java     | 19 --------
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  4 +-
 .../hudi/utilities/sources/AvroKafkaSource.java    | 10 ++--
 .../hudi/utilities/sources/JsonKafkaSource.java    | 13 ++----
 .../apache/hudi/utilities/sources/KafkaSource.java | 39 +++++----------
 .../hudi/utilities/sources/ProtoKafkaSource.java   | 13 ++----
 .../org/apache/hudi/utilities/sources/Source.java  | 11 +----
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 39 ++++++++--------
 .../utilities/streamer/DefaultStreamContext.java   | 48 -------------------
 .../hudi/utilities/streamer/HoodieStreamer.java    | 21 +++------
 .../hudi/utilities/streamer/SourceProfile.java     | 54 ----------------------
 .../utilities/streamer/SourceProfileSupplier.java  | 34 --------------
 .../hudi/utilities/streamer/StreamContext.java     | 44 ------------------
 .../apache/hudi/utilities/streamer/StreamSync.java | 10 ++--
 .../utilities/sources/BaseTestKafkaSource.java     | 51 --------------------
 .../utilities/sources/TestJsonKafkaSource.java     | 17 ++-----
 .../utilities/sources/TestProtoKafkaSource.java    |  3 +-
 17 files changed, 60 insertions(+), 370 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 18af52d334e..18e92a8463c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,7 +66,6 @@ import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.StreamContext;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
@@ -157,24 +156,6 @@ public class UtilHelpers {
     }
   }
 
-  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
-      throws IOException {
-    try {
-      try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class,
-                HoodieIngestionMetrics.class, streamContext.getClass()},
-            cfg, jssc, sparkSession, metrics, streamContext);
-      } catch (HoodieException e) {
-        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
-      }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
-    }
-  }
-
   public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props)
       throws IOException {
     if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
       return null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4002d1579bb..c794db32510 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -22,9 +22,7 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.streamer.StreamSync;
@@ -51,6 +49,6 @@ public class DeltaSync extends StreamSync {
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
                    HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 46095590430..2bf92280faf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,8 +27,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +69,9 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext), Option.empty()));
-  }
-
-  public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, streamContext);
+    super(props, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
+        SourceType.AVRO, metrics);
     this.originalSchemaProvider = schemaProvider;
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 0a609dde720..eb67abfee3a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
@@ -28,8 +27,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -47,10 +44,10 @@ import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Read json kafka data.
@@ -59,11 +56,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(properties, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext), Option.empty()));
-  }
-
-  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, streamContext);
+    super(properties, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
+        SourceType.JSON, metrics);
     properties.put("key.deserializer", StringDeserializer.class.getName());
     properties.put("value.deserializer", StringDeserializer.class.getName());
     this.offsetGen = new KafkaOffsetGen(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 52a6a1217cc..bb26d579582 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -26,8 +26,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -52,9 +50,9 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   protected final boolean shouldAddOffsets;
 
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                        SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(props, sparkContext, sparkSession, sourceType, streamContext);
-    this.schemaProvider = streamContext.getSchemaProvider();
+                        SchemaProvider schemaProvider, SourceType sourceType, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   }
@@ -62,34 +60,21 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   @Override
   protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      if (totalNewMsgs <= 0) {
+        metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
+        return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
       }
-      return toInputBatch(offsetRanges);
+      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
+      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+      return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
     }
   }
 
-  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
-    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
-    if (totalNewMsgs <= 0) {
-      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
-      return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-    }
-    metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-  }
-
   abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 208e591c8f1..67927480454 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,15 +19,12 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.google.protobuf.Message;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -54,13 +51,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
 
   private final String className;
 
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
-                          HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, streamContext);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
+                          SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
     checkRequiredConfigProperties(props, Collections.singletonList(
         ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index dfb07c718a0..cbc0722056b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -25,9 +25,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.callback.SourceCommitCallback;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -47,7 +44,6 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
   protected transient TypedProperties props;
   protected transient JavaSparkContext sparkContext;
   protected transient SparkSession sparkSession;
-  protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
   private transient SchemaProvider overriddenSchemaProvider;
 
   private final SourceType sourceType;
@@ -59,16 +55,11 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
 
   protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                    SchemaProvider schemaProvider, SourceType sourceType) {
-    this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
     this.props = props;
     this.sparkContext = sparkContext;
     this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
+    this.overriddenSchemaProvider = schemaProvider;
     this.sourceType = sourceType;
-    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
   }
 
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 32df651d556..d5faec3595e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -241,24 +241,7 @@ public class KafkaOffsetGen {
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
-    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
-
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
-    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
-  }
-
-  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;
@@ -296,9 +279,29 @@ public class KafkaOffsetGen {
       // Obtain the latest offsets.
       toOffsets = consumer.endOffsets(topicPartitions);
     }
+
+    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
+
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    // TODO(HUDI-4625) remove
+    if (numEvents < toOffsets.size()) {
+      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
+    }
+
+    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
-
+  
   /**
    * Fetch partition infos for given topic.
    *
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
deleted file mode 100644
index f8dabeb89c9..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The default implementation for the StreamContext interface,
- * composes SchemaProvider and SourceProfileSupplier currently,
- * can be extended for other arguments in the future.
- */
-public class DefaultStreamContext implements StreamContext {
-
-  private final SchemaProvider schemaProvider;
-  private final Option<SourceProfileSupplier> sourceProfileSupplier;
-
-  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
-    this.schemaProvider = schemaProvider;
-    this.sourceProfileSupplier = sourceProfileSupplier;
-  }
-
-  @Override
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-
-  @Override
-  public Option<SourceProfileSupplier> getSourceProfileSupplier() {
-    return sourceProfileSupplier;
-  }
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index ef31cc34ab5..8ecc937c5e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -143,12 +143,8 @@ public class HoodieStreamer implements Serializable {
     this(cfg, jssc, fs, conf, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
-    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
-  }
-
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                        Option<TypedProperties> propsOverride, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                        Option<TypedProperties> propsOverride) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
@@ -162,7 +158,7 @@ public class HoodieStreamer implements Serializable {
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
   }
 
   private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -660,7 +656,7 @@ public class HoodieStreamer implements Serializable {
     private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
 
     public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> properties, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                             Option<TypedProperties> properties) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
@@ -712,18 +708,13 @@ public class HoodieStreamer implements Serializable {
           UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()), props, hoodieSparkContext.jsc(),
           cfg.transformerClassNames);
 
-      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
+      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
     }
 
     public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf) throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
-    }
-
-    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties)
-        throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -737,7 +728,7 @@ public class HoodieStreamer implements Serializable {
       if (streamSync != null) {
         streamSync.close();
       }
-      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
     }
 
     @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
deleted file mode 100644
index d830cf5dee3..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * A profile containing details about how the next input batch in StreamSync should be consumed and written.
- * For eg: KafkaSourceProfile contains number of events to consume in this sync round.
- * S3SourceProfile contains the list of files to consume in this sync round.
- * HudiIncrementalSourceProfile contains the beginInstant and endInstant commit times to consume in this sync round etc.
- *
- * @param <T> The type for source context, varies based on sourceType as described above.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfile<T> {
-
-  /**
-   * @return The maxBytes that will be consumed from the source in this sync round.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  long getMaxSourceBytes();
-
-  /**
-   * @return The number of output partitions required in source RDD.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  int getSourcePartitions();
-
-  /**
-   * @return The source specific context based on sourceType as described above.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  T getSourceSpecificContext();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
deleted file mode 100644
index 34bfb8dff94..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * Supplier for SourceProfile
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfileSupplier {
-  @SuppressWarnings("rawtypes")
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  SourceProfile getSourceProfile();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
deleted file mode 100644
index bfe337ee3f2..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The context required to sync one batch of data to hoodie table using StreamSync.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface StreamContext {
-
-  /**
-   * The schema provider used for reading data from source and also writing to hoodie table.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  SchemaProvider getSchemaProvider();
-
-  /**
-   * An optional stream profile supplying details regarding how the next input batch in StreamSync should be consumed and written.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  Option<SourceProfileSupplier> getSourceProfileSupplier();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 4b83ff92b7b..f87bf083854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -259,19 +259,19 @@ public class StreamSync implements Serializable, Closeable {
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
   }
 
-  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, StreamContext streamContext) throws IOException {
+                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
+    this.userProvidedSchemaProvider = schemaProvider;
     this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
@@ -285,7 +285,7 @@ public class StreamSync implements Serializable, Closeable {
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     refreshTimeline();
-    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
     this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
     Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..b5cbf2738f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -28,8 +28,6 @@ import org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -54,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc.
@@ -63,7 +60,6 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
 
   protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
-  protected final Option<SourceProfileSupplier> sourceProfile = Option.of(mock(SourceProfileSupplier.class));
 
   protected SchemaProvider schemaProvider;
   protected KafkaTestUtils testUtils;
@@ -281,51 +277,4 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
         + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.", t.getMessage());
   }
-
-  @Test
-  public void testKafkaSourceWithOffsetsFromSourceProfile() {
-    // topic setup.
-    final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-
-    when(sourceProfile.get().getSourceProfile()).thenReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500));
-    SourceFormatAdapter kafkaSource = createSource(props);
-
-    // Test for empty data.
-    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-
-    // Publish messages and assert source has picked up all messages in offsetRanges supplied by input batch profile.
-    sendMessagesToKafka(topic, 1000, 2);
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
-    assertEquals(500, fetch1.getBatch().get().count());
-  }
-
-  static class TestSourceProfile implements SourceProfile<Long> {
-
-    private final long maxSourceBytes;
-    private final int sourcePartitions;
-    private final long numEvents;
-
-    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long numEvents) {
-      this.maxSourceBytes = maxSourceBytes;
-      this.sourcePartitions = sourcePartitions;
-      this.numEvents = numEvents;
-    }
-
-    @Override
-    public long getMaxSourceBytes() {
-      return maxSourceBytes;
-    }
-
-    @Override
-    public int getSourcePartitions() {
-      return sourcePartitions;
-    }
-
-    @Override
-    public Long getSourceSpecificContext() {
-      return numEvents;
-    }
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 59a85a06e9c..14ffd31582a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,16 +23,14 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.ErrorEvent;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
@@ -62,10 +60,10 @@ import scala.Tuple2;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
 import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
@@ -106,7 +104,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
-    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
+    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
   }
 
   // test whether empty messages can be filtered
@@ -358,13 +356,4 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     dfWithOffsetInfo.unpersist();
     dfWithOffsetInfoAndNullKafkaKey.unpersist();
   }
-
-  @Test
-  public void testCreateSource() throws IOException {
-    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-    Source jsonKafkaSource = UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
-    assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index b56d87c9263..52376f89741 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.test.proto.Nested;
 import org.apache.hudi.utilities.test.proto.Sample;
@@ -90,7 +89,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
 
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
     this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
-    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
+    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     return new SourceFormatAdapter(protoKafkaSource);
   }
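
For reference, this is roughly how the API removed by this revert was meant to be used, based on the SourceProfile, SourceProfileSupplier, and HoodieStreamer signatures deleted in the diff above. The sketch is illustrative only; the KafkaSourceProfile class name is hypothetical and was never part of the codebase:

    // Hypothetical profile: per the deleted SourceProfile javadoc, the
    // source-specific context for a Kafka source is the number of events
    // to consume in this sync round.
    class KafkaSourceProfile implements SourceProfile<Long> {
      @Override
      public long getMaxSourceBytes() {
        return 10 * 1024 * 1024; // cap bytes consumed from the source this round
      }

      @Override
      public int getSourcePartitions() {
        return 4; // output partitions required in the source RDD
      }

      @Override
      public Long getSourceSpecificContext() {
        return 500L; // events to consume in this sync round
      }
    }

    // Wiring the profile in through the (now removed) HoodieStreamer overload:
    Option<SourceProfileSupplier> profileSupplier = Option.of(() -> new KafkaSourceProfile());
    HoodieStreamer streamer = new HoodieStreamer(cfg, jssc, fs, conf, Option.empty(), profileSupplier);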