This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a9279d666 [MINOR] Pass prepped boolean correctly in sql writer (#9320)
62a9279d666 is described below

commit 62a9279d666646fd7abe1872857ea2f94fdedd46
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Aug 3 08:22:59 2023 +0530

    [MINOR] Pass prepped boolean correctly in sql writer (#9320)
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala     |  3 +--
 .../sql/hudi/command/MergeIntoHoodieTableCommand.scala   | 16 ++++++++--------
 .../hudi/TestMergeIntoTableWithNonRecordKeyField.scala   |  3 ---
 3 files changed, 9 insertions(+), 13 deletions(-)
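
The behavioral piece of this patch sits in MergeIntoHoodieTableCommand: a
table with no record key fields ("pkless") must keep the Hudi meta columns in
the projected MERGE join output, because for such tables the merge identifies
existing records through the meta columns (e.g. _hoodie_record_key) rather
than through the data columns. Below is a minimal sketch of that projection
decision in plain Scala with illustrative names, not Hudi code; only
getRecordKeyFields.isPresent and the SPARK_SQL_OPTIMIZED_WRITES flag come
from the diff itself:

    // hasPrimaryKey mirrors hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent;
    // optimizedWrites mirrors the SPARK_SQL_OPTIMIZED_WRITES session flag.
    def projectedColumns(hasPrimaryKey: Boolean,
                         optimizedWrites: Boolean,
                         metaCols: Seq[String],
                         dataCols: Seq[String]): Seq[String] =
      // pkless tables always retain the meta columns in the projection
      if (!hasPrimaryKey || optimizedWrites) metaCols ++ dataCols
      else dataCols

    // e.g. projectedColumns(hasPrimaryKey = false, optimizedWrites = false,
    //        Seq("_hoodie_record_key"), Seq("id", "name"))
    //      == Seq("_hoodie_record_key", "id", "name")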

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fcee3fdab49..07b16e1e47d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -404,8 +404,7 @@ object HoodieSparkSqlWriter {
                 hoodieRecords
               }
             client.startCommitWithTime(instantTime, commitActionType)
-            val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
-              isPrepped)
+            val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation, isPrepped)
             (writeResult, client)
         }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index eba75c95452..f830c552bc8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -24,8 +24,8 @@ import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -342,7 +342,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
     val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
     val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
-    val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+    // for pkless table, we need to project the meta columns
+    val hasPrimaryKey = hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+    val projectedJoinPlan = if (!hasPrimaryKey || sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") == "true") {
       Project(tableMetaCols ++ incomingDataCols, joinData)
     } else {
       Project(incomingDataCols, joinData)
@@ -619,12 +621,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // default value ("ts")
     // TODO(HUDI-3456) clean up
     val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
-
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
-
-    val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
-      SPARK_SQL_OPTIMIZED_WRITES.defaultValue())
-
+    // for pkless tables, we need to enable optimized merge
+    val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
+    val enableOptimizedMerge = if (!hasPrimaryKey) "true" else sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false")
     val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
       classOf[MergeIntoKeyGenerator].getCanonicalName
     } else {
@@ -653,7 +653,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
       RECORD_MERGER_IMPLS.key -> classOf[HoodieAvroRecordMerger].getName,

-        // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
+      // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
       //       as schema of the incoming dataset might be diverging from the table's schema (full schemas'
       //       compatibility b/w table's schema and incoming one is not necessary in this case since we can
       //       be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
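
The same pkless test drives the write-path selection in the second hunk: when
the table has no record key fields, optimized merge is forced on (which in
turn selects the MergeIntoKeyGenerator); otherwise the
SPARK_SQL_OPTIMIZED_WRITES session config decides, with the patch hard-coding
"false" as the fallback instead of the config's declared default. A runnable
sketch of that decision in plain Scala, with illustrative names:

    // Forced "true" for pkless tables; keyed tables consult the session conf,
    // falling back to "false" as the patch does.
    def enableOptimizedMerge(hasPrimaryKey: Boolean,
                             sessionConf: Option[String]): String =
      if (!hasPrimaryKey) "true" else sessionConf.getOrElse("false")

    // enableOptimizedMerge(hasPrimaryKey = false, None)         == "true"
    // enableOptimizedMerge(hasPrimaryKey = true,  None)         == "false"
    // enableOptimizedMerge(hasPrimaryKey = true,  Some("true")) == "true"
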
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index dd1d00580dc..48964b37323 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -172,7 +172,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
            |    (6, 'a6', 60, 100)
            |""".stripMargin)
 
-      // First merge with a extra input field 'flag' (insert a new record)
       spark.sql(
         s"""
            | merge into $tableName
@@ -219,7 +218,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
     for (withPrecombine <- Seq(true, false)) {
       withRecordType()(withTempDir { tmp =>
         spark.sql("set hoodie.payload.combined.schema.validate = true")
-        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
         val tableName = generateTableName
 
        val prekstr = if (withPrecombine) "tblproperties (preCombineField = 'ts')" else ""
@@ -242,7 +240,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
              |    (1, 'a1', 10, 100)
              |""".stripMargin)
 
-        // First merge with a extra input field 'flag' (insert a new record)
         spark.sql(
           s"""
              | merge into $tableName
