This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a10a44892c [spark] Add MergeIntoTable alignment methods for v2 write
(#7144)
a10a44892c is described below
commit a10a44892cd5e9dbac705762ed6774674357692f
Author: Kerwin Zhang <[email protected]>
AuthorDate: Tue Feb 3 21:26:19 2026 +0800
[spark] Add MergeIntoTable alignment methods for v2 write (#7144)
---
.../analysis/AssignmentAlignmentHelper.scala | 93 ++++++++++++--
.../catalyst/analysis/PaimonMergeIntoBase.scala | 2 +-
.../spark/sql/AssignmentAlignmentHelperTest.scala | 143 +++++++++++++++++++++
3 files changed, 228 insertions(+), 10 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index 86c6881aa4..13ce64b86b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -18,11 +18,13 @@
package org.apache.paimon.spark.catalyst.analysis
+import
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction,
UpdateStarAction}
import org.apache.spark.sql.types.StructType
trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
@@ -50,31 +52,76 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
*/
protected def generateAlignedExpressions(
attrs: Seq[Attribute],
- assignments: Seq[Assignment]): Seq[Expression] = {
+ assignments: Seq[Assignment],
+ isInsert: Boolean = false): Seq[Expression] = {
val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key),
a.value))
- recursiveAlignUpdates(attrs, attrUpdates)
+ recursiveAlignUpdates(attrs, attrUpdates, Nil, isInsert)
}
protected def alignAssignments(
attrs: Seq[Attribute],
- assignments: Seq[Assignment]): Seq[Assignment] = {
- generateAlignedExpressions(attrs, assignments).zip(attrs).map {
+ assignments: Seq[Assignment],
+ isInsert: Boolean = false): Seq[Assignment] = {
+ generateAlignedExpressions(attrs, assignments, isInsert).zip(attrs).map {
case (expression, field) => Assignment(field, expression)
}
}
+ /**
+ * Align assignments in a MergeAction based on the target table's output
attributes.
+ * - DeleteAction: returns as-is
+ * - UpdateAction: aligns assignments for update
+ * - InsertAction: aligns assignments for insert
+ */
+ protected def alignMergeAction(action: MergeAction, targetOutput:
Seq[Attribute]): MergeAction = {
+ action match {
+ case d @ DeleteAction(_) => d
+ case u @ UpdateAction(_, assignments) =>
+ u.copy(assignments = alignAssignments(targetOutput, assignments))
+ case i @ InsertAction(_, assignments) =>
+ i.copy(assignments = alignAssignments(targetOutput, assignments,
isInsert = true))
+ case _: UpdateStarAction =>
+ throw new RuntimeException("UpdateStarAction should not be here.")
+ case _: InsertStarAction =>
+ throw new RuntimeException("InsertStarAction should not be here.")
+ case _ =>
+ throw new RuntimeException(s"Can't recognize this action: $action")
+ }
+ }
+
+ /**
+ * Align all MergeActions in a MergeIntoTable based on the target table's
output attributes.
+ * Returns a new MergeIntoTable with aligned matchedActions,
notMatchedActions, and
+ * notMatchedBySourceActions.
+ */
+ protected def alignMergeIntoTable(
+ m: MergeIntoTable,
+ targetOutput: Seq[Attribute]): MergeIntoTable = {
+ m.copy(
+ matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+ notMatchedActions = m.notMatchedActions.map(alignMergeAction(_,
targetOutput)),
+ notMatchedBySourceActions =
m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+ )
+ }
+
private def recursiveAlignUpdates(
targetAttrs: Seq[NamedExpression],
updates: Seq[AttrUpdate],
- namePrefix: Seq[String] = Nil): Seq[Expression] = {
+ namePrefix: Seq[String] = Nil,
+ isInsert: Boolean = false): Seq[Expression] = {
// build aligned updated expression for each target attr
targetAttrs.map {
targetAttr =>
val headMatchedUpdates = updates.filter(u => resolver(u.ref.head,
targetAttr.name))
if (headMatchedUpdates.isEmpty) {
- // when no matched update, return the attr as is
- targetAttr
+ if (isInsert) {
+ // For Insert, use default value or NULL for missing columns
+ getDefaultValueOrNull(targetAttr)
+ } else {
+ // For Update, return the attr as is
+ targetAttr
+ }
} else {
val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
if (exactMatchedUpdate.isDefined) {
@@ -101,7 +148,11 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
// process StructType's nested fields recursively
val updatedFieldExprs =
- recursiveAlignUpdates(fieldExprs, newUpdates, namePrefix :+
targetAttr.name)
+ recursiveAlignUpdates(
+ fieldExprs,
+ newUpdates,
+ namePrefix :+ targetAttr.name,
+ isInsert)
// build updated struct expression
CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
@@ -117,4 +168,28 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
}
}
+ /** Get the default value expression for an attribute, or NULL if no default
value is defined. */
+ private def getDefaultValueOrNull(attr: NamedExpression): Expression = {
+ attr match {
+ case a: Attribute if
a.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
+ val defaultValueStr =
a.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+ parseAndResolveDefaultValue(defaultValueStr, a)
+ case _ =>
+ Literal(null, attr.dataType)
+ }
+ }
+
+ /** Parse the default value string into an expression (via castIfNeeded); NULL literal on failure. */
+ private def parseAndResolveDefaultValue(defaultValueStr: String, attr:
Attribute): Expression = {
+ try {
+ val spark = SparkSession.active
+ val parsed =
spark.sessionState.sqlParser.parseExpression(defaultValueStr)
+ castIfNeeded(parsed, attr.dataType)
+ } catch {
+ case _: Exception =>
+ // If parsing fails, fall back to NULL
+ Literal(null, attr.dataType)
+ }
+ }
+
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index 8a52273eea..f9a186e70b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -110,7 +110,7 @@ trait PaimonMergeIntoBase
u.copy(assignments = alignAssignments(targetOutput, assignments))
case i @ InsertAction(_, assignments) =>
- i.copy(assignments = alignAssignments(targetOutput, assignments))
+ i.copy(assignments = alignAssignments(targetOutput, assignments,
isInsert = true))
case _: UpdateStarAction =>
throw new RuntimeException(s"UpdateStarAction should not be here.")
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala
new file mode 100644
index 0000000000..725e408457
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, MergeIntoTable, UpdateAction}
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder,
StringType}
+
+import scala.reflect.ClassTag
+
+/**
+ * Test suite for [[AssignmentAlignmentHelper]] methods:
+ * - alignMergeAction (delete, update, and insert actions, including insert defaults)
+ */
+class AssignmentAlignmentHelperTest extends PaimonSparkTestBase with
AssignmentAlignmentHelper {
+
+ /** Assert assignment key name and value type. */
+ private def assertAssignment[T <: Expression: ClassTag](
+ assignment: Assignment,
+ expectedKeyName: String): Unit = {
+ assert(assignment.key.asInstanceOf[AttributeReference].name ==
expectedKeyName)
+ val expectedType = implicitly[ClassTag[T]].runtimeClass
+ assert(
+ expectedType.isInstance(assignment.value),
+ s"Expected value type ${expectedType.getSimpleName}, " +
+ s"but got ${assignment.value.getClass.getSimpleName}"
+ )
+ }
+
+ /** Assert assignment key name and literal value. */
+ private def assertLiteralValue(
+ assignment: Assignment,
+ expectedKeyName: String,
+ expectedValue: Any): Unit = {
+ assertAssignment[Literal](assignment, expectedKeyName)
+ assert(
+ assignment.value.asInstanceOf[Literal].value == expectedValue,
+ s"Expected literal value $expectedValue, but got
${assignment.value.asInstanceOf[Literal].value}"
+ )
+ }
+
+ test("alignMergeAction: DeleteAction should remain unchanged") {
+ val condition = Some(Literal(true))
+ val deleteAction = DeleteAction(condition)
+
+ val targetOutput = Seq(
+ AttributeReference("a", IntegerType)(),
+ AttributeReference("b", IntegerType)(),
+ AttributeReference("c", StringType)()
+ )
+
+ val aligned = alignMergeAction(deleteAction, targetOutput)
+
+ assert(aligned.isInstanceOf[DeleteAction])
+ assert(aligned.asInstanceOf[DeleteAction].condition == condition)
+ }
+
+ test("alignMergeAction: UpdateAction should keep missing columns as-is") {
+ val targetA = AttributeReference("a", IntegerType)()
+ val targetB = AttributeReference("b", IntegerType)()
+ val targetC = AttributeReference("c", StringType)()
+ val targetOutput = Seq(targetA, targetB, targetC)
+
+ // Only column 'a' is updated; 'b' and 'c' should be kept as is
+ val assignments = Seq(Assignment(targetA, Literal(100)))
+ val updateAction = UpdateAction(None, assignments)
+
+ val aligned = alignMergeAction(updateAction, targetOutput)
+
+ assert(aligned.isInstanceOf[UpdateAction])
+ val alignedAssignments = aligned.asInstanceOf[UpdateAction].assignments
+ assert(alignedAssignments.size == 3)
+ assertAssignment[Literal](alignedAssignments(0), "a") // a = 100
+ assertAssignment[AttributeReference](alignedAssignments(1), "b") // b = b
(unchanged)
+ assertAssignment[AttributeReference](alignedAssignments(2), "c") // c = c
(unchanged)
+ }
+
+ test("alignMergeAction: InsertAction should use NULL for missing columns") {
+ val targetA = AttributeReference("a", IntegerType)()
+ val targetB = AttributeReference("b", IntegerType)()
+ val targetC = AttributeReference("c", StringType)()
+ val targetOutput = Seq(targetA, targetB, targetC)
+
+ // Only column 'a' is inserted; 'b' and 'c' should be NULL
+ val sourceA = AttributeReference("a", IntegerType)()
+ val assignments = Seq(Assignment(targetA, sourceA))
+ val insertAction = InsertAction(None, assignments)
+
+ val aligned = alignMergeAction(insertAction, targetOutput)
+
+ assert(aligned.isInstanceOf[InsertAction])
+ val alignedAssignments = aligned.asInstanceOf[InsertAction].assignments
+ assert(alignedAssignments.size == 3)
+ assertAssignment[AttributeReference](alignedAssignments(0), "a") // a =
source.a
+ assertLiteralValue(alignedAssignments(1), "b", null) // b = NULL (isInsert
mode)
+ assertLiteralValue(alignedAssignments(2), "c", null) // c = NULL (isInsert
mode)
+ }
+
+ test("alignMergeAction: InsertAction should use default value for missing
columns") {
+ val targetA = AttributeReference("a", IntegerType)()
+ // Column 'b' has default value 100
+ val metadataWithDefault = new MetadataBuilder()
+ .putString("CURRENT_DEFAULT", "100")
+ .build()
+ val targetB = AttributeReference("b", IntegerType, nullable = true,
metadataWithDefault)()
+ val targetC = AttributeReference("c", StringType)()
+ val targetOutput = Seq(targetA, targetB, targetC)
+
+ // Only column 'a' is inserted; 'b' should use default value 100, 'c' should be
NULL
+ val sourceA = AttributeReference("a", IntegerType)()
+ val assignments = Seq(Assignment(targetA, sourceA))
+ val insertAction = InsertAction(None, assignments)
+
+ val aligned = alignMergeAction(insertAction, targetOutput)
+
+ assert(aligned.isInstanceOf[InsertAction])
+ val alignedAssignments = aligned.asInstanceOf[InsertAction].assignments
+ assert(alignedAssignments.size == 3)
+ assertAssignment[AttributeReference](alignedAssignments(0), "a") // a =
source.a
+ assertLiteralValue(alignedAssignments(1), "b", 100) // b = 100 (default
value)
+ assertLiteralValue(alignedAssignments(2), "c", null) // c = NULL (no
default)
+ }
+}