[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836740#comment-17836740 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM: retest delete where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((12, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((9, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). 
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(8, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |12 |A  |2     | |8  |B  |3     | +---+---+------+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |12 |A  |2     | |10 |B  |1     | +---+---+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |12 |A  |2     | +---+---+------+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null was (Author: JIRAUSER295101): retest delete where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.ke
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836738#comment-17836738 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM: retest because default payload changed in the last few days {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((11, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). 
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(9, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |11 |A  |3     | |9  |B  |3     | +---+---+------+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |11 |A  |3     | |10 |B  |1     | +---+---+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+------+ |ts |key|number| +---+---+------+ |11 |A  |3     | +---+---+------+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null was (Author: JIRAUSER295101): retest because default payload changed in the last few days {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836735#comment-17836735 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 7:58 PM: use hoodie is deleted: {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2, false),(11, "B", 2, true)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3, false),(9, "B", 3, false)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). 
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow {code:java} +---+---+------+------------------+ |ts |key|number|_hoodie_is_deleted| +---+---+------+------------------+ |11 |A  |3     |false             | |9  |B  |3     |false             | +---+---+------+------------------+ {code} merger: Avro, useFGReader: false, tableType: mor merger: Spark, useFGReader: false, tableType: mor {code:java} +---+---+------+------------------+ |ts |key|number|_hoodie_is_deleted| +---+---+------+------------------+ |11 |A  |3     |false             | |10 |B  |1     |false             | +---+---+------+------------------+ {code} merger: Avro, useFGReader: true, tableType: mor merger: Spark, useFGReader: true, tableType: mor {code:java} +---+---+------+------------------+ |ts |key|number|_hoodie_is_deleted| +---+---+------+------------------+ |11 |A  |3     |false             | +---+---+------+------------------+ {code} was (Author: JIRAUSER295101): use hoodie is deleted: {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = 
spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.O
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836729#comment-17836729 ] Ethan Guo edited comment on HUDI-7610 at 4/12/24 7:03 PM: -- Based on an offline discussion, we immediately see two issues: (1) the DELETE operation does not route preCombine/ordering field values to the delete records; (2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite-with-latest, i.e., commit-time-based ordering, with that of the default Hudi payload, i.e., ordering-field-based). Let's create separate JIRAs for the above issues. A few more things to try to see if there are other issues: (1) use the UPSERT operation with the _hoodie_is_deleted field for deletes; (2) for the Avro merger, the default payload class is OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check the results. was (Author: guoyihua): Based on an offline discussion, we immediately see two issues: (1) the DELETE operation does not route preCombine/ordering field values to the delete records; (2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite-with-latest, i.e., commit-time-based ordering, with that of the default Hudi payload, i.e., ordering-field-based). A few more things to try to see if there are other issues: (1) use the UPSERT operation with the _hoodie_is_deleted field for deletes; (2) for the Avro merger, the default payload class is OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check the results. > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def 
showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName > //val merger = classOf[HoodieAvroRecordMerger].getName > val useFGReader = "true" > //val useFGReader = "false" > //val tableType = "COPY_ON_WRITE" > val tableType = "MERGE_ON_READ" > val columns = Seq("ts", "key", "rider", "driver", "fare", "number") > val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), > (10, "2", "rider-B", "driver-B", 27.70, 1), > (10, "3", "rider-C", "driver-C", 33.90, 10), > (-1, "4", "rider-D", "driver-D", 34.15, 6), > (10, "5", "rider-E", "driver-E", 17.85, 10)) > val inserts = spark.createDataFrame(data).toDF(columns: _*) > inserts.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(TABLE_TYPE.key(), tableType). > option("hoodie.table.name", "test_table"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Overwrite). > save(basePath) > val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), > (9, "2", "rider-Y", "driver-Y", 27.70, 7)) > val updates = spark.createDataFrame(updateData).toDF(columns: _*) > updates.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "upsert"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) > val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) > deletes.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). 
> option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "delete"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), > (-10, "4", "rider-DD", "driver-DD", 34