sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r497257812



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that reader schema without namespace do not throw erorr like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       @bvaradar 
   
   In delta streamer, we currently have the following three flows:
   1) [No transformation](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335)
   2) [Transformation with userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L315)
   3) [Transformation without userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L323)
   
   Only a schema converted from a Spark data type to an Avro schema has this
   namespace added to its fixed fields. In delta streamer, we currently use the
   user-provided schema
   ([userProvidedSchemaProvider.targetSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L568))
   to convert
   [bytes to avro](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L126),
   except in the third flow (Transformation without userProvidedSchema). In that
   case, we
   [derive the schema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L327)
   from the Spark data type. So the backward-compatibility issue arises only
   when we use a transformer and no user-provided schema.
   
   Below is an example of an Avro fixed field, first without and then with the namespace.
   
   
`{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   
`{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   Both of these result in the same parquet schema:
   `required fixed_len_byte_array(5) height (DECIMAL(10,6));`
   
   As we can see here, the namespace on a fixed field does not seem to have any
   impact on the parquet schema. So maybe the HoodieFileReader in the
   MergeHelper file you referred to shouldn't have any issue?
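
To make the comparison between the two fixed-field definitions concrete, here is a minimal sketch in Python that works directly on the schema JSON. It is not the PR's Scala implementation: the function name `strip_fixed_namespace` is hypothetical, and the sketch only assumes that the `namespace` attribute of every `fixed` type should be dropped, which turns the namespaced definition into the plain one.

```python
import json


def strip_fixed_namespace(node):
    """Return a copy of an Avro schema (as parsed JSON) in which the
    "namespace" attribute is removed from every type whose "type" is "fixed".
    Hypothetical helper for illustration only."""
    if isinstance(node, list):
        # Unions and record field lists: process each element.
        return [strip_fixed_namespace(item) for item in node]
    if isinstance(node, dict):
        is_fixed = node.get("type") == "fixed"
        return {
            key: strip_fixed_namespace(value)
            for key, value in node.items()
            if not (is_fixed and key == "namespace")
        }
    # Primitive type names and other scalar values are returned unchanged.
    return node


# The two field definitions quoted in this comment.
without_namespace = json.loads(
    '{"name":"height","type":{"type":"fixed","name":"fixed","size":5,'
    '"logicalType":"decimal","precision":10,"scale":6}}'
)
with_namespace = json.loads(
    '{"name":"height","type":{"type":"fixed","name":"fixed",'
    '"namespace":"hoodie.source.hoodie_source.height","size":5,'
    '"logicalType":"decimal","precision":10,"scale":6}}'
)

stripped = strip_fixed_namespace(with_namespace)
```

After stripping, the namespaced definition compares equal to the plain one, and the input is left unmodified because the helper builds new containers instead of mutating.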
   
   In general, it looks like parquet files in an existing hudi dataset would not
   have an issue, so we could rule out a problem for the COPY ON WRITE table
   type. But for a MERGE ON READ table, I could see an issue with the third flow
   (Transformation without userProvidedSchema). Below is the stack trace.
   
   ```
   51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
   org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
        at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
        at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
        at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
        at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
        at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
        at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
        at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
        at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
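
The two names in the exception message line up with how Avro forms the full name of a named type: the namespace, a dot, then the name. Here is a rough sketch of that rule in Python, applied to the schema JSON. The helper `full_name` is hypothetical and ignores namespace inheritance from enclosing records. Labelling the namespaced schema as the writer side (the log block) and the plain one as the reader side is my assumption about this trace.

```python
def full_name(named_type):
    """Full name of an Avro named type given as a parsed-JSON dict.
    Hypothetical helper: does not handle namespaces inherited from an
    enclosing record."""
    name = named_type["name"]
    namespace = named_type.get("namespace")
    # A dotted name is already a full name; an absent or empty namespace adds nothing.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


# Assumed writer side: schema derived from the Spark data type (third flow).
writer_fixed = {
    "type": "fixed",
    "name": "fixed",
    "namespace": "hoodie.source.hoodie_source.height",
    "size": 5,
}
# Assumed reader side: schema without the namespace.
reader_fixed = {"type": "fixed", "name": "fixed", "size": 5}

names_match = full_name(writer_fixed) == full_name(reader_fixed)
```

The sizes agree, so the only difference between the two definitions is the full name, which is what the "Found ..., expecting ..." message reports.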
   
   In summary, it looks like the third flow (Transformation without
   userProvidedSchema) produces a different output schema in the log files than
   the other two flows when there are fixed fields. This means that if we switch
   from the third flow to, say, the first flow (by removing the transformation),
   we already have a problem, since the log files in a MERGE ON READ table will
   have a different schema if there are fixed fields. This PR may cause a
   backward-compatibility issue for the third flow, but it would make sure we
   produce the same schema regardless of which flow we use.
   
   If you have a better suggestion for making this work without causing issues
   in existing datasets for the third flow, please let me know; I'm happy to
   update the PR.





