santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111697151

   Thanks @yihua, here are the detailed spark-shell commands we used:
   
   ```
   ./spark-shell --jars '/Users/balamats/work/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0-SNAPSHOT.jar' \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   ```

   ```scala
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   //Define a Patient FHIR resource; for simplicity we have deleted most of the elements and retained a few
   val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}""";
   
   //Convert to dataframe
   val orgStringDf = spark.read.json(Seq(orgString).toDS)
   
   //Specify common DataSourceWriteOptions in the single hudiOptions variable
   val hudiOptions = Map[String,String](
     HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
     DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
     DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
     DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
     DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
     DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
   )
   
   //Write the orgStringDf to a Hudi table
   orgStringDf.write
       .format("org.apache.hudi")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Read the Hudi table
   val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Print the schema
   patienthudi.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
    //Select fields to verify
    patienthudi.select("id","gender","maritalStatus").show(false)
   
   +------------------------------------+------+---------------------------------------------------------------------+
   |id                                  |gender|maritalStatus                                                        |
   +------------------------------------+------+---------------------------------------------------------------------+
   |4ad86a5c-926e-439b-9352-f8ac9ab780f1|male  |{[{M, M, http://terminology.hl7.org/CodeSystem/v3-MaritalStatus}], M}|
   +------------------------------------+------+---------------------------------------------------------------------+
   
   //Update: based on our use case, add a new Patient resource; it might contain new columns and might lack existing ones (a normal situation with FHIR data)
   
   val updatedString = """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""
   
   //Convert the new resource string into a DataFrame
   val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
   
   //Check the schema of the new resource that is being added
   updatedStringDf.printSchema
   root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
   
   //Upsert the new resource
   updatedStringDf.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
       .mode(SaveMode.Append)
       .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Read the Hudi table
   val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Print the schema after adding the new record
   patienthudi.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
    //Select fields to verify
    patienthudi.select("id","gender","maritalStatus").show(false)
   org.apache.spark.sql.AnalysisException: cannot resolve 'maritalStatus' given input columns: [_hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, birthDate, gender, id, lastUpdated, multipleBirthBoolean, resourceType, source];
   'Project [id#130, gender#129, 'maritalStatus]
   +- Relation [_hoodie_commit_time#123,_hoodie_commit_seqno#124,_hoodie_record_key#125,_hoodie_partition_path#126,_hoodie_file_name#127,birthDate#128,gender#129,id#130,lastUpdated#131,multipleBirthBoolean#132,resourceType#133,source#134] parquet
   
     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:209)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
     at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
     at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
     at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3734)
     at org.apache.spark.sql.Dataset.select(Dataset.scala:1454)
     at org.apache.spark.sql.Dataset.select(Dataset.scala:1471)
     ... 49 elided
   ```
   
   Our expectations after adding the second row were:
   1. The new column "multipleBirthBoolean" should have been added to the schema, with a null value for the previous entry.
   2. The existing "maritalStatus" column, added to the destination schema by the first entry, should still be present after adding the second entry, with a null value for the second entry. (The query sketched below shows the behaviour we expected.)
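
   To make that concrete, this is the query we expected to work after the upsert (a hand-written sketch based on the two schemas above, not actual output):

   ```scala
   // Expected after the upsert: both rows visible, with nulls where a row lacks a column.
   patienthudi.select("id", "gender", "maritalStatus", "multipleBirthBoolean").show(false)
   // 4ad86a5c-926e-439b-9352-f8ac9ab780f1 | male   | {[{M, M, http://...v3-MaritalStatus}], M} | null
   // 596c7a94-bada-4303-85d4-7067c586999e | female | null                                      | true
   ```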
   
   We might be missing some config, but our feeling is that when we add a new entry, the result should contain all the columns present in the destination schema; even when they are NULL they should be present. If we do need an uber schema, we didn't find the Spark code to convert our second dataframe "updatedStringDf" so that it gains those columns with NULL values, i.e., reading the uber schema and merging it into "updatedStringDf" with NULL values (a rough sketch of what we were looking for follows the schema dump below). We did try this while creating the second dataframe:
   
   ```scala
   val updatedStringDf = spark.read.schema(patienthudi.schema).json(Seq(updatedString).toDS)
   ```
   
   But then the new schema for "updatedStringDf" misses the "multipleBirthBoolean" column present in the second entry:
   
   ```
   root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   ```
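
   What we were hoping to find (and could not) is something along these lines; a rough sketch of the manual schema merge, assuming top-level columns only (the helper name alignToTableSchema is ours, not a Hudi or Spark API):

   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions.lit

   // Hudi metadata columns that should not be copied into an incoming batch.
   val metaCols = Set("_hoodie_commit_time", "_hoodie_commit_seqno",
     "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

   // Add every table column the batch lacks as a typed NULL, while keeping
   // the batch's own new columns (e.g. multipleBirthBoolean) untouched.
   def alignToTableSchema(batch: DataFrame, table: DataFrame): DataFrame = {
     table.schema
       .filterNot(f => metaCols.contains(f.name))
       .filterNot(f => batch.columns.contains(f.name))
       .foldLeft(batch)((df, f) => df.withColumn(f.name, lit(null).cast(f.dataType)))
   }

   // val alignedDf = alignToTableSchema(updatedStringDf, patienthudi)
   ```

   Nested struct evolution (e.g. new fields inside maritalStatus.coding) would need more than this. We also wondered whether a write config such as hoodie.datasource.write.reconcile.schema is the intended knob here, but we have not verified that.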
   
   Thanks for the help.
   Santosh
   

