This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a10a44892c [spark] Add MergeIntoTable alignment methods for v2 write (#7144)
a10a44892c is described below

commit a10a44892cd5e9dbac705762ed6774674357692f
Author: Kerwin Zhang <[email protected]>
AuthorDate: Tue Feb 3 21:26:19 2026 +0800

    [spark] Add MergeIntoTable alignment methods for v2 write (#7144)
---
 .../analysis/AssignmentAlignmentHelper.scala       |  93 ++++++++++++--
 .../catalyst/analysis/PaimonMergeIntoBase.scala    |   2 +-
 .../spark/sql/AssignmentAlignmentHelperTest.scala  | 143 +++++++++++++++++++++
 3 files changed, 228 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index 86c6881aa4..13ce64b86b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
+import 
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, 
UpdateStarAction}
 import org.apache.spark.sql.types.StructType
 
 trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
@@ -50,31 +52,76 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
    */
   protected def generateAlignedExpressions(
       attrs: Seq[Attribute],
-      assignments: Seq[Assignment]): Seq[Expression] = {
+      assignments: Seq[Assignment],
+      isInsert: Boolean = false): Seq[Expression] = {
     val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key), 
a.value))
-    recursiveAlignUpdates(attrs, attrUpdates)
+    recursiveAlignUpdates(attrs, attrUpdates, Nil, isInsert)
   }
 
   protected def alignAssignments(
       attrs: Seq[Attribute],
-      assignments: Seq[Assignment]): Seq[Assignment] = {
-    generateAlignedExpressions(attrs, assignments).zip(attrs).map {
+      assignments: Seq[Assignment],
+      isInsert: Boolean = false): Seq[Assignment] = {
+    generateAlignedExpressions(attrs, assignments, isInsert).zip(attrs).map {
       case (expression, field) => Assignment(field, expression)
     }
   }
 
+  /**
+   * Align assignments in a MergeAction based on the target table's output 
attributes.
+   *   - DeleteAction: returns as-is
+   *   - UpdateAction: aligns assignments for update
+   *   - InsertAction: aligns assignments for insert
+   */
+  protected def alignMergeAction(action: MergeAction, targetOutput: 
Seq[Attribute]): MergeAction = {
+    action match {
+      case d @ DeleteAction(_) => d
+      case u @ UpdateAction(_, assignments) =>
+        u.copy(assignments = alignAssignments(targetOutput, assignments))
+      case i @ InsertAction(_, assignments) =>
+        i.copy(assignments = alignAssignments(targetOutput, assignments, 
isInsert = true))
+      case _: UpdateStarAction =>
+        throw new RuntimeException("UpdateStarAction should not be here.")
+      case _: InsertStarAction =>
+        throw new RuntimeException("InsertStarAction should not be here.")
+      case _ =>
+        throw new RuntimeException(s"Can't recognize this action: $action")
+    }
+  }
+
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's 
output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions, 
notMatchedActions, and
+   * notMatchedBySourceActions.
+   */
+  protected def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, 
targetOutput)),
+      notMatchedBySourceActions = 
m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
   private def recursiveAlignUpdates(
       targetAttrs: Seq[NamedExpression],
       updates: Seq[AttrUpdate],
-      namePrefix: Seq[String] = Nil): Seq[Expression] = {
+      namePrefix: Seq[String] = Nil,
+      isInsert: Boolean = false): Seq[Expression] = {
 
     // build aligned updated expression for each target attr
     targetAttrs.map {
       targetAttr =>
         val headMatchedUpdates = updates.filter(u => resolver(u.ref.head, 
targetAttr.name))
         if (headMatchedUpdates.isEmpty) {
-          // when no matched update, return the attr as is
-          targetAttr
+          if (isInsert) {
+            // For Insert, use default value or NULL for missing columns
+            getDefaultValueOrNull(targetAttr)
+          } else {
+            // For Update, return the attr as is
+            targetAttr
+          }
         } else {
           val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
           if (exactMatchedUpdate.isDefined) {
@@ -101,7 +148,11 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
                 val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
                 // process StructType's nested fields recursively
                 val updatedFieldExprs =
-                  recursiveAlignUpdates(fieldExprs, newUpdates, namePrefix :+ 
targetAttr.name)
+                  recursiveAlignUpdates(
+                    fieldExprs,
+                    newUpdates,
+                    namePrefix :+ targetAttr.name,
+                    isInsert)
 
                 // build updated struct expression
                 CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
@@ -117,4 +168,28 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
     }
   }
 
+  /** Get the default value expression for an attribute, or NULL if no default 
value is defined. */
+  private def getDefaultValueOrNull(attr: NamedExpression): Expression = {
+    attr match {
+      case a: Attribute if 
a.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
+        val defaultValueStr = 
a.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+        parseAndResolveDefaultValue(defaultValueStr, a)
+      case _ =>
+        Literal(null, attr.dataType)
+    }
+  }
+
+  /** Parse the default value string and resolve it to an expression. */
+  private def parseAndResolveDefaultValue(defaultValueStr: String, attr: 
Attribute): Expression = {
+    try {
+      val spark = SparkSession.active
+      val parsed = 
spark.sessionState.sqlParser.parseExpression(defaultValueStr)
+      castIfNeeded(parsed, attr.dataType)
+    } catch {
+      case _: Exception =>
+        // If parsing fails, fall back to NULL
+        Literal(null, attr.dataType)
+    }
+  }
+
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index 8a52273eea..f9a186e70b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -110,7 +110,7 @@ trait PaimonMergeIntoBase
         u.copy(assignments = alignAssignments(targetOutput, assignments))
 
       case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments))
