santoshsb commented on issue #5452: URL: https://github.com/apache/hudi/issues/5452#issuecomment-1239394929
@codope here is the output without the above-mentioned config; I have also added the code I am using for testing the fix.

--------------ERROR--------------------

```
22/09/07 18:53:08 ERROR BoundedInMemoryExecutor: error producing records
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'prefix' not found
	at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
	at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:480)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
	... 8 more
22/09/07 18:53:09 ERROR BoundedInMemoryExecutor: error consuming records
```

--------------CODE---------------------

Launch spark-shell with the locally built Hudi Spark 3.2 bundle:

```sh
~/work/spark-3.2.1-bin-hadoop3.2/bin/spark-shell \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
```

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

// Define a Patient FHIR resource; for simplicity most of the elements have been deleted and only a few retained
val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
val orgStringDf = spark.read.json(Seq(orgString).toDS)

// Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")

// Write orgStringDf to a Hudi table
orgStringDf.write
  .format("org.apache.hudi")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("/work/data/updateTst/hudi/json_schema_tst")

// Read the Hudi table and print its schema
val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
patienthudi.printSchema

// Update: based on our use case, add a new patient resource; this resource might contain new columns
// and might not have existing columns (normal use case with FHIR data)
val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""

// Convert the new resource string into a DataFrame and check its schema
val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
updatedStringDf.printSchema

// Upsert the new resource
updatedStringDf.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
  .mode(SaveMode.Append)
  .save("/work/data/updateTst/hudi/json_schema_tst")

// Read the Hudi table and print the schema after adding the new record
val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
patienthudiUpdated.printSchema
```
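As a side note for triage, below is a minimal sketch of the same upsert with Hudi's schema reconciliation turned on, which is one way the record missing the nested `prefix` field could be reconciled against the table schema instead of failing in the Parquet/Avro converter. I'm assuming `hoodie.datasource.write.reconcile.schema` is the relevant option here; that assumption may not match the config referred to earlier in the thread.

```scala
// Hypothetical variant of the upsert above with schema reconciliation enabled.
// The config key is my assumption, not confirmed in this thread.
updatedStringDf.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option("hoodie.datasource.write.reconcile.schema", "true") // assumed config name
  .mode(SaveMode.Append)
  .save("/work/data/updateTst/hudi/json_schema_tst")
```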