This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bf015c71fc [spark] Fix duplicate column error when merging on _ROW_ID (#6727)
bf015c71fc is described below

commit bf015c71fc2cdad17f946d8127f3ec07f97616ec
Author: wayneli-vt <[email protected]>
AuthorDate: Wed Dec 3 14:55:35 2025 +0800

    [spark] Fix duplicate column error when merging on _ROW_ID (#6727)
---
 .../MergeIntoPaimonDataEvolutionTable.scala        | 19 +++++++++----
 .../paimon/spark/sql/RowTrackingTestBase.scala     | 33 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 11cd875413..768e244a38 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -198,8 +198,16 @@ case class MergeIntoPaimonDataEvolutionTable(
     val updateColumnsSorted = updateColumns.toSeq.sortBy(
       s => targetTable.output.map(x => x.toString()).indexOf(s.toString()))
 
-    val assignments = redundantColumns.map(column => Assignment(column, column))
-    val output = updateColumnsSorted ++ redundantColumns
+    // Different Spark versions might produce duplicate attributes between `output` and
+    // `metadataOutput`, so manually deduplicate by `exprId`.
+    val metadataColumns = (targetRelation.output ++ targetRelation.metadataOutput)
+      .filter(attr => attr.name.equals(ROW_ID_NAME))
+      .groupBy(_.exprId)
+      .map { case (_, attrs) => attrs.head }
+      .toSeq
+
+    val assignments = metadataColumns.map(column => Assignment(column, column))
+    val output = updateColumnsSorted ++ metadataColumns
     val realUpdateActions = matchedActions
       .map(s => s.asInstanceOf[UpdateAction])
       .map(
@@ -217,10 +225,9 @@ case class MergeIntoPaimonDataEvolutionTable(
 
     val allReadFieldsOnTarget = allFields.filter(
       field =>
-        targetTable.output.exists(
-          attr => attr.toString().equals(field.toString()))) ++ redundantColumns
-    val allReadFieldsOnSource = allFields.filter(
-      field => sourceTable.output.exists(attr => attr.toString().equals(field.toString())))
+        targetTable.output.exists(attr => attr.exprId.equals(field.exprId))) ++ metadataColumns
+    val allReadFieldsOnSource =
+      allFields.filter(field => sourceTable.output.exists(attr => attr.exprId.equals(field.exprId)))
 
     val targetReadPlan =
       touchedFileTargetRelation.copy(targetRelation.table, allReadFieldsOnTarget.toSeq)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index cda852f3a4..5fb3de0e07 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -327,6 +327,39 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Data Evolution: merge into table with data-evolution on _ROW_ID") {
+    withTable("source", "target") {
+      sql(
+        "CREATE TABLE source (a INT, b INT, c STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql(
+        "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 
'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
+
+      sql(
+        "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 
'c3')")
+
+      sql(s"""
+             |MERGE INTO target
+             |USING source
+             |ON target._ROW_ID = source._ROW_ID
+             |WHEN MATCHED AND target.a = 2 THEN UPDATE SET b = source.b + target.b
+             |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c
+             |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c)
+             |WHEN NOT MATCHED THEN INSERT (a, b, c) VALUES (a, b, c)
+             |""".stripMargin)
+
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
+        Seq(
+          Row(1, 10, "c1", 0, 2),
+          Row(2, 320, "c2", 1, 2),
+          Row(3, 500, "c55", 2, 2),
+          Row(7, 700, "c77", 3, 2),
+          Row(9, 990, "c99", 4, 2))
+      )
+    }
+  }
+
   test("Data Evolution: update table throws exception") {
     withTable("t") {
       sql(

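For readers following the diff: the fix boils down to two patterns, deduplicating the `_ROW_ID` attribute that some Spark versions expose in both `output` and `metadataOutput` (keyed by `exprId`, which uniquely identifies a Catalyst attribute), and matching read fields by `exprId` identity instead of comparing `toString()` renderings, which are not a stable notion of attribute equality. The following is a minimal, standalone Scala sketch of those two patterns against Spark's Catalyst API; the simulated plan outputs below are illustrative assumptions, not Paimon's actual code.

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.types.{IntegerType, LongType}

    object ExprIdDedupSketch {
      def main(args: Array[String]): Unit = {
        val rowId = AttributeReference("_ROW_ID", LongType, nullable = false)()
        val a = AttributeReference("a", IntegerType)()

        // Simulate a Spark version where _ROW_ID shows up in both `output`
        // and `metadataOutput`; the duplicate is the same attribute, so it
        // carries the same ExprId.
        val output: Seq[Attribute] = Seq(a, rowId)
        val metadataOutput: Seq[Attribute] = Seq(rowId)

        // Pattern 1: deduplicate by ExprId, keeping one attribute per id.
        val metadataColumns = (output ++ metadataOutput)
          .filter(_.name == "_ROW_ID")
          .groupBy(_.exprId)
          .map { case (_, attrs) => attrs.head }
          .toSeq
        assert(metadataColumns.size == 1) // without dedup, _ROW_ID is emitted twice

        // Pattern 2: intersect field lists by ExprId identity rather than
        // by string rendering.
        val allFields: Seq[Attribute] = Seq(a, rowId)
        val onTarget = allFields.filter(f => output.exists(_.exprId == f.exprId))
        assert(onTarget == Seq(a, rowId))
      }
    }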