[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836740#comment-17836740 ]

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM:


Retest delete where the delete's precombine value is less than the insert's:
{code:java}
// Assumes a Hudi Spark test class providing `spark` and `basePath`; the
// imports below are what this snippet needs (paths per current master,
// verify against your Hudi version):
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Test

@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"

  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  // Insert both keys at ts=10.
  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(TABLE_TYPE.key(), tableType).
    option("hoodie.table.name", "test_table").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Overwrite).
    save(basePath)

  // Upsert "A" at ts=12 (higher than its insert).
  val updateData = Seq((12, "A", 2))
  val updates = spark.createDataFrame(updateData).toDF(columns: _*)
  updates.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  // Delete "B" at ts=9, i.e., with a precombine value lower than its insert.
  val deleteData = Seq((9, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "delete").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  // Upsert "A" at ts=11 and "B" at ts=8, both below the preceding values.
  val updateData2 = Seq((11, "A", 3), (8, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val df = spark.read.format("hudi").
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").
    load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
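For reference, the per-key write sequence above is: "A" is inserted (ts=10, number=1), upserted (ts=12, number=2), then upserted again (ts=11, number=3); "B" is inserted (ts=10, number=1), deleted (ts=9), then upserted (ts=8, number=3).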
 

 

merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|12 |A  |2     |
|8  |B  |3     |
+---+---+------+ {code}
 

merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|12 |A  |2     |
|10 |B  |1     |
+---+---+------+ {code}
 

merger: Avro, useFGReader: true, tableType: mor
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|12 |A  |2     |
+---+---+------+ {code}
 

merger: Spark, useFGReader: true, tableType: mor
NPE

 

merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null



[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836738#comment-17836738 ]

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM:


Retest because the default payload changed in the last few days:
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"

  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(TABLE_TYPE.key(), tableType).
    option("hoodie.table.name", "test_table").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Overwrite).
    save(basePath)

  val updateData = Seq((9, "A", 2))
  val updates = spark.createDataFrame(updateData).toDF(columns: _*)
  updates.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val deleteData = Seq((11, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "delete").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val updateData2 = Seq((11, "A", 3), (9, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val df = spark.read.format("hudi").
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").
    load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
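Since these results hinge on which payload class is the default, one way to make such reruns deterministic is to pin the payload explicitly. A minimal sketch, assuming the standard datasource option hoodie.datasource.write.payload.class and the DefaultHoodieRecordPayload class (verify the key and import path against your Hudi version):
{code:java}
import org.apache.hudi.common.model.DefaultHoodieRecordPayload

// Hypothetical variant of the initial insert above: pin the payload class so
// the test no longer depends on the default payload, which recently changed.
// Other options omitted for brevity; keep the ones used in the test.
inserts.write.format("hudi").
  option(RECORDKEY_FIELD.key(), "key").
  option(PRECOMBINE_FIELD.key(), "ts").
  option("hoodie.datasource.write.payload.class",
    classOf[DefaultHoodieRecordPayload].getName).
  option(TABLE_TYPE.key(), tableType).
  option("hoodie.table.name", "test_table").
  mode(SaveMode.Overwrite).
  save(basePath)
{code}
The same option would go on each subsequent write in the test.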
 

 

merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|11 |A  |3     |
|9  |B  |3     |
+---+---+------+ {code}
 

 

merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|11 |A  |3     |
|10 |B  |1     |
+---+---+------+ {code}
 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}
+---+---+------+
|ts |key|number|
+---+---+------+
|11 |A  |3     |
+---+---+------+ {code}
 

 

merger: Spark, useFGReader: true, tableType: mor
NPE

 

 

merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null



[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836735#comment-17836735 ]

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 7:58 PM:


Retest using the _hoodie_is_deleted field instead of the delete operation:
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"

  val columns = Seq("ts", "key", "number", "_hoodie_is_deleted")
  val data = Seq((10, "A", 1, false), (10, "B", 1, false))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(TABLE_TYPE.key(), tableType).
    option("hoodie.table.name", "test_table").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Overwrite).
    save(basePath)

  val updateData = Seq((9, "A", 2, false), (11, "B", 2, true))
  val updates = spark.createDataFrame(updateData).toDF(columns: _*)
  updates.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val updateData2 = Seq((11, "A", 3, false), (9, "B", 3, false))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
    option(RECORDKEY_FIELD.key(), "key").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
    option(TABLE_TYPE.key(), tableType).
    option(OPERATION.key(), "upsert").
    option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
    option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    mode(SaveMode.Append).
    save(basePath)

  val df = spark.read.format("hudi").
    option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
    option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").
    load(basePath)
  val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
 
merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
{code:java}
+---+---+------+------------------+
|ts |key|number|_hoodie_is_deleted|
+---+---+------+------------------+
|11 |A  |3     |false             |
|9  |B  |3     |false             |
+---+---+------+------------------+ {code}
 

merger: Avro, useFGReader: false, tableType: mor
merger: Spark, useFGReader: false, tableType: mor
{code:java}
+---+---+------+------------------+
|ts |key|number|_hoodie_is_deleted|
+---+---+------+------------------+
|11 |A  |3     |false             |
|10 |B  |1     |false             |
+---+---+------+------------------+ {code}
 

 

merger: Avro, useFGReader: true, tableType: mor
merger: Spark, useFGReader: true, tableType: mor
{code:java}
+---+---+------+------------------+
|ts |key|number|_hoodie_is_deleted|
+---+---+------+------------------+
|11 |A  |3     |false             |
+---+---+------+------------------+ {code}
 



[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Ethan Guo (Jira)


[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836729#comment-17836729 ]

Ethan Guo edited comment on HUDI-7610 at 4/12/24 7:03 PM:

Based on offline discussion, we immediately see two issues:

(1) The DELETE operation does not route preCombine/ordering field values to the delete records.
(2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite-with-latest, i.e., commit-time-based, and the default Hudi payload, i.e., ordering-time-based); see the sketch below.

Let's create separate JIRAs for the above issues.

A few more things to try, to see if there are other issues:

(1) use the UPSERT operation with the _hoodie_is_deleted field for deletes
(2) for the Avro merger, the default payload class is OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check the results
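To make the distinction in issue (2) concrete, here is a minimal sketch (hypothetical types, not Hudi's actual merger API) of the two delete-handling semantics being mixed:
{code:java}
// Illustration only: a toy record with an ordering (preCombine) value.
case class Rec(key: String, orderingVal: Int, isDelete: Boolean)

// Commit-time semantics (overwrite with the latest): the record from the
// newer commit always wins, so a delete always takes effect.
def commitTimeMerge(older: Rec, newer: Rec): Option[Rec] =
  if (newer.isDelete) None else Some(newer)

// Ordering-time semantics (default Hudi payload): the record with the higher
// ordering value wins, so a delete carrying a lower ordering value than the
// stored record should be ignored.
def orderingTimeMerge(older: Rec, newer: Rec): Option[Rec] = {
  val winner = if (newer.orderingVal >= older.orderingVal) newer else older
  if (winner.isDelete) None else Some(winner)
}
{code}
In the retest earlier in this thread, key "B" is inserted at ts=10, deleted at ts=9, then upserted at ts=8: ordering-time merging keeps (10, B, 1), while commit-time merging applies the delete and then resurrects the row as (8, B, 3), which is exactly how the variants above diverge.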



> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> --------------------------------------------------------------------------
>
>                 Key: HUDI-7610
>                 URL: https://issues.apache.org/jira/browse/HUDI-7610
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: reader-core
>            Reporter: Jonathan Vexler
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
>   //val merger = classOf[HoodieAvroRecordMerger].getName
>   val useFGReader = "true"
>   //val useFGReader = "false"
>   //val tableType = "COPY_ON_WRITE"
>   val tableType = "MERGE_ON_READ"
>   val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
>   val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
>     (10, "2", "rider-B", "driver-B", 27.70, 1),
>     (10, "3", "rider-C", "driver-C", 33.90, 10),
>     (-1, "4", "rider-D", "driver-D", 34.15, 6),
>     (10, "5", "rider-E", "driver-E", 17.85, 10))
>   val inserts = spark.createDataFrame(data).toDF(columns: _*)
>   inserts.write.format("hudi").
>     option(RECORDKEY_FIELD.key(), "key").
>     option(PRECOMBINE_FIELD.key(), "ts").
>     option(TABLE_TYPE.key(), tableType).
>     option("hoodie.table.name", "test_table").
>     option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
>     option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
>     option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
>     mode(SaveMode.Overwrite).
>     save(basePath)
>   val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
>     (9, "2", "rider-Y", "driver-Y", 27.70, 7))
>   val updates = spark.createDataFrame(updateData).toDF(columns: _*)
>   updates.write.format("hudi").
>     option(RECORDKEY_FIELD.key(), "key").
>     option(PRECOMBINE_FIELD.key(), "ts").
>     option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
>     option(TABLE_TYPE.key(), tableType).
>     option(OPERATION.key(), "upsert").
>     option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
>     option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
>     option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
>     mode(SaveMode.Append).
>     save(basePath)
>   val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
>   val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
>   deletes.write.format("hudi").
>     option(RECORDKEY_FIELD.key(), "key").
>     option(PRECOMBINE_FIELD.key(), "ts").
>     option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
>     option(TABLE_TYPE.key(), tableType).
>     option(OPERATION.key(), "delete").
>     option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
>     option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
>     option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
>     mode(SaveMode.Append).
>     save(basePath)
>   val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3),
>     (-10, "4", "rider-DD", "driver-DD", 34