+        i.copy(assignments = alignAssignments(targetOutput, assignments, 
isInsert = true))
 
       case _: UpdateStarAction =>
         throw new RuntimeException(s"UpdateStarAction should not be here.")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala
new file mode 100644
index 0000000000..725e408457
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AssignmentAlignmentHelperTest.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, MergeIntoTable, UpdateAction}
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, 
StringType}
+
+import scala.reflect.ClassTag
+
+/**
+ * Test suite for [[AssignmentAlignmentHelper]] methods:
+ *   - alignMergeAction (with isInsert parameter)
+ */
+class AssignmentAlignmentHelperTest extends PaimonSparkTestBase with 
AssignmentAlignmentHelper {
+
+  /** Assert assignment key name and value type. */
+  private def assertAssignment[T <: Expression: ClassTag](
+      assignment: Assignment,
+      expectedKeyName: String): Unit = {
+    assert(assignment.key.asInstanceOf[AttributeReference].name == 
expectedKeyName)
+    val expectedType = implicitly[ClassTag[T]].runtimeClass
+    assert(
+      expectedType.isInstance(assignment.value),
+      s"Expected value type ${expectedType.getSimpleName}, " +
+        s"but got ${assignment.value.getClass.getSimpleName}"
+    )
+  }
+
+  /** Assert assignment key name and literal value. */
+  private def assertLiteralValue(
+      assignment: Assignment,
+      expectedKeyName: String,
+      expectedValue: Any): Unit = {
+    assertAssignment[Literal](assignment, expectedKeyName)
+    assert(
+      assignment.value.asInstanceOf[Literal].value == expectedValue,
+      s"Expected literal value $expectedValue, but got 
${assignment.value.asInstanceOf[Literal].value}"
+    )
+  }
+
+  test("alignMergeAction: DeleteAction should remain unchanged") {
+    val condition = Some(Literal(true))
+    val deleteAction = DeleteAction(condition)
+
+    val targetOutput = Seq(
+      AttributeReference("a", IntegerType)(),
+      AttributeReference("b", IntegerType)(),
+      AttributeReference("c", StringType)()
+    )
+
+    val aligned = alignMergeAction(deleteAction, targetOutput)
+
+    assert(aligned.isInstanceOf[DeleteAction])
+    assert(aligned.asInstanceOf[DeleteAction].condition == condition)
+  }
+
+  test("alignMergeAction: UpdateAction should keep missing columns as-is") {
+    val targetA = AttributeReference("a", IntegerType)()
+    val targetB = AttributeReference("b", IntegerType)()
+    val targetC = AttributeReference("c", StringType)()
+    val targetOutput = Seq(targetA, targetB, targetC)
+
+    // Only update column 'a', 'b' and 'c' should be kept as is
+    val assignments = Seq(Assignment(targetA, Literal(100)))
+    val updateAction = UpdateAction(None, assignments)
+
+    val aligned = alignMergeAction(updateAction, targetOutput)
+
+    assert(aligned.isInstanceOf[UpdateAction])
+    val alignedAssignments = aligned.asInstanceOf[UpdateAction].assignments
+    assert(alignedAssignments.size == 3)
+    assertAssignment[Literal](alignedAssignments(0), "a") // a = 100
+    assertAssignment[AttributeReference](alignedAssignments(1), "b") // b = b 
(unchanged)
+    assertAssignment[AttributeReference](alignedAssignments(2), "c") // c = c 
(unchanged)
+  }
+
+  test("alignMergeAction: InsertAction should use NULL for missing columns") {
+    val targetA = AttributeReference("a", IntegerType)()
+    val targetB = AttributeReference("b", IntegerType)()
+    val targetC = AttributeReference("c", StringType)()
+    val targetOutput = Seq(targetA, targetB, targetC)
+
+    // Only insert column 'a', 'b' and 'c' should be NULL
+    val sourceA = AttributeReference("a", IntegerType)()
+    val assignments = Seq(Assignment(targetA, sourceA))
+    val insertAction = InsertAction(None, assignments)
+
+    val aligned = alignMergeAction(insertAction, targetOutput)
+
+    assert(aligned.isInstanceOf[InsertAction])
+    val alignedAssignments = aligned.asInstanceOf[InsertAction].assignments
+    assert(alignedAssignments.size == 3)
+    assertAssignment[AttributeReference](alignedAssignments(0), "a") // a = 
source.a
+    assertLiteralValue(alignedAssignments(1), "b", null) // b = NULL (isInsert 
mode)
+    assertLiteralValue(alignedAssignments(2), "c", null) // c = NULL (isInsert 
mode)
+  }
+
+  test("alignMergeAction: InsertAction should use default value for missing 
columns") {
+    val targetA = AttributeReference("a", IntegerType)()
+    // Column 'b' has default value 100
+    val metadataWithDefault = new MetadataBuilder()
+      .putString("CURRENT_DEFAULT", "100")
+      .build()
+    val targetB = AttributeReference("b", IntegerType, nullable = true, 
metadataWithDefault)()
+    val targetC = AttributeReference("c", StringType)()
+    val targetOutput = Seq(targetA, targetB, targetC)
+
+    // Only insert column 'a', 'b' should use default value 100, 'c' should be 
NULL
+    val sourceA = AttributeReference("a", IntegerType)()
+    val assignments = Seq(Assignment(targetA, sourceA))
+    val insertAction = InsertAction(None, assignments)
+
+    val aligned = alignMergeAction(insertAction, targetOutput)
+
+    assert(aligned.isInstanceOf[InsertAction])
+    val alignedAssignments = aligned.asInstanceOf[InsertAction].assignments
+    assert(alignedAssignments.size == 3)
+    assertAssignment[AttributeReference](alignedAssignments(0), "a") // a = 
source.a
+    assertLiteralValue(alignedAssignments(1), "b", 100) // b = 100 (default 
value)
+    assertLiteralValue(alignedAssignments(2), "c", null) // c = NULL (no 
default)
+  }
+}

Reply via email to