joao-miranda commented on issue #6552: URL: https://github.com/apache/hudi/issues/6552#issuecomment-1233993157
Thank you for your quick reply. Neither of those options seems to apply to this scenario. I created a unit test and added the full code so you can understand our use case better: ``` test("test_hudi_upgrade") { println("=============test_hudi_upgrade=============") val yesterday_date = 1654520683000L val today_date = 1654607083000L val tomorrow_date = 1654608083000L val testDF_upgrade: DataFrame = sparkSession.createDataFrame( List( (null, yesterday_date, 1, "Clark Kent", "Superman"), (null, yesterday_date, 2, "Bruce Wayne", "Batman"), (null, yesterday_date, 3, "Diana Prince", "Wonder Woman"), (null, yesterday_date, 4, "Hal Jordan", "Green Lantern"), ("I", today_date, 5, "Barry Allen", "The Flash"), ("I", today_date, 6, "Arthur Curry", "Aquaman"), ("D", today_date, 2, "Bruce Wayne", "Detective Comics"), ("U", today_date, 4, "John Stewart", "Green Lantern"), ("U", tomorrow_date, 4, "Guy Gardner", "Green Lantern") ) ).toDF("Op", "ts", "id", "name", "alias") val expectedDF_upgrade = sparkSession.createDataFrame( List( (yesterday_date, 1, "Clark Kent", "Superman"), (yesterday_date, 3, "Diana Prince", "Wonder Woman"), (tomorrow_date, 4, "Guy Gardner", "Green Lantern"), (today_date, 5, "Barry Allen", "The Flash"), (today_date, 6, "Arthur Curry", "Aquaman"), ) ) .toDF("ts", "id", "name", "alias") .orderBy("id") val testDirectory = "file:/tmp/hudi/" val testDatabase = "testDatabase" val testTable = "testTable" val tableName = "upgrade_table" var hudiOptions = scala.collection.mutable.Map[String, String]( HoodieWriteConfig.TABLE_NAME -> tableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts", DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName, 
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName, DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "" ) // Write the DataFrame as a Hudi dataset testDF_upgrade .dropDuplicates() .write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Append) .save(s"$testDirectory/$testDatabase/$tableName") val actualDF_upgrade = sparkSession.read.format("org.apache.hudi").load( s"$testDirectory/$testDatabase/$tableName" ) .orderBy("id") println("=============Expected DF=============") expectedDF_upgrade.show(false) println("=============Actual DF=============") actualDF_upgrade.show(false) assertSmallDatasetEquality(actualDF_upgrade.select("ts", "id", "name", "alias"), expectedDF_upgrade, true, false, true) } ``` This runs successfully with Hudi 0.10.0: ``` =============test_hudi_upgrade============= =============Expected DF============= +-------------+---+------------+-------------+ |ts |id |name |alias | +-------------+---+------------+-------------+ |1654520683000|1 |Clark Kent |Superman | |1654520683000|3 |Diana Prince|Wonder Woman | |1654608083000|4 |Guy Gardner |Green Lantern| |1654607083000|5 |Barry Allen |The Flash | |1654607083000|6 |Arthur Curry|Aquaman | +-------------+---+------------+-------------+ =============Actual DF============= +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+ |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |Op |ts |id |name |alias | +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+ |20220901100706768 |20220901100706768_0_3|1 | 
|431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|1 |Clark Kent |Superman | |20220901100706768 |20220901100706768_0_4|3 | |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|3 |Diana Prince|Wonder Woman | |20220901100706768 |20220901100706768_0_5|4 | |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|U |1654608083000|4 |Guy Gardner |Green Lantern| |20220901100706768 |20220901100706768_0_1|5 | |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I |1654607083000|5 |Barry Allen |The Flash | |20220901100706768 |20220901100706768_0_2|6 | |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I |1654607083000|6 |Arthur Curry|Aquaman | +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+ [info] RawCDCMergeTest: [info] - test_hudi_upgrade ``` and fails with Hudi 0.12.0: ``` =============test_hudi_upgrade============= 32131 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle - Error writing record HoodieRecord{key=HoodieKey { recordKey=2 partitionPath=}, currentLocation='null', newLocation='null'} java.util.NoSuchElementException: No value present in Option at org.apache.hudi.common.util.Option.get(Option.java:89) at org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72) at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 32345 [pool-6-thread-3-ScalaTest-running-RawCDCMergeTest] ERROR org.apache.hudi.HoodieSparkSqlWriter$ - UPSERT failed with errors [info] RawCDCMergeTest: [info] - test_hudi_upgrade *** FAILED *** [info] org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: []; [info] 'Sort ['id ASC NULLS FIRST], true [info] +- Relation[] org.apache.hudi.EmptyRelation@5144bb56 [info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) [info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155) [info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341) [info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:338) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:407) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:243) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:405) [info] ... 
``` Please let me know if you need any other information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org