xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1240122505

   @santoshsb you need to use schema evolution together with `hoodie.datasource.write.reconcile.schema`; see the code below:
   
   ```scala
     import org.apache.spark.sql.SparkSession

     def perf(spark: SparkSession) = {
       import org.apache.spark.sql.SaveMode
       import org.apache.spark.sql.functions._
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.DataSourceReadOptions
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.hudi.hive.MultiPartKeysValueExtractor

       // Define a Patient FHIR resource; for simplicity most of the elements have been removed and only a few retained
       val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
       val sqlContext = spark.sqlContext
       import sqlContext.implicits._
       val orgStringDf = spark.read.json(Seq(orgString).toDS)

       // Specify the common DataSourceWriteOptions in a single hudiOptions variable
       val hudiOptions = Map[String, String](
         HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")

       // Write orgStringDf to a Hudi table
       orgStringDf.write
         .format("org.apache.hudi")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
         .options(hudiOptions)
         .mode(SaveMode.Overwrite)
         .save("/work/data/updateTst/hudi/json_schema_tst")

       // Read the Hudi table back
       val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

       // Print the schema
       patienthudi.printSchema

       // Update: based on our use case, add a new patient resource; it may contain new columns
       // and may be missing existing ones (a normal situation with FHIR data)
       val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""

       // Convert the new resource string into a DataFrame
       val updatedStringDf = spark.read.json(Seq(updatedString).toDS)

       // Check the schema of the new resource that is being added
       updatedStringDf.printSchema

       // Upsert the new resource with schema evolution and schema reconciliation enabled
       spark.sql("set hoodie.schema.on.read.enable=true")
       updatedStringDf.write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
         .option("hoodie.datasource.write.reconcile.schema", "true")
         .mode(SaveMode.Append)
         .save("/work/data/updateTst/hudi/json_schema_tst")

       // Read the Hudi table again
       val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")

       // Print the schema after adding the new record
       patienthudiUpdated.printSchema
     }
   ```
   patienthudiUpdated.schema:
   ```
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- prefix: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   ```
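   
   As a quick check (a minimal sketch, assuming the same table path as above and a `spark` session in scope, e.g. spark-shell), you can read the table back and inspect the `name` column; the merged schema keeps the `prefix` field even though the second write did not include it:
   
   ```scala
   // Minimal verification sketch; the table path and column names simply reuse
   // the ones from the example above, so adjust them to your environment.
   val verifyDf = spark.read.format("hudi")
     .load("/work/data/updateTst/hudi/json_schema_tst")
   
   // name.prefix stays in the reconciled schema, as the printed schema shows.
   verifyDf.select("id", "lastUpdated", "name").show(truncate = false)
   ```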
   
   I think it should be OK, thanks.