[ https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xuan Huy Pham updated HUDI-2374:
--------------------------------
    Remaining Estimate: 48h
     Original Estimate: 48h

> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-2374
>                 URL: https://issues.apache.org/jira/browse/HUDI-2374
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>    Affects Versions: 0.9.0
>            Reporter: Xuan Huy Pham
>            Priority: Major
>              Labels: patch
>             Fix For: 0.10.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi,
> I am not sure whether AvroDFSSource is intended to ignore the source schema from the designated schema provider class, but the current logic always uses the Avro writer schema as the reader schema.
> Logic as of release-0.9.0, class {{org.apache.hudi.utilities.sources.AvroDFSSource}}:
> {code:java}
> public class AvroDFSSource extends AvroSource {
>
>   private final DFSPathSelector pathSelector;
>
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>       SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
>
>   @Override
>   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
>     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
>         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
>     return selectPathsWithMaxModificationTime.getLeft()
>         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
>         .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
>   }
>
>   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
>     sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
>     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
>         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
>     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
>   }
> }
> {code}
> The {{schemaProvider}} parameter is completely ignored in the constructor, so {{AvroKeyInputFormat}} always reads with the writer schema.
> As a result, we often see this in the DeltaStreamer logs:
> {code:java}
> 21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
> 21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
> {code}
> [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer] is a nice blog post on how AvroKafkaSource supports BACKWARD_TRANSITIVE schema evolution. For DFS data, I see this as the main blocker: if we pass the source schema from {{schemaProvider}}, we should be able to get the same BACKWARD_TRANSITIVE schema evolution for DFS Avro data.
>
> Suggested Fix: pass the source schema from {{schemaProvider}} to the Hadoop configuration key {{avro.schema.input.key}}.
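>
> A minimal sketch of that constructor change, for illustration only (the null guard and exact placement are assumptions, not the eventual patch; {{getSourceSchema()}} is the existing {{SchemaProvider}} API, and {{avro.schema.input.key}} is the key that {{AvroJob.setInputKeySchema()}} writes):
> {code:java}
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>       SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
>     // Propagate the overridden source schema so AvroKeyInputFormat uses it
>     // as the reader schema instead of defaulting to the writer schema.
>     // Guarding on null keeps behavior unchanged when no provider is given
>     // (an assumption; the final fix may handle this differently).
>     if (schemaProvider != null && schemaProvider.getSourceSchema() != null) {
>       sparkContext.hadoopConfiguration()
>           .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
>     }
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
> {code}
> Setting this key by hand is equivalent to the {{AvroJob.setInputKeySchema()}} call that the warning above points to, so the WARN/INFO lines should disappear once a reader schema is supplied.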