This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 419f881167 [HUDI-4643] MergeInto syntax WHEN MATCHED is optional but must be set (#6443) 419f881167 is described below commit 419f881167bc9800a93d0e121cb376cf092bfb6f Author: 董可伦 <dongkelu...@inspur.com> AuthorDate: Sat Aug 20 12:34:46 2022 +0800 [HUDI-4643] MergeInto syntax WHEN MATCHED is optional but must be set (#6443) --- .../hudi/command/MergeIntoHoodieTableCommand.scala | 6 +- .../spark/sql/hudi/TestMergeIntoTable2.scala | 94 +++++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 6cd96d7064..5676b72aef 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -153,7 +153,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie if (mergeInto.matchedActions.nonEmpty) { // Do the upsert executeUpsert(sourceDF, parameters) } else { // If there is no match actions in the statement, execute insert operation only. - executeInsertOnly(sourceDF, parameters) + val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable) + val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",") + // Only records that are not included in the target table can be inserted + val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti") + executeInsertOnly(insertSourceDF, parameters) } sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString) Seq.empty[Row] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index e162368dac..1c2dc0aab6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -258,7 +258,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { | ) """.stripMargin) - // Insert data to source table + // Insert data spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2021-03-21'") checkAnswer(s"select id, name, price, ts, dt from $tableName")( Seq(1, "a1", 1.0, 10, "2021-03-21") @@ -544,4 +544,96 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { } } + test("Test only insert when source table contains history") { + withTempDir { tmp => + val tableName = generateTableName + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey ='id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // Insert data + spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2022-08-18'") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 1.0, 10, "2022-08-18") + ) + + // Insert data which not matched insert-condition. + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 11 as price, 110 as ts, '2022-08-19' as dt union all + | select 2 as id, 'a2' as name, 10 as price, 100 as ts, '2022-08-18' as dt + | ) as s0 + | on t0.id = s0.id + | when not matched then insert * + """.stripMargin + ) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 1.0, 10, "2022-08-18"), + Seq(2, "a2", 10.0, 100, "2022-08-18") + ) + } + } + + test("Test only insert when source table contains history and target table has multiple keys") { + withTempDir { tmp => + val tableName = generateTableName + // Create table with multiple keys + spark.sql( + s""" + |create table $tableName ( + | id1 int, + | id2 int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey ='id1,id2', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true") + // Insert data + spark.sql(s"insert into $tableName select 1, 1, 'a1', 1, 10, '2022-08-18'") + checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")( + Seq(1, 1, "a1", 1.0, 10, "2022-08-18") + ) + + // Insert data which not matched insert-condition. + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts, '2022-08-19' as dt union all + | select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts, '2022-08-18' as dt + | ) as s0 + | on t0.id1 = s0.id1 + | when not matched then insert * + """.stripMargin + ) + + checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")( + Seq(1, 1, "a1", 1.0, 10, "2022-08-18"), + Seq(1, 2, "a2", 10.0, 100, "2022-08-18") + ) + } + } + }