nsivabalan commented on code in PR #7971:
URL: https://github.com/apache/hudi/pull/7971#discussion_r1108704365


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java:
##########
@@ -77,16 +78,33 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
 
   @Override
   JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
     if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
       if (schemaProvider == null) {
         throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
       }
-      AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
-      return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-              LocationStrategies.PreferConsistent()).map(obj -> convertor.fromAvroBinary(obj.value()));
+      AvroConvertor convertor = getAvroConverter(true);
+      kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(obj ->
+          new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
+              obj.key(), convertor.fromAvroBinary(obj.value())));
     } else {
-      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-              LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+      kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent());
     }
+    return processForKafkaOffsets(kafkaRDD);
   }
+
+  protected JavaRDD<GenericRecord> processForKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {

Review Comment:
   nit: rename to "mayBeAppendKafkaOffsets"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java:
##########
@@ -52,13 +61,30 @@ public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext
 
   @Override
   JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
-    JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
+    JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext,
             offsetGen.getKafkaParams(),
             offsetRanges,
             LocationStrategies.PreferConsistent())
-        .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()))
-        .map(x -> x.value().toString());
-    return postProcess(jsonStringRDD);
+        .filter(x -> !StringUtils.isNullOrEmpty((String) x.value()));
+
+    JavaRDD<String> result = shouldAppendKafkaOffsets() ? kafkaRDD.mapPartitions(partitionIterator -> {

Review Comment:
   Can we move the appending of offsets to a private method and keep this
method lean (as you did for AvroKafkaSource)?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessorDisableSource.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+
+public class SchemaProviderWithPostProcessorDisableSource extends SchemaProviderWithPostProcessor {
+  private final SchemaProvider schemaProvider;
+  private final Option<SchemaPostProcessor> schemaPostProcessor;
+  public static class Config {
+    public static final String DISABLE_POST_PROCESSOR_ON_SOURCE_SCHEMA =

Review Comment:
   I mean, we should not name this class
SchemaProviderWithPostProcessor"Disable"Source.
   Consider this as general schema handling where a post processor can
optionally be added for the source and target schemas.
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java:
##########
@@ -76,4 +86,14 @@ public void onCommit(String lastCkptStr) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
+
+  protected AvroConvertor getAvroConverter(Boolean preProcessedSchema) {
+    if (schemaProvider == null) {
+      throw new HoodieException("Needed schema provider for avro deserializer");
+    }
+    if (preProcessedSchema && (schemaProvider instanceof SchemaProviderWithPostProcessorDisableSource)) {
+      return new AvroConvertor(((SchemaProviderWithPostProcessorDisableSource) schemaProvider).getSourceSchemaWithoutPostProcessor());

Review Comment:
   This is what I was pointing out earlier. We should call a generic method;
whether the post processor for the source is enabled or disabled internally
should not matter to callers.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java:
##########
@@ -50,6 +56,10 @@ protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spar
     this.metrics = metrics;
   }
 
+  protected boolean shouldAppendKafkaOffsets() {
+    return KafkaOffsetPostProcessor.class.getName().equals(props.getString(SCHEMA_POST_PROCESSOR_PROP, null));

Review Comment:
   There could be multiple entries in the schema post processor config, right?
Should we not parse the config value and check whether
"KafkaOffsetPostProcessor.class.getName()" is one of them?
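
   A minimal sketch of the kind of check meant here, assuming the config value
is a comma-separated list of class names (the delimiter is an assumption, not
confirmed from this diff). The class and method names below are hypothetical
and only illustrate the membership test; they are not existing Hudi APIs.

```java
import java.util.Arrays;

/** Hypothetical helper; not part of Hudi. */
final class PostProcessorConfigCheck {

  private PostProcessorConfigCheck() {
  }

  /**
   * Returns true if className appears as one entry of configValue.
   * Assumes entries are separated by commas; surrounding whitespace is ignored.
   */
  static boolean containsClassName(String configValue, String className) {
    if (configValue == null || configValue.trim().isEmpty()) {
      return false;
    }
    return Arrays.stream(configValue.split(","))
        .map(String::trim)
        .anyMatch(className::equals);
  }
}
```

   With something like this, shouldAppendKafkaOffsets() could pass the raw
property value and KafkaOffsetPostProcessor.class.getName() to the helper
instead of comparing the whole string with equals().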


