This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 42f12a20b5 [spark] keep the rowId unchanged when updating the row 
tracking and deletion vectors table. (#6756)
42f12a20b5 is described below

commit 42f12a20b5c011307d153572306814f112101ed7
Author: Xiao Zhu <[email protected]>
AuthorDate: Sat Dec 6 21:58:56 2025 +0800

    [spark] keep the rowId unchanged when updating the row tracking and 
deletion vectors table. (#6756)
---
 .../spark/commands/UpdatePaimonTableCommand.scala  | 35 +++++------
 .../paimon/spark/sql/RowTrackingTestBase.scala     | 72 ++++++++++++++--------
 2 files changed, 63 insertions(+), 44 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 5d89b3d2f1..32565c1219 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowKind
 
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Column, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, 
Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
@@ -127,32 +127,26 @@ case class UpdatePaimonTableCommand(
   private def writeOnlyUpdatedData(
       sparkSession: SparkSession,
       touchedDataSplits: Array[DataSplit]): Seq[CommitMessage] = {
-    val updateColumns = updateExpressions.zip(relation.output).map {
-      case (update, origin) =>
-        toColumn(update).as(origin.name, origin.metadata)
-    }
+    val updateColumns = getUpdateColumns(sparkSession)
 
     val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, 
Some(condition))
     val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
-    writer.write(data)
+    writer.withRowTracking().write(data)
   }
 
   private def writeUpdatedAndUnchangedData(
       sparkSession: SparkSession,
       toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
+    val updateColumns = getUpdateColumns(sparkSession)
 
-    def rowIdCol = col(ROW_ID_COLUMN)
-
-    def sequenceNumberCol = toColumn(
-      optimizedIf(
-        condition,
-        Literal(null),
-        toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
-      .as(SEQUENCE_NUMBER_COLUMN)
+    val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
+    writer.withRowTracking().write(data)
+  }
 
+  private def getUpdateColumns(sparkSession: SparkSession): Seq[Column] = {
     var updateColumns = updateExpressions.zip(relation.output).map {
       case (_, origin) if origin.name == ROW_ID_COLUMN => rowIdCol
-      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => 
sequenceNumberCol
+      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => 
sequenceNumberCol(sparkSession)
       case (update, origin) =>
         val updated = optimizedIf(condition, update, origin)
         toColumn(updated).as(origin.name, origin.metadata)
@@ -164,14 +158,19 @@ case class UpdatePaimonTableCommand(
         updateColumns ++= Seq(rowIdCol)
       }
       if (!outputSet.exists(_.name == SEQUENCE_NUMBER_COLUMN)) {
-        updateColumns ++= Seq(sequenceNumberCol)
+        updateColumns ++= Seq(sequenceNumberCol(sparkSession))
       }
     }
 
-    val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
-    writer.withRowTracking().write(data)
+    updateColumns
   }
 
+  private def rowIdCol = col(ROW_ID_COLUMN)
+
+  private def sequenceNumberCol(sparkSession: SparkSession) = toColumn(
+    optimizedIf(condition, Literal(null), toExpression(sparkSession, 
col(SEQUENCE_NUMBER_COLUMN))))
+    .as(SEQUENCE_NUMBER_COLUMN)
+
   private def optimizedIf(
       predicate: Expression,
       trueValue: Expression,
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index aab69e9ea1..611a936b6f 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -73,43 +73,63 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
 
   test("Row Tracking: delete table") {
     withTable("t") {
+      // only enable row tracking
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      runAndCheckAnswer()
+      sql("DROP TABLE t")
 
-      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
-      sql("DELETE FROM t WHERE id = 2")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
-      )
-      sql("DELETE FROM t WHERE _ROW_ID = 2")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 1, 0, 1))
-      )
+      // enable row tracking and deletion vectors
+      sql(
+        "CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')")
+      runAndCheckAnswer()
+
+      def runAndCheckAnswer(): Unit = {
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+        sql("DELETE FROM t WHERE id = 2")
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
+        )
+        sql("DELETE FROM t WHERE _ROW_ID = 2")
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1))
+        )
+      }
     }
   }
 
   test("Row Tracking: update table") {
     withTable("t") {
+      // only enable row tracking
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      runAndCheckAnswer()
+      sql("DROP TABLE t")
 
-      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1))
-      )
+      // enable row tracking and deletion vectors
+      sql(
+        "CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')")
+      runAndCheckAnswer()
 
-      sql("UPDATE t SET data = 22 WHERE id = 2")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
-      )
+      def runAndCheckAnswer(): Unit = {
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1))
+        )
 
-      sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
-      )
+        sql("UPDATE t SET data = 22 WHERE id = 2")
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
+        )
+
+        sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
+        )
+      }
     }
   }
 

Reply via email to