santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1239394929

   @codope here is the output without the above-mentioned config; I have also added the code I am using to test the fix.
   --------------ERROR--------------------
   ```
   22/09/07 18:53:08 ERROR BoundedInMemoryExecutor: error producing records
   org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'prefix' not found
        at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:480)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
        ... 8 more
   22/09/07 18:53:09 ERROR BoundedInMemoryExecutor: error consuming records
   ```
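   The mismatch is between the table's committed schema, which still contains the `prefix` field, and the incoming batch schema, which does not. For reference, this is a sketch of how I double-check the latest committed Avro schema on the timeline (TableSchemaResolver usage assumed from current master, not part of the failing run):
   ```scala
   // Sketch: print the table's latest committed Avro schema
   // (it should still contain name.prefix after the first insert).
   import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

   val metaClient = HoodieTableMetaClient.builder()
     .setConf(spark.sessionState.newHadoopConf())
     .setBasePath("/work/data/updateTst/hudi/json_schema_tst")
     .build()
   println(new TableSchemaResolver(metaClient).getTableAvroSchema(false).toString(true))
   ```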
   
   --------------CODE---------------------
   ```sh
   ~/work/spark-3.2.1-bin-hadoop3.2/bin/spark-shell \
     --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar` \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
   ```

   ```scala
   
   
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   //Define a Patient FHIR resource; for simplicity most elements have been removed and only a few retained
   val orgString = 
"""{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
   
   
   val orgStringDf = spark.read.json(Seq(orgString).toDS)
   
   //Specify common DataSourceWriteOptions in the single hudiOptions variable
   
   val hudiOptions = Map[String,String](
   HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
   DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
   DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
   DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
   DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
   DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
   
   //Write the orgStringDf to a Hudi table
   orgStringDf.write
   .format("org.apache.hudi")
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
   .options(hudiOptions)
   .mode(SaveMode.Overwrite)
   .save("/work/data/updateTst/hudi/json_schema_tst")
   
   //Read the Hudi table
   val patienthudi = 
spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
   //Printschema
   patienthudi.printSchema
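   
   // Sanity check (illustrative, not from the captured run): the first insert's JSON defines
   // name[].prefix, so the table schema at this point should expose prefix as array<string>.
   patienthudi.select(explode(col("name")).as("name")).select("name.prefix").printSchema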
   
   
   //Update: per our use case, add a new patient resource; it may contain new columns and may be missing existing ones (a normal scenario with FHIR data)
   
   val updatedString = 
"""{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""
   
   
   //Convert the new resource string into DF
   val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
   
   //Check the schema of the new resource that is being added
   updatedStringDf.printSchema
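   
   // Illustrative check: the new batch's name struct has no 'prefix' field, which is exactly
   // the Avro field the Parquet reader later fails to find.
   updatedStringDf.select(explode(col("name")).as("name")).select("name.*").columns.foreach(println)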
   
   
   //Upsert the new resource
   updatedStringDf.write
   .format("org.apache.hudi")
   .options(hudiOptions)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, 
"org.apache.hudi.common.model.EmptyHoodieRecordPayload")
   .mode(SaveMode.Append)
   .save("/work/data/updateTst/hudi/json_schema_tst")
   
   //Read the Hudi table
   val patienthudiUpdated = 
spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
   //Print the schema after adding the new record
   patienthudiUpdated.printSchema
   ```
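   
   For comparison, this is a sketch of the same upsert with schema reconciliation turned on. I am assuming `hoodie.datasource.write.reconcile.schema` is the config referred to earlier in the thread; the payload class option is dropped here to keep the sketch minimal.
   ```scala
   // Sketch only (assumes hoodie.datasource.write.reconcile.schema is the intended config):
   // ask Hudi to reconcile the incoming, narrower batch schema against the table schema
   // instead of writing with the batch schema as-is.
   updatedStringDf.write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .option("hoodie.datasource.write.reconcile.schema", "true")
     .mode(SaveMode.Append)
     .save("/work/data/updateTst/hudi/json_schema_tst")
   ```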

