Jonathan Vexler created HUDI-9375:
-------------------------------------

             Summary: Partial merging for mor does not work correctly for event 
time based merging if different fields are updated
                 Key: HUDI-9375
                 URL: https://issues.apache.org/jira/browse/HUDI-9375
             Project: Apache Hudi
          Issue Type: Bug
          Components: spark-sql
            Reporter: Jonathan Vexler


 

For COW this test will fail on the expected row Seq(1, "a1", 12.0, 1023, "a1: updated desc1"):
the 12.0 should never be applied, because the corresponding ordering value 999 is
lower than the base file's ordering value (1000). But for MOR, this test does not fail.
It seems like it will merge the two log files and have the partial result

Seq([EMPTY], [EMPTY], 12.0, 999, [EMPTY])  +  Seq([EMPTY], [EMPTY], [EMPTY],
1023, "a1: updated desc1") = Seq([EMPTY], [EMPTY], 12.0, 1023, "a1: updated
desc1")

and will not consider the 999 when merging with the base file
{code:java}
// Repro script: create a Hudi table keyed on `id` with event-time ordering field `_ts`.
// NOTE(review): $tableType / $mergeMode are bound by the surrounding test harness —
// MOR vs COW (and the record merge mode) is the variable under test.
spark.sql(
  s"""
     |create table $tableName (
     | id int,
     | name string,
     | price double,
     | _ts int,
     | description string
     |) using hudi
     |tblproperties(
     | type ='$tableType',
     | primaryKey = 'id',
     | preCombineField = '_ts',
     | recordMergeMode = '$mergeMode'
     |)
     |location '$basePath'
  """.stripMargin)
// Expected table schema, re-checked after each merge to confirm partial updates
// do not alter the schema.
val structFields = scala.collection.immutable.List(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("_ts", IntegerType, nullable = true),
  StructField("description", StringType, nullable = true))
// Seed the base file: three records with ordering values 1000 / 1200 / 1250.
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
  "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")


// First partial update: touches only `price` and `_ts`.
// Row id=1 carries ordering value 999 (< base 1000), so it should lose the merge;
// row id=3 carries 1260 (> base 1250), so it should win.
spark.sql(
  s"""
     |merge into $tableName t0
     |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
     |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
     |on t0.id = s0.id
     |when matched then update set price = s0.price, _ts = s0.ts
     |""".stripMargin)

validateTableSchema(tableName, structFields)
// id=1 must keep price 10.0 / _ts 1000: its update's ordering value (999) lost
// to the base file's 1000.
checkAnswer(s"select id, name, price, _ts, description from $tableName")(
  Seq(1, "a1", 10.0, 1000, "a1: desc1"),
  Seq(2, "a2", 20.0, 1200, "a2: desc2"),
  Seq(3, "a3", 25.0, 1260, "a3: desc3")
) 



// Partial updates using MERGE INTO statement with changed fields: 
"description" and "_ts"
// Second partial update: touches only `description` and `_ts`.
// Row id=1 now carries ordering value 1023 (> base 1000), so this update should win
// for the fields it changes — but NOT resurrect the rejected price=12.0 from above.
spark.sql(
  s"""
     |merge into $tableName t0
     |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as 
new_description, 1023 as _ts
     |union select 2 as id, 'a2' as name, 'a2: updated desc2' as 
new_description, 1270 as _ts) s0
     |on t0.id = s0.id
     |when matched then update set description = s0.new_description, _ts = 
s0._ts
     |""".stripMargin)

validateTableSchema(tableName, structFields)
// NOTE(review): the row Seq(1, "a1", 12.0, ...) below encodes the INCORRECT
// result — per the report, MOR pre-merges the two log records (taking 12.0 and
// the higher ordering value 1023) and then loses the 999 that should have
// disqualified 12.0 against the base file. COW correctly fails this expectation
// because it keeps price 10.0.
checkAnswer(s"select id, name, price, _ts, description from $tableName")(
  Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
  Seq(2, "a2", 20.0, 1270, "a2: updated desc2"),
  Seq(3, "a3", 25.0, 1260, "a3: desc3")
){code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to