[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code} merger: Avro, useFGReader: false, tableType: mor {code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code} merger: Avro, useFGReader: false, tableType: mor {code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code} merger: Avro, useFGReader: false, tableType: mor {code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code} merger: Avro, useFGReader: false, tableType: mor {code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}