This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 601e52245e [spark] Support UPDATE/DELETE by _ROW_ID for row lineage 
(#6335)
601e52245e is described below

commit 601e52245e6be9eae7f752d89d58fdd700bdfedf
Author: Juntao Zhang <[email protected]>
AuthorDate: Fri Oct 10 12:00:14 2025 +0800

    [spark] Support UPDATE/DELETE by _ROW_ID for row lineage (#6335)
---
 docs/content/append-table/row-tracking.md               |  4 ++++
 .../main/scala/org/apache/paimon/spark/SparkTable.scala | 15 +++++++++------
 .../spark/commands/UpdatePaimonTableCommand.scala       | 11 ++++++++++-
 .../apache/paimon/spark/sql/RowTrackingTestBase.scala   | 17 +++++++++++++++++
 4 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/docs/content/append-table/row-tracking.md 
b/docs/content/append-table/row-tracking.md
index fff59f5edf..c1c815c487 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -74,6 +74,8 @@ You will get the following result:
 Then you can update and query the table again:
 ```sql
 UPDATE t SET data = 'new-data-update' WHERE id = 11;
+-- Alternatively, update using the hidden row id `_ROW_ID`
+UPDATE t SET data = 'new-data-update' WHERE _ROW_ID = 0;
 SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
 ```
 
@@ -110,6 +112,8 @@ You can also delete from the table:
 
 ```sql
 DELETE FROM t WHERE id = 11;
+-- Alternatively, delete using the hidden row id `_ROW_ID`
+DELETE FROM t WHERE _ROW_ID = 0;
 ```
 
 You will get:
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index e79e148ebb..8b540127fe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -122,18 +122,21 @@ case class SparkTable(table: Table)
   override def metadataColumns: Array[MetadataColumn] = {
     val partitionType = SparkTypeUtils.toSparkPartitionType(table)
 
-    val _metadataColumns = ArrayBuffer[MetadataColumn](
-      PaimonMetadataColumn.FILE_PATH,
-      PaimonMetadataColumn.ROW_INDEX,
-      PaimonMetadataColumn.PARTITION(partitionType),
-      PaimonMetadataColumn.BUCKET
-    )
+    val _metadataColumns = ArrayBuffer[MetadataColumn]()
 
     if (coreOptions.rowTrackingEnabled()) {
       _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
       _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
     }
 
+    _metadataColumns.appendAll(
+      Seq(
+        PaimonMetadataColumn.FILE_PATH,
+        PaimonMetadataColumn.ROW_INDEX,
+        PaimonMetadataColumn.PARTITION(partitionType),
+        PaimonMetadataColumn.BUCKET
+      ))
+
     _metadataColumns.toArray
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 8839d5c8ac..dc94777134 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -144,12 +144,21 @@ case class UpdatePaimonTableCommand(
       sparkSession: SparkSession,
       toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
     var updateColumns = updateExpressions.zip(relation.output).map {
+      case (_, origin) if origin.name == ROW_ID_COLUMN =>
+        col(ROW_ID_COLUMN)
+      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN =>
+        toColumn(
+          optimizedIf(
+            condition,
+            Literal(null),
+            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+          .as(SEQUENCE_NUMBER_COLUMN)
       case (update, origin) =>
         val updated = optimizedIf(condition, update, origin)
         toColumn(updated).as(origin.name, origin.metadata)
     }
 
-    if (coreOptions.rowTrackingEnabled()) {
+    if (coreOptions.rowTrackingEnabled() && !relation.outputSet.exists(_.name 
== ROW_ID_COLUMN)) {
       updateColumns ++= Seq(
         col(ROW_ID_COLUMN),
         toColumn(
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index c8a54d3342..09fa495fd9 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -74,6 +74,11 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
         Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
       )
+      sql("DELETE FROM t WHERE _ROW_ID = 2")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 1, 0, 1))
+      )
     }
   }
 
@@ -92,6 +97,18 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
         Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
       )
+
+      sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
+      )
+
+      sql("UPDATE t SET data = 111 WHERE _SEQUENCE_NUMBER = 1")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 111, 0, 4), Row(2, 222, 1, 3), Row(3, 111, 2, 4))
+      )
     }
   }
 

Reply via email to