Jonathan Vexler created HUDI-9375:
-------------------------------------
Summary: Partial merging for MOR does not work correctly for event-time-based
merging if different fields are updated
Key: HUDI-9375
URL: https://issues.apache.org/jira/browse/HUDI-9375
Project: Apache Hudi
Issue Type: Bug
Components: spark-sql
Reporter: Jonathan Vexler
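For a COW table, this test fails on the expected row Seq(1, "a1", 12.0, 1023, "a1: updated desc1"):
the price should never be updated to 12.0, because the corresponding ordering value 999 is
lower than the base file's ordering value 1000. The correct row is
Seq(1, "a1", 10.0, 1023, "a1: updated desc1"). For a MOR table, however, this test does not fail.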
It seems that the two log files are first merged with each other, producing the partial result
Seq([EMPTY], [EMPTY], 12.0, 999, [EMPTY]) + Seq([EMPTY], [EMPTY], [EMPTY], 1023, "a1: updated desc1")
= Seq([EMPTY], [EMPTY], 12.0, 1023, "a1: updated desc1"),
so the ordering value 999 is no longer considered when the result is merged with the base file.
{code:scala}
// Reproduction for HUDI-9375: two successive partial updates (via MERGE INTO) of the same key
// that change different columns, on a table ordered by _ts.
// NOTE(review): tableName, tableType, mergeMode, basePath, validateTableSchema and checkAnswer
// come from the enclosing test, which is not shown here.
spark.sql(
  s"""
     |create table $tableName (
     | id int,
     | name string,
     | price double,
     | _ts int,
     | description string
     |) using hudi
     |tblproperties(
     | type ='$tableType',
     | primaryKey = 'id',
     | preCombineField = '_ts',
     | recordMergeMode = '$mergeMode'
     |)
     |location '$basePath'
   """.stripMargin)
val structFields = scala.collection.immutable.List(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("_ts", IntegerType, nullable = true),
  StructField("description", StringType, nullable = true))
// Initial insert: id 1 starts with price 10 and ordering value _ts = 1000.
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
  "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
// First partial update, changed fields: "price" and "_ts".
// For id 1 the ordering value 999 is lower than the existing 1000, so the expected answer below
// leaves id 1 untouched; id 3 (1260 > 1250) is updated.
spark.sql(
  s"""
     |merge into $tableName t0
     |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
     |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
     |on t0.id = s0.id
     |when matched then update set price = s0.price, _ts = s0.ts
     |""".stripMargin)
validateTableSchema(tableName, structFields)
checkAnswer(s"select id, name, price, _ts, description from $tableName")(
  Seq(1, "a1", 10.0, 1000, "a1: desc1"),
  Seq(2, "a2", 20.0, 1200, "a2: desc2"),
  Seq(3, "a3", 25.0, 1260, "a3: desc3")
)
// Partial updates using MERGE INTO statement with changed fields: "description" and "_ts"
spark.sql(
  s"""
     |merge into $tableName t0
     |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as new_description, 1023 as _ts
     |union select 2 as id, 'a2' as name, 'a2: updated desc2' as new_description, 1270 as _ts) s0
     |on t0.id = s0.id
     |when matched then update set description = s0.new_description, _ts = s0._ts
     |""".stripMargin)
validateTableSchema(tableName, structFields)
// NOTE: per HUDI-9375, the 12.0 expected for id 1 below is wrong. The price update carried
// ordering value 999, which is lower than the base file's 1000, so the price should stay 10.0.
// This assertion fails on COW tables but passes on MOR tables, which is the reported bug.
checkAnswer(s"select id, name, price, _ts, description from $tableName")(
  Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
  Seq(2, "a2", 20.0, 1270, "a2: updated desc2"),
  Seq(3, "a3", 25.0, 1260, "a3: desc3")
){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)