This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f06310877d [spark] Support UPDATE on Paimon append-only table in spark V2 write (#7097)
f06310877d is described below

commit f06310877d76518d9854ce11d4b5b6d497c75aee
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Feb 4 12:04:48 2026 +0800

    [spark] Support UPDATE on Paimon append-only table in spark V2 write (#7097)
---
 .../apache/paimon/spark/sql/UpdateTableTest.scala  |  6 +++++
 .../apache/paimon/spark/sql/UpdateTableTest.scala  |  6 +++++
 .../catalyst/analysis/PaimonDeleteTable.scala      |  3 ---
 .../catalyst/analysis/PaimonUpdateTable.scala      | 27 +++++++++++++++-------
 .../spark/catalyst/analysis/RowLevelHelper.scala   | 11 +++++++--
 .../paimon/spark/write/PaimonBatchWrite.scala      |  1 +
 6 files changed, 41 insertions(+), 13 deletions(-)
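
For context, a minimal sketch (not part of this commit) of the scenario the change enables: running UPDATE on an append-only Paimon table with the V2 write path on. The session setup and table name are illustrative and assume a Paimon catalog is already configured; "spark.paimon.write.use-v2-write" is the flag the tests below toggle.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      // Route Paimon writes through the DataSourceV2 path (the tests below toggle this flag).
      .config("spark.paimon.write.use-v2-write", "true")
      .getOrCreate()

    spark.sql("CREATE TABLE t (id INT, v STRING)")        // no primary key => append-only table
    spark.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
    spark.sql("UPDATE t SET v = 'c' WHERE id = 1")        // handled as a V2 row-level update when supported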

diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb93..3a0f56cd48 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb93..3a0f56cd48 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 46297ec7a6..7c9aaddc24 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,12 +18,9 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index a60647eb5f..27206fb32f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 import scala.collection.JavaConverters._
@@ -42,8 +42,6 @@ object PaimonUpdateTable
         table.getTable match {
           case paimonTable: FileStoreTable =>
             val relation = PaimonRelation.getPaimonRelation(u.table)
-            val alignedExpressions =
-              generateAlignedExpressions(relation.output, assignments).zip(relation.output)
 
             val primaryKeys = paimonTable.primaryKeys().asScala.toSeq
             if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
@@ -55,11 +53,24 @@ object PaimonUpdateTable
                 "Update operation is not supported when data evolution is 
enabled yet.")
             }
 
-            UpdatePaimonTableCommand(
-              relation,
-              paimonTable,
-              condition.getOrElse(TrueLiteral),
-              alignedExpressions)
+            val alignedExpressions =
+              generateAlignedExpressions(relation.output, assignments).zip(relation.output)
+
+            val alignedAssignments = alignedExpressions.map {
+              case (expression, field) => Assignment(field, expression)
+            }
+
+            val alignedUpdateTable = u.copy(assignments = alignedAssignments)
+
+            if (!shouldFallbackToV1Update(table, alignedUpdateTable)) {
+              alignedUpdateTable
+            } else {
+              UpdatePaimonTableCommand(
+                relation,
+                paimonTable,
+                condition.getOrElse(TrueLiteral),
+                alignedExpressions)
+            }
 
           case _ =>
             throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index b41ceed627..8ec139b607 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,9 +24,9 @@ import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
 
-trait RowLevelHelper extends SQLConfHelper {
+trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
 
   val operation: RowLevelOp
 
@@ -95,4 +95,11 @@ trait RowLevelHelper extends SQLConfHelper {
       table.getTable.asInstanceOf[FileStoreTable],
       condition)
   }
+
+  /** Determines if DataSourceV2 update is not supported for the given table. */
+  protected def shouldFallbackToV1Update(table: SparkTable, updateTable: UpdateTable): Boolean = {
+    shouldFallbackToV1(table) ||
+    !updateTable.rewritable ||
+    !updateTable.aligned
+  }
 }
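
On the two plan-level checks (my reading of Spark 3.5's catalyst, stated as an assumption rather than taken from this commit): UpdateTable.rewritable holds when the target relation resolves to a table that supports row-level operations, and UpdateTable.aligned holds when the assignments already cover the output columns one-to-one. Inside RowLevelHelper, the method above is equivalent to this condensed restatement:

    // Use the V2 UpdateTable plan only if all three conditions hold;
    // otherwise fall back to the V1 UpdatePaimonTableCommand.
    def useV2Update(table: SparkTable, u: UpdateTable): Boolean =
      !shouldFallbackToV1(table) && u.rewritable && u.aligned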
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 1b58483e69..d546eebf4c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -112,6 +112,7 @@ case class PaimonBatchWrite(
 
   private def buildDeletedCommitMessage(
       deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+    logInfo(s"[V2 Write] Building deleted commit message for 
${deletedFiles.size} files")
     deletedFiles
       .groupBy(f => (f.partition, f.bucket))
       .map {
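
As a self-contained illustration of the grouping step visible above (the types here are simplified stand-ins for SparkDataFileMeta and CommitMessage, not Paimon's API): the method emits one commit message per (partition, bucket) group of deleted files.

    // Simplified stand-ins for SparkDataFileMeta and CommitMessage.
    case class FileMeta(partition: String, bucket: Int, fileName: String)
    case class DeleteCommit(partition: String, bucket: Int, deletedFiles: Seq[String])

    // One commit message per (partition, bucket) group of deleted files.
    def buildDeletedCommits(deleted: Seq[FileMeta]): Seq[DeleteCommit] =
      deleted
        .groupBy(f => (f.partition, f.bucket))
        .map { case ((p, b), fs) => DeleteCommit(p, b, fs.map(_.fileName)) }
        .toSeq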
