This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f06310877d [spark] Support UPDATE on Paimon append-only table in spark V2 write (#7097)
f06310877d is described below
commit f06310877d76518d9854ce11d4b5b6d497c75aee
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Feb 4 12:04:48 2026 +0800
[spark] Support UPDATE on Paimon append-only table in spark V2 write (#7097)
---
.../apache/paimon/spark/sql/UpdateTableTest.scala | 6 +++++
.../apache/paimon/spark/sql/UpdateTableTest.scala | 6 +++++
.../catalyst/analysis/PaimonDeleteTable.scala | 3 ---
.../catalyst/analysis/PaimonUpdateTable.scala | 27 +++++++++++++++-------
.../spark/catalyst/analysis/RowLevelHelper.scala | 11 +++++++--
.../paimon/spark/write/PaimonBatchWrite.scala | 1 +
6 files changed, 41 insertions(+), 13 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb93..3a0f56cd48 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index bcc07bcb93..3a0f56cd48 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -25,3 +25,9 @@ class UpdateTableTest extends UpdateTableTestBase {
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 46297ec7a6..7c9aaddc24 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,12 +18,9 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.spark.SparkTable
-import
org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable,
LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index a60647eb5f..27206fb32f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan,
UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import scala.collection.JavaConverters._
@@ -42,8 +42,6 @@ object PaimonUpdateTable
table.getTable match {
case paimonTable: FileStoreTable =>
val relation = PaimonRelation.getPaimonRelation(u.table)
- val alignedExpressions =
- generateAlignedExpressions(relation.output,
assignments).zip(relation.output)
val primaryKeys = paimonTable.primaryKeys().asScala.toSeq
if (!validUpdateAssignment(u.table.outputSet, primaryKeys,
assignments)) {
@@ -55,11 +53,24 @@ object PaimonUpdateTable
"Update operation is not supported when data evolution is
enabled yet.")
}
- UpdatePaimonTableCommand(
- relation,
- paimonTable,
- condition.getOrElse(TrueLiteral),
- alignedExpressions)
+ val alignedExpressions =
+ generateAlignedExpressions(relation.output,
assignments).zip(relation.output)
+
+ val alignedAssignments = alignedExpressions.map {
+ case (expression, field) => Assignment(field, expression)
+ }
+
+ val alignedUpdateTable = u.copy(assignments = alignedAssignments)
+
+ if (!shouldFallbackToV1Update(table, alignedUpdateTable)) {
+ alignedUpdateTable
+ } else {
+ UpdatePaimonTableCommand(
+ relation,
+ paimonTable,
+ condition.getOrElse(TrueLiteral),
+ alignedExpressions)
+ }
case _ =>
throw new RuntimeException("Update Operation is only supported for
FileStoreTable.")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index b41ceed627..8ec139b607 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,9 +24,9 @@ import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
-trait RowLevelHelper extends SQLConfHelper {
+trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
val operation: RowLevelOp
@@ -95,4 +95,11 @@ trait RowLevelHelper extends SQLConfHelper {
table.getTable.asInstanceOf[FileStoreTable],
condition)
}
+
+ /** Determines if DataSourceV2 update is not supported for the given table.
*/
+ protected def shouldFallbackToV1Update(table: SparkTable, updateTable:
UpdateTable): Boolean = {
+ shouldFallbackToV1(table) ||
+ !updateTable.rewritable ||
+ !updateTable.aligned
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 1b58483e69..d546eebf4c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -112,6 +112,7 @@ case class PaimonBatchWrite(
private def buildDeletedCommitMessage(
deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+ logInfo(s"[V2 Write] Building deleted commit message for
${deletedFiles.size} files")
deletedFiles
.groupBy(f => (f.partition, f.bucket))
.map {