This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 419f881167 [HUDI-4643] MergeInto syntax WHEN MATCHED is optional but 
must be set (#6443)
419f881167 is described below

commit 419f881167bc9800a93d0e121cb376cf092bfb6f
Author: 董可伦 <dongkelu...@inspur.com>
AuthorDate: Sat Aug 20 12:34:46 2022 +0800

    [HUDI-4643] MergeInto syntax WHEN MATCHED is optional but must be set 
(#6443)
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  6 +-
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 94 +++++++++++++++++++++-
 2 files changed, 98 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 6cd96d7064..5676b72aef 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -153,7 +153,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
       executeUpsert(sourceDF, parameters)
     } else { // If there is no match actions in the statement, execute insert 
operation only.
-      executeInsertOnly(sourceDF, parameters)
+      val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable)
+      val primaryKeys = 
hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
+      // Only records that are not included in the target table can be inserted
+      val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
+      executeInsertOnly(insertSourceDF, parameters)
     }
     sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
     Seq.empty[Row]
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index e162368dac..1c2dc0aab6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -258,7 +258,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            | )
        """.stripMargin)
 
-      // Insert data to source table
+      // Insert data
       spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2021-03-21'")
       checkAnswer(s"select id, name, price, ts, dt from $tableName")(
         Seq(1, "a1", 1.0, 10, "2021-03-21")
@@ -544,4 +544,96 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test only insert when source table contains history") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // Insert data
+      spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2022-08-18'")
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1", 1.0, 10, "2022-08-18")
+      )
+
+      // Insert data which not matched insert-condition.
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 1 as id, 'a1' as name, 11 as price, 110 as ts, 
'2022-08-19' as dt union all
+           |  select 2 as id, 'a2' as name, 10 as price, 100 as ts, 
'2022-08-18' as dt
+           | ) as s0
+           | on t0.id = s0.id
+           | when not matched then insert *
+         """.stripMargin
+      )
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1", 1.0, 10, "2022-08-18"),
+        Seq(2, "a2", 10.0, 100, "2022-08-18")
+      )
+    }
+  }
+
+  test("Test only insert when source table contains history and target table 
has multiple keys") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table with multiple keys
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id1 int,
+           |  id2 int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey ='id1,id2',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
+      // Insert data
+      spark.sql(s"insert into $tableName select 1, 1, 'a1', 1, 10, 
'2022-08-18'")
+      checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
+        Seq(1, 1, "a1", 1.0, 10, "2022-08-18")
+      )
+
+      // Insert data which not matched insert-condition.
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts, 
'2022-08-19' as dt union all
+           |  select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts, 
'2022-08-18' as dt
+           | ) as s0
+           | on t0.id1 = s0.id1
+           | when not matched then insert *
+         """.stripMargin
+      )
+
+      checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
+        Seq(1, 1, "a1", 1.0, 10, "2022-08-18"),
+        Seq(1, 2, "a2", 10.0, 100, "2022-08-18")
+      )
+    }
+  }
+
 }

Reply via email to