This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 00173452f1 [test][spark] Add more row lineage test cases (#6051)
00173452f1 is described below

commit 00173452f1f0b261f08cc543cf5fd66a852ca62f
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 11 12:53:40 2025 +0800

    [test][spark] Add more row lineage test cases (#6051)
---
 docs/content/append-table/row-tracking.md          |  7 +-
 .../spark/commands/UpdatePaimonTableCommand.scala  | 27 ++++---
 .../paimon/spark/sql/RowLineageTestBase.scala      | 94 ++++++++++++++++++++++
 3 files changed, 115 insertions(+), 13 deletions(-)

diff --git a/docs/content/append-table/row-tracking.md 
b/docs/content/append-table/row-tracking.md
index 16c9d76bcb..fd8ffb9a72 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -28,13 +28,13 @@ under the License.
 
 ## What is row tracking
 
-Row tracking allows Paimon to track row-level lineage in a Paimon append 
table. Once enabled on a Paimom table, two more hidden columns will be added to 
the table schema:
+Row tracking allows Paimon to track row-level lineage in a Paimon append 
table. Once enabled on a Paimon table, two more hidden columns will be added to 
the table schema:
 - `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It 
is used to track the lineage of the row and can be used to identify the row in 
case of updates or merge into.
 - `_SEQUENCE_NUMBER`: BIGINT, this is field indicates which `version` of this 
record is. It actually is the snapshot-id of the snapshot that this row belongs 
to. It is used to track the lineage of the row version.
 
 ## Enable row tracking
 
-To enable row-tracking, you must config `row-tracking.enabled` to `true` in 
the table options when creating a append table.
+To enable row-tracking, you must config `row-tracking.enabled` to `true` in 
the table options when creating an append table.
 Consider an example via Flink SQL:
 ```sql
 CREATE TABLE part_t (
@@ -111,6 +111,5 @@ You will get:
 `_ROW_ID` and `_SEQUENCE_NUMBER` fields follows the following rules:
 - Whenever we read from one table with row tracking enabled, the `_ROW_ID` and 
`_SEQUENCE_NUMBER` will be `NOT NULL`.
 - If we append records to row-tracking table in the first time, we don't 
actually write them to the data file, they are lazy assigned by committer.
-- If one row moved from one file to another file for **any reasion**, the 
`_ROW_ID` column should be copied to the target file. The `_SEQUENCE_NUMBER` 
field should be set to `NULL` if the record is changed, otherwise, copy it too.
+- If one row moved from one file to another file for **any reason**, the 
`_ROW_ID` column should be copied to the target file. The `_SEQUENCE_NUMBER` 
field should be set to `NULL` if the record is changed, otherwise, copy it too.
 - Whenever we read from a row-tracking table, we firstly read `_ROW_ID` and 
`_SEQUENCE_NUMBER` from the data file, then we read the value columns from the 
data file. If they found `NULL`, we read from `DataFileMeta` to fall back to 
the lazy assigned values. Anyway, it has no way to be `NULL`.
-
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index faf0e389ac..e7d43aa936 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -26,10 +26,10 @@ import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowKind
 
-import org.apache.spark.sql.{Column, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, 
Literal}
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, 
Project, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, lit}
@@ -150,19 +150,15 @@ case class UpdatePaimonTableCommand(
       toUpdateScanRelation: DataSourceV2Relation): Seq[CommitMessage] = {
     var updateColumns = updateExpressions.zip(relation.output).map {
       case (update, origin) =>
-        val updated = if (condition == TrueLiteral) {
-          update
-        } else {
-          If(condition, update, origin)
-        }
+        val updated = optimizedIf(condition, update, origin)
         toColumn(updated).as(origin.name, origin.metadata)
     }
 
     if (coreOptions.rowTrackingEnabled()) {
-      updateColumns = updateColumns ++ Seq(
+      updateColumns ++= Seq(
         col(SpecialFields.ROW_ID.name()),
         toColumn(
-          If(
+          optimizedIf(
             condition,
             Literal(null),
             toExpression(sparkSession, 
col(SpecialFields.SEQUENCE_NUMBER.name()))))
@@ -173,4 +169,17 @@ case class UpdatePaimonTableCommand(
     val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
     dvSafeWriter.withRowLineage().write(data)
   }
+
+  private def optimizedIf(
+      predicate: Expression,
+      trueValue: Expression,
+      falseValue: Expression): Expression = {
+    if (predicate == TrueLiteral) {
+      trueValue
+    } else if (predicate == FalseLiteral) {
+      falseValue
+    } else {
+      If(predicate, trueValue, falseValue)
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index 113f6e3ad3..d82f343056 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -82,6 +82,20 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Row Lineage: update table without condition") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+
+      sql("UPDATE t SET data = 22")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 22, 0, 2), Row(2, 22, 1, 2), Row(3, 22, 2, 2))
+      )
+    }
+  }
+
   test("Row Lineage: merge into table") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
@@ -128,4 +142,84 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
       )
     }
   }
+
+  test("Row Lineage: merge into table with only delete") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM 
range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN DELETE
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(3, 3, 1, 1))
+      )
+    }
+  }
+
+  test("Row Lineage: merge into table with only update") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM 
range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET *
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(2, 22, 0, 2), Row(3, 3, 1, 1))
+      )
+    }
+  }
+
+  test("Row Lineage: merge into table not matched by source") {
+    if (gteqSpark3_4) {
+      withTable("source", "target") {
+        sql(
+          "CREATE TABLE source (a INT, b INT, c STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+        sql(
+          "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 
500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
+
+        sql(
+          "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+        sql(
+          "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
+
+        sql(s"""
+               |MERGE INTO target
+               |USING source
+               |ON target.a = source.a
+               |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + 
target.b
+               |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET *
+               |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, 
b * 1.1, c)
+               |WHEN NOT MATCHED THEN INSERT *
+               |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN UPDATE SET b = b * 10
+               |WHEN NOT MATCHED BY SOURCE THEN DELETE
+               |""".stripMargin)
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
+          Seq(
+            Row(1, 10, "c1", 0, 1),
+            Row(2, 200, "c2", 1, 2),
+            Row(3, 300, "c33", 2, 2),
+            Row(5, 550, "c5", 4, 2),
+            Row(7, 700, "c77", 9, 2),
+            Row(9, 990, "c99", 10, 2))
+        )
+      }
+    }
+  }
 }

Reply via email to