This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8a7839b682 [spark] Refactor UPDATE command to compute aligned expressions in analysis phase (#7149)
8a7839b682 is described below
commit 8a7839b6822eb1236f71f7564047c8e75cf82e23
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jan 29 16:41:52 2026 +0800
[spark] Refactor UPDATE command to compute aligned expressions in analysis phase (#7149)
---
.../paimon/spark/catalyst/analysis/PaimonUpdateTable.scala | 8 ++++++--
.../paimon/spark/commands/UpdatePaimonTableCommand.scala | 12 ++++--------
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index 63d19379cc..a60647eb5f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -41,21 +41,25 @@ object PaimonUpdateTable
table.getTable match {
case paimonTable: FileStoreTable =>
+ val relation = PaimonRelation.getPaimonRelation(u.table)
+ val alignedExpressions =
+ generateAlignedExpressions(relation.output, assignments).zip(relation.output)
+
val primaryKeys = paimonTable.primaryKeys().asScala.toSeq
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
throw new RuntimeException("Can't update the primary key column.")
}
- val relation = PaimonRelation.getPaimonRelation(u.table)
if (paimonTable.coreOptions().dataEvolutionEnabled()) {
throw new RuntimeException(
"Update operation is not supported when data evolution is enabled yet.")
}
+
UpdatePaimonTableCommand(
relation,
paimonTable,
condition.getOrElse(TrueLiteral),
- assignments)
+ alignedExpressions)
case _ =>
throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 32565c1219..1feb0f1f76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -18,7 +18,6 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
@@ -28,7 +27,7 @@ import org.apache.paimon.types.RowKind
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, If, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -38,15 +37,12 @@ case class UpdatePaimonTableCommand(
relation: DataSourceV2Relation,
override val table: FileStoreTable,
condition: Expression,
- assignments: Seq[Assignment])
+ alignedExpressions: Seq[(Expression, Attribute)])
extends PaimonRowLevelCommand
- with AssignmentAlignmentHelper
with SupportsSubquery {
- private lazy val updateExpressions = {
- generateAlignedExpressions(relation.output, assignments).zip(relation.output).map {
- case (expr, attr) => Alias(expr, attr.name)()
- }
+ private lazy val updateExpressions = alignedExpressions.map {
+ case (expr, attr) => Alias(expr, attr.name)()
}
override def run(sparkSession: SparkSession): Seq[Row] = {