sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r497257812
########## File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala ##########

```diff
@@ -364,4 +366,40 @@ object AvroConversionHelper {
       }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * The org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds a namespace to fixed avro fields
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {
```

Review comment:
@bvaradar In delta streamer, we currently have the following three flows:

1) [No transformation](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335)
2) [Transformation with userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L315)
3) [Transformation without userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L323)

Only the schema converted from a Spark data type to an Avro schema has this namespace added to fixed fields.
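At the JSON level, what `removeNamespaceFromFixedFields` needs to do can be sketched as follows. This is purely illustrative (Python over parsed schema JSON; the function name and recursive-walk approach are my own, not the actual Scala implementation, which operates on `org.apache.avro.Schema` objects):

```python
import json

def remove_namespace_from_fixed_fields(node):
    """Recursively walk an Avro schema (parsed JSON) and drop the
    'namespace' key from any fixed-type node. Illustrative sketch only."""
    if isinstance(node, dict):
        if node.get("type") == "fixed":
            node.pop("namespace", None)
        for value in node.values():
            remove_namespace_from_fixed_fields(value)
    elif isinstance(node, list):
        for item in node:
            remove_namespace_from_fixed_fields(item)
    return node

# The fixed field as emitted by SchemaConverters.toAvroType:
field = json.loads(
    '{"name":"height","type":{"type":"fixed","name":"fixed",'
    '"namespace":"hoodie.source.hoodie_source.height","size":5,'
    '"logicalType":"decimal","precision":10,"scale":6}}'
)
cleaned = remove_namespace_from_fixed_fields(field)
print("namespace" in cleaned["type"])  # False after stripping
```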
In delta streamer, currently we use the user-provided schema ([userProvidedSchemaProvider.targetSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L568)) to convert [bytes to avro](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L126), except for the third flow (Transformation without userProvidedSchema). In that case, we [derive the schema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L327) from the Spark data type. So the backward compatibility issue arises only when we use a transformer and no user-provided schema.

Below is an example of an Avro fixed field without and with a namespace:

`{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}`

`{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}`

Both of these result in the same parquet schema:

`required fixed_len_byte_array(5) height (DECIMAL(10,6));`

As we can see, the namespace in the fixed field does not seem to have any impact on the parquet schema, so maybe the HoodieFileReader in the MergeHelper file you referred to shouldn't have any issue. In general, it looks like parquet files in an existing Hudi dataset would not have an issue. I tested the COPY ON WRITE table type and couldn't see any issue. But in the case of a MERGE ON READ table, I could see the issue for the third flow (Transformation without userProvidedSchema). Below is the stack trace.
```
51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner - Got exception when reading log file
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

In summary, it looks like the third flow (Transformation without userProvidedSchema) produces a different output schema in the log file compared to the other two flows when there are fixed fields. This means that if we want to change from the third flow to, say, the first flow (by removing the transformation), we already have a problem, since log files in a MERGE ON READ table will have a different schema if there are fixed fields. This PR may cause a backward compatibility issue for the third flow, but it would make sure we produce the same schema regardless of which flow we use. In case you have a better suggestion to make this work without causing issues in existing datasets for the third flow, please let me know; happy to update the PR.
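For context on why the reader fails: the exception message above ("Found hoodie.source.hoodie_source.height.fixed, expecting fixed") suggests that Avro's resolving decoder matches the fixed type by its full name (namespace plus name), so the writer and reader schemas disagree even though the field layouts are byte-identical. A small sketch comparing the two full names (the `full_name` helper is illustrative, not an Avro API):

```python
import json

# Writer schema as derived from the Spark data type (namespace added),
# reader schema as typically user-provided (no namespace):
writer = json.loads(
    '{"type":"fixed","name":"fixed",'
    '"namespace":"hoodie.source.hoodie_source.height","size":5}'
)
reader = json.loads('{"type":"fixed","name":"fixed","size":5}')

def full_name(schema):
    # Full name of an Avro named type: "<namespace>.<name>", or just
    # "<name>" when no namespace is present.
    ns = schema.get("namespace")
    return f'{ns}.{schema["name"]}' if ns else schema["name"]

print(full_name(writer))  # hoodie.source.hoodie_source.height.fixed
print(full_name(reader))  # fixed
print(full_name(writer) == full_name(reader))  # False -> type mismatch
```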