[ https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuan Huy Pham updated HUDI-2374:
--------------------------------
    Remaining Estimate: 48h
     Original Estimate: 48h

> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-2374
>                 URL: https://issues.apache.org/jira/browse/HUDI-2374
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>    Affects Versions: 0.9.0
>            Reporter: Xuan Huy Pham
>            Priority: Major
>              Labels: patch
>             Fix For: 0.10.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi,
> I am not sure whether AvroDFSSource is intended to ignore the source schema 
> from the designated schema provider class, but the current logic always uses 
> the Avro writer schema as the reader schema.
> Logic as of release-0.9.0, class 
> {{org.apache.hudi.utilities.sources.AvroDFSSource}}:
> {code:java}
> public class AvroDFSSource extends AvroSource {
>
>   private final DFSPathSelector pathSelector;
>
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>       SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
>
>   @Override
>   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
>     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
>         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
>     return selectPathsWithMaxModificationTime.getLeft()
>         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
>         .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
>   }
>
>   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
>     sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
>     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
>         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
>     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
>   }
> }
> {code}
> The {{schemaProvider}} parameter is completely ignored in the constructor, so 
> {{AvroKeyInputFormat}} always uses the writer schema to read.
> As a result, we often see this in the DeltaStreamer logs:
> {code:java}
> 21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use 
> AvroJob.setInputKeySchema() if desired.
> 21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the 
> writer schema.
> {code}
> This blog post, [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer], 
> describes how AvroKafkaSource supports BACKWARD_TRANSITIVE schema evolution. 
> For DFS data, this issue is the main blocker: if we pass the source schema 
> from {{schemaProvider}}, we should get the same BACKWARD_TRANSITIVE schema 
> evolution for DFS Avro data.
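> To illustrate why the reader schema matters (a hypothetical, standalone Avro 
> example, not Hudi code): a backward-compatible change such as adding a 
> defaulted field only takes effect if the reader schema is actually handed to 
> the datum reader.
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
>
> public class ReaderSchemaDemo {
>   public static void main(String[] args) {
>     // Writer schema: the shape the existing files were written with.
>     Schema writer = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"long\"}]}");
>     // Reader schema: adds a nullable field with a default, a
>     // BACKWARD-compatible evolution.
>     Schema reader = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"long\"},"
>             + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
>     // Avro resolves old records into the new shape only when both schemas
>     // are supplied; reading with the writer schema alone never surfaces "note".
>     GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writer, reader);
>     System.out.println(datumReader.getExpected()); // prints the reader schema
>   }
> }
> {code}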
>  
> Suggested fix: pass the source schema from {{schemaProvider}} to the Hadoop 
> configuration key {{avro.schema.input.key}}, the property that 
> {{AvroJob.setInputKeySchema()}} writes.
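> A minimal sketch of this fix in the constructor (the null checks and exact 
> wiring are illustrative, not a final patch):
> {code:java}
> public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>     SchemaProvider schemaProvider) throws IOException {
>   super(props, sparkContext, sparkSession, schemaProvider);
>   // Propagate the reader schema so AvroKeyInputFormat stops falling back to
>   // the writer schema. "avro.schema.input.key" is the property written by
>   // AvroJob.setInputKeySchema().
>   if (schemaProvider != null && schemaProvider.getSourceSchema() != null) {
>     sparkContext.hadoopConfiguration()
>         .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
>   }
>   this.pathSelector = DFSPathSelector
>       .createSourceSelector(props, sparkContext.hadoopConfiguration());
> }
> {code}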
> 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
