This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 601e52245e [spark] Support UPDATE/DELETE by _ROW_ID for row lineage (#6335)
601e52245e is described below
commit 601e52245e6be9eae7f752d89d58fdd700bdfedf
Author: Juntao Zhang <[email protected]>
AuthorDate: Fri Oct 10 12:00:14 2025 +0800
[spark] Support UPDATE/DELETE by _ROW_ID for row lineage (#6335)
---
docs/content/append-table/row-tracking.md | 4 ++++
.../main/scala/org/apache/paimon/spark/SparkTable.scala | 15 +++++++++------
.../spark/commands/UpdatePaimonTableCommand.scala | 11 ++++++++++-
.../apache/paimon/spark/sql/RowTrackingTestBase.scala | 17 +++++++++++++++++
4 files changed, 40 insertions(+), 7 deletions(-)
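In short: on row-tracking tables, the hidden `_ROW_ID` column can now appear in UPDATE and DELETE predicates. A minimal sketch of the new usage, assuming a row-tracking table `t` with columns `id` and `data` as in the docs change below:
```sql
-- Update a single row by its hidden row ID instead of a user column
UPDATE t SET data = 'new-data-update' WHERE _ROW_ID = 0;

-- Delete by hidden row ID as well
DELETE FROM t WHERE _ROW_ID = 0;
```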
diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index fff59f5edf..c1c815c487 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -74,6 +74,8 @@ You will get the following result:
Then you can update and query the table again:
```sql
UPDATE t SET data = 'new-data-update' WHERE id = 11;
+-- Alternatively, update using the hidden row ID column `_ROW_ID`
+UPDATE t SET data = 'new-data-update' WHERE _ROW_ID = 0;
SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
```
@@ -110,6 +112,8 @@ You can also delete from the table:
```sql
DELETE FROM t WHERE id = 11;
+-- Alternatively, delete using the hidden row ID column `_ROW_ID`
+DELETE FROM t WHERE _ROW_ID = 0;
```
You will get:
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index e79e148ebb..8b540127fe 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -122,18 +122,21 @@ case class SparkTable(table: Table)
override def metadataColumns: Array[MetadataColumn] = {
val partitionType = SparkTypeUtils.toSparkPartitionType(table)
- val _metadataColumns = ArrayBuffer[MetadataColumn](
- PaimonMetadataColumn.FILE_PATH,
- PaimonMetadataColumn.ROW_INDEX,
- PaimonMetadataColumn.PARTITION(partitionType),
- PaimonMetadataColumn.BUCKET
- )
+ val _metadataColumns = ArrayBuffer[MetadataColumn]()
if (coreOptions.rowTrackingEnabled()) {
_metadataColumns.append(PaimonMetadataColumn.ROW_ID)
_metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
}
+ _metadataColumns.appendAll(
+ Seq(
+ PaimonMetadataColumn.FILE_PATH,
+ PaimonMetadataColumn.ROW_INDEX,
+ PaimonMetadataColumn.PARTITION(partitionType),
+ PaimonMetadataColumn.BUCKET
+ ))
+
_metadataColumns.toArray
}
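The SparkTable change above drops the hard-coded metadata column list and, when row tracking is enabled, registers `_ROW_ID` and `_SEQUENCE_NUMBER` first, followed by the file/partition metadata columns. A quick way to observe the hidden columns (a sketch; the table and column names are illustrative):
```sql
SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id;
```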
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 8839d5c8ac..dc94777134 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -144,12 +144,21 @@ case class UpdatePaimonTableCommand(
sparkSession: SparkSession,
toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
var updateColumns = updateExpressions.zip(relation.output).map {
+ case (_, origin) if origin.name == ROW_ID_COLUMN =>
+ col(ROW_ID_COLUMN)
+ case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN =>
+ toColumn(
+ optimizedIf(
+ condition,
+ Literal(null),
+ toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+ .as(SEQUENCE_NUMBER_COLUMN)
case (update, origin) =>
val updated = optimizedIf(condition, update, origin)
toColumn(updated).as(origin.name, origin.metadata)
}
- if (coreOptions.rowTrackingEnabled()) {
+ if (coreOptions.rowTrackingEnabled() && !relation.outputSet.exists(_.name == ROW_ID_COLUMN)) {
updateColumns ++= Seq(
col(ROW_ID_COLUMN),
toColumn(
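In the update rewrite above, an assignment targeting `_ROW_ID` passes the column through unchanged, while `_SEQUENCE_NUMBER` is set to null for matched rows, presumably so the writer assigns a fresh sequence number on commit. The observable effect, mirroring the test below:
```sql
-- Suppose row (2, 22) currently has _ROW_ID = 1 and _SEQUENCE_NUMBER = 2
UPDATE t SET data = 222 WHERE _ROW_ID = 1;
-- _ROW_ID stays 1; _SEQUENCE_NUMBER advances (to 3 in the test below)
SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id;
```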
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index c8a54d3342..09fa495fd9 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -74,6 +74,11 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
)
+ sql("DELETE FROM t WHERE _ROW_ID = 2")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1))
+ )
}
}
@@ -92,6 +97,18 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
)
+
+ sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
+ )
+
+ sql("UPDATE t SET data = 111 WHERE _SEQUENCE_NUMBER = 1")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 111, 0, 4), Row(2, 222, 1, 3), Row(3, 111, 2, 4))
+ )
}
}