This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 35bd94c60d [spark] Support partial insert for data evolution merge into (#6149)
35bd94c60d is described below
commit 35bd94c60d819caf862fa27d069cb4b5399f23e3
Author: YeJunHao <[email protected]>
AuthorDate: Tue Aug 26 14:45:00 2025 +0800
[spark] Support partial insert for data evolution merge into (#6149)
---
.../catalyst/analysis/PaimonMergeIntoBase.scala | 3 --
.../MergeIntoPaimonDataEvolutionTable.scala | 11 +++++--
.../paimon/spark/sql/RowLineageTestBase.scala | 37 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 5 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index ea69d32751..8a52273eea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -110,9 +110,6 @@ trait PaimonMergeIntoBase
u.copy(assignments = alignAssignments(targetOutput, assignments))
case i @ InsertAction(_, assignments) =>
- if (assignments.length != targetOutput.length && dataEvolutionEnabled)
{
- throw new RuntimeException("Can't align the table's columns in
insert clause.")
- }
i.copy(assignments = alignAssignments(targetOutput, assignments))
case _: UpdateStarAction =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index b5aa82f148..d26426d6c4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -31,7 +31,7 @@ import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils._
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -258,7 +258,14 @@ case class MergeIntoPaimonDataEvolutionTable(
case insertAction: InsertAction =>
Keep(
insertAction.condition.getOrElse(TrueLiteral),
- insertAction.assignments.map(a => a.value))
+ insertAction.assignments.map(
+ a =>
+ if (
+ !a.value.isInstanceOf[AttributeReference] ||
joinPlan.output.exists(
+ attr => attr.toString().equals(a.value.toString()))
+ ) a.value
+ else Literal(null))
+ )
}.toSeq,
notMatchedBySourceInstructions = Nil,
checkCardinality = false,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index da43a2e91b..d8a3516a80 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -221,6 +221,43 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
}
}
+ test("Data Evolution: insert into table with data-evolution partial insert")
{
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT)")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+ sql(
+ "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b)
+ |""".stripMargin)
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN INSERT (b) VALUES (b)
+ |""".stripMargin)
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4)
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(null, 11, null), Row(-1, 11, null), Row(2, 2, 2), Row(3, 3,
3), Row(3, null, 4))
+ )
+ }
+ }
+
test("Data Evolution: merge into table with data-evolution") {
withTable("s", "t") {
sql("CREATE TABLE s (id INT, b INT)")