santoshsb commented on issue #5452: URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111697151
Thanks @yihua, here are the detailed spark-shell commands we used:

```
./spark-shell --jars '/Users/balamats/work/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0-SNAPSHOT.jar' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

// Define a Patient FHIR resource; for simplicity we have deleted most of the elements and retained a few
val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}"""

// Convert to a DataFrame
val orgStringDf = spark.read.json(Seq(orgString).toDS)

// Specify the common DataSourceWriteOptions in a single hudiOptions variable
val hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
)

// Write orgStringDf to a Hudi table
orgStringDf.write
  .format("org.apache.hudi")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

// Read the Hudi table
val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
```

Print the schema:
```
patienthudi.printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
```

Select a few fields to verify:

```
patienthudi.select("id","gender","maritalStatus").show(false)
+------------------------------------+------+---------------------------------------------------------------------+
|id                                  |gender|maritalStatus                                                        |
+------------------------------------+------+---------------------------------------------------------------------+
|4ad86a5c-926e-439b-9352-f8ac9ab780f1|male  |{[{M, M, http://terminology.hl7.org/CodeSystem/v3-MaritalStatus}], M}|
+------------------------------------+------+---------------------------------------------------------------------+
```

Update: based on our use case, we then add a new Patient resource. This resource might contain new columns and might lack some existing ones (a normal situation with FHIR data):

```scala
val updatedString = """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""

// Convert the new resource string into a DataFrame
val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
```

Check the schema of the new resource being added:
```
updatedStringDf.printSchema
root
 |-- birthDate: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- multipleBirthBoolean: boolean (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
```

Upsert the new resource:

```scala
updatedStringDf.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
  .mode(SaveMode.Append)
  .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")

// Read the Hudi table again
val patienthudi = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
```

Print the schema after adding the new record (note that `maritalStatus` is gone):

```
patienthudi.printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- multipleBirthBoolean: boolean (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
```

Selecting the same fields now fails:

```
patienthudi.select("id","gender","maritalStatus").show(false)
org.apache.spark.sql.AnalysisException: cannot resolve 'maritalStatus' given input columns: [_hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, birthDate, gender, id, lastUpdated, multipleBirthBoolean, resourceType, source];
'Project [id#130, gender#129, 'maritalStatus]
+- Relation [_hoodie_commit_time#123,_hoodie_commit_seqno#124,_hoodie_record_key#125,_hoodie_partition_path#126,_hoodie_file_name#127,birthDate#128,gender#129,id#130,lastUpdated#131,multipleBirthBoolean#132,resourceType#133,source#134] parquet
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:209)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
  at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3734)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1454)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1471)
  ... 49 elided
```

Our expectations after adding the second record were:

1. The new column `multipleBirthBoolean` should be added to the schema, and should be null for the previous entry.
2. The existing `maritalStatus` column, added to the destination schema by the first entry, should still be present after the second entry is added, and should be null for the second entry.

We might be missing some config. Our view is that any record we add should carry all columns present in the destination schema: even if their values are NULL, the columns should be present. If we do need to build such an uber schema ourselves, we did not find the Spark code to merge the table schema into our second dataframe `updatedStringDf` with NULL values for the missing columns. We did try the following while creating the second dataframe:

```scala
val updatedStringDf = spark.read.schema(patienthudi.schema).json(Seq(updatedString).toDS)
```

But then the new schema for `updatedStringDf` misses the `multipleBirthBoolean` column present in the second entry:
```
root
 |-- birthDate: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
```

Thanks for the help.
Santosh
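P.S. One workaround we are considering is aligning the incoming DataFrame to the table schema ourselves before writing: compute which table columns the new batch is missing and add them as typed nulls. This is only a sketch of the idea, not something Hudi provides; the helper name `missingColumns` and the `tableSchemaMap`/`withColumn(..., lit(null).cast(...))` wiring shown in the comments are our own assumptions. The column-diff logic itself is plain Scala:

```scala
// Hypothetical helper: given the table's column -> type map and the incoming
// DataFrame's column names, return the columns the incoming batch is missing.
def missingColumns(tableCols: Map[String, String], dfCols: Set[String]): Map[String, String] =
  tableCols.filterNot { case (name, _) => dfCols.contains(name) }

// In spark-shell this could then be applied roughly as follows (untested sketch):
//   val tableSchemaMap = patienthudi.schema.fields
//     .map(f => f.name -> f.dataType.simpleString).toMap
//   val aligned = missingColumns(tableSchemaMap, updatedStringDf.columns.toSet)
//     .foldLeft(updatedStringDf) { case (df, (name, tpe)) =>
//       df.withColumn(name, lit(null).cast(tpe)) }

// Plain-Scala demonstration with (simplified) columns from this issue:
val tableCols = Map("id" -> "string", "gender" -> "string", "maritalStatus" -> "struct")
val dfCols   = Set("id", "gender", "multipleBirthBoolean")
println(missingColumns(tableCols, dfCols)) // prints Map(maritalStatus -> struct)
```

If this is the intended approach, guidance on whether casting `lit(null)` to a nested struct type round-trips correctly through the Hudi writer would be appreciated.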
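P.P.S. We also noticed a write option, `hoodie.datasource.write.reconcile.schema`, which the docs describe as reconciling the incoming batch schema against the table schema; we have not verified whether it covers this case or which Hudi versions support it, so the snippet below is only an assumption to be confirmed:

```scala
// Assumption (unverified): extending the hudiOptions map defined above so that
// Hudi reconciles the incoming schema with the existing table schema on upsert,
// rather than replacing it.
val hudiOptionsReconciled = hudiOptions + ("hoodie.datasource.write.reconcile.schema" -> "true")
```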