This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6573930d4 [spark] Fix merge into update on source eq target condition (#3665)
6573930d4 is described below
commit 6573930d4956aca05d67e8ec155ef82b0f2bdfc6
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jul 3 16:19:10 2024 +0800
[spark] Fix merge into update on source eq target condition (#3665)
---
.../spark/catalyst/analysis/RowLevelHelper.scala | 4 ++--
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 24 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 9981e7d3c..eecf0542e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -66,8 +66,8 @@ trait RowLevelHelper extends SQLConfHelper {
case Assignment(key, value) => key == value
}
.exists {
- case EqualTo(left: AttributeReference, _) =>
- isTargetPrimaryKey(left)
+ case EqualTo(left: AttributeReference, right: AttributeReference) =>
+ isTargetPrimaryKey(left) || isTargetPrimaryKey(right)
case Assignment(key: AttributeReference, _) =>
isTargetPrimaryKey(key)
case _ => false
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index bf8e72f79..2fa49d1c1 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -621,4 +621,28 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase {
assert(error.contains("Only support to MergeInto table with primary keys."))
}
}
+
+ test(s"Paimon MergeInto: update on source eq target condition") {
+ withTable("source", "target") {
+ Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")
+
+ sql(s"""
+ |CREATE TABLE target (a INT, b INT, c STRING)
+ |TBLPROPERTIES ('primary-key'='a')
+ |""".stripMargin)
+ sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+ sql(s"""
+ |MERGE INTO target
+ |USING source
+ |ON source.a = target.a
+ |WHEN MATCHED THEN
+ |UPDATE SET a = source.a, b = source.b, c = source.c
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM target ORDER BY a, b"),
+ Row(1, 100, "c11") :: Row(2, 20, "c2") :: Nil)
+ }
+ }
}