nsivabalan commented on code in PR #7971: URL: https://github.com/apache/hudi/pull/7971#discussion_r1108704365
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java: ########## @@ -77,16 +78,33 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa @Override JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) { + JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD; if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) { if (schemaProvider == null) { throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!"); } - AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema()); - return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, - LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value())); + AvroConvertor convertor = getAvroConverter(true); + kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()).map(obj -> + new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(), + obj.key(), convertor.fromAvroBinary(obj.value()))); } else { - return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, - LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value()); + kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()); } + return processForKafkaOffsets(kafkaRDD); } + + protected JavaRDD<GenericRecord> processForKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) { Review Comment: nit: rename to "mayBeAppendKafkaOffsets" ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java: ########## @@ -52,13 +61,30 @@ public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext @Override JavaRDD<String> toRDD(OffsetRange[] offsetRanges) { - JavaRDD<String> jsonStringRDD = 
KafkaUtils.createRDD(sparkContext, + JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()) - .filter(x -> !StringUtils.isNullOrEmpty((String) x.value())) - .map(x -> x.value().toString()); - return postProcess(jsonStringRDD); + .filter(x -> !StringUtils.isNullOrEmpty((String) x.value())); + + JavaRDD<String> result = shouldAppendKafkaOffsets() ? kafkaRDD.mapPartitions(partitionIterator -> { Review Comment: can we move the appending of offsets to a private method and keep this method lean (as you did for AvroKafkaSource) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessorDisableSource.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.schema; + +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; + +public class SchemaProviderWithPostProcessorDisableSource extends SchemaProviderWithPostProcessor { + private final SchemaProvider schemaProvider; + private final Option<SchemaPostProcessor> schemaPostProcessor; + public static class Config { + public static final String DISABLE_POST_PROCESSOR_ON_SOURCE_SCHEMA = Review Comment: I mean, we should not name this class SchemaProviderWithPostProcessor"Disable"Source. Consider this as general schema handling where a post processor can optionally be added for the source and target schemas. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java: ########## @@ -76,4 +86,14 @@ public void onCommit(String lastCkptStr) { offsetGen.commitOffsetToKafka(lastCkptStr); } } + + protected AvroConvertor getAvroConverter(Boolean preProcessedSchema) { + if (schemaProvider == null) { + throw new HoodieException("Needed schema provider for avro deserializer"); + } + if (preProcessedSchema && (schemaProvider instanceof SchemaProviderWithPostProcessorDisableSource)) { + return new AvroConvertor(((SchemaProviderWithPostProcessorDisableSource) schemaProvider).getSourceSchemaWithoutPostProcessor()); Review Comment: This is what I was pointing out earlier. We should call a generic method; whether the post processor for the source schema is enabled or disabled internally should not matter to callers. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java: ########## @@ -50,6 +56,10 @@ protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spar this.metrics = metrics; } + protected boolean shouldAppendKafkaOffsets() { + return KafkaOffsetPostProcessor.class.getName().equals(props.getString(SCHEMA_POST_PROCESSOR_PROP, null)); Review Comment: There could be multiple entries in the schema post processor config, right?
Should we not parse the config value and check whether "KafkaOffsetPostProcessor.class.getName()" is one of them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org