This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5af6d70399496ff7b11d574e34b3691f3ab3d034
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Sep 12 05:52:20 2023 -0500

    [HUDI-6478] Deduce op as upsert for INSERT INTO (#9665)
    
    When a user explicitly defines primaryKey and preCombineField in CREATE TABLE,
    a subsequent INSERT INTO will deduce the write operation as UPSERT.
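    
    For illustration, a minimal Spark SQL sketch of the deduced behavior, in the
    style of the tests touched by this change (assumes a Spark session with the
    Hudi bundle on the classpath; the table name and columns are hypothetical,
    not part of this commit):
    
        // Both primaryKey and preCombineField are declared, and no explicit
        // write operation or insert mode is configured.
        spark.sql(
          """
            |create table hudi_upsert_demo (
            |  id int,
            |  name string,
            |  price double,
            |  ts long
            |) using hudi
            |tblproperties (
            |  type = 'cow',
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
            |)
            """.stripMargin)
    
        // INSERT INTO is therefore deduced as UPSERT: re-inserting the same key
        // updates the existing row instead of adding a duplicate.
        spark.sql("insert into hudi_upsert_demo values (1, 'a1', 10.0, 1000)")
        spark.sql("insert into hudi_upsert_demo values (1, 'a1_1', 20.0, 2000)")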
    
    ---------
    
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../apache/hudi/AutoRecordKeyGenerationUtils.scala |  11 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  31 ++--
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  48 +++---
 .../sql/hudi/TestAlterTableDropPartition.scala     |   1 -
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 161 ++++++++++++++++-----
 .../spark/sql/hudi/TestTimeTravelTable.scala       |  22 ++-
 6 files changed, 177 insertions(+), 97 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
index 6c1b828f3be..f5bbfbf7fef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -20,7 +20,6 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PRECOMBINE_FIELD}
-import org.apache.hudi.HoodieSparkSqlWriter.getClass
 import org.apache.hudi.common.config.HoodieConfig
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
@@ -32,9 +31,7 @@ object AutoRecordKeyGenerationUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
   def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, String], hoodieConfig: HoodieConfig): Unit = {
-    val autoGenerateRecordKeys = isAutoGenerateRecordKeys(parameters)
-    // hudi will auto generate.
-    if (autoGenerateRecordKeys) {
+    if (shouldAutoGenerateRecordKeys(parameters)) {
       // de-dup is not supported with auto generation of record keys
       if (parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean) {
@@ -54,7 +51,9 @@ object AutoRecordKeyGenerationUtils {
     }
   }
 
-  def isAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = {
-    !parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if record key is not configured,
+  def shouldAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = {
+    val recordKeyFromTableConfig = parameters.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "")
+    val recordKeyFromWriterConfig = parameters.getOrElse(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "")
+    recordKeyFromTableConfig.isEmpty && recordKeyFromWriterConfig.isEmpty
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 3d043569835..5230c34984f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -17,8 +17,9 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
-import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -29,11 +30,10 @@ import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator, SqlKeyGenerator}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.slf4j.LoggerFactory
 
-import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
@@ -43,12 +43,10 @@ import scala.collection.JavaConverters._
 object HoodieWriterUtils {
 
   private val log = LoggerFactory.getLogger(getClass)
+
   /**
-    * Add default options for unspecified write options keys.
-    *
-    * @param parameters
-    * @return
-    */
+   * Add default options for unspecified write options keys.
+   */
   def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
     val props = TypedProperties.fromMap(parameters)
@@ -94,15 +92,16 @@ object HoodieWriterUtils {
    * Determines whether writes need to take prepped path or regular non-prepped path.
    * - For spark-sql writes (UPDATES, DELETES), we could use prepped flow due to the presences of meta fields.
    * - For pkless tables, if incoming df has meta fields, we could use prepped flow.
+   *
    * @param hoodieConfig hoodie config of interest.
-   * @param parameters raw parameters.
-   * @param operation operation type.
-   * @param df incoming dataframe
+   * @param parameters   raw parameters.
+   * @param operation    operation type.
+   * @param df           incoming dataframe
    * @return true if prepped writes, false otherwise.
    */
-  def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String, String], operation : WriteOperationType, df: Dataset[Row]): Boolean = {
+  def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String, String], operation: WriteOperationType, df: Dataset[Row]): Boolean = {
     var isPrepped = false
-    if (AutoRecordKeyGenerationUtils.isAutoGenerateRecordKeys(parameters)
+    if (shouldAutoGenerateRecordKeys(parameters)
       && parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY, "false").equals("false")
       && parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").equals("false")
       && df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
@@ -121,6 +120,7 @@ object HoodieWriterUtils {
   /**
    * Fetch params by translating alternatives if any. Do not set any default as this method is intended to be called
    * before validation.
+   *
    * @param parameters hash map of parameters.
    * @return hash map of raw with translated parameters.
    */
@@ -134,8 +134,6 @@ object HoodieWriterUtils {
 
   /**
    * Get the partition columns to stored to hoodie.properties.
-   * @param parameters
-   * @return
    */
   def getPartitionColumns(parameters: Map[String, String]): String = {
     SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(parameters))
@@ -164,7 +162,7 @@ object HoodieWriterUtils {
    * Detects conflicts between new parameters and existing table configurations
    */
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
-      tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
+                          tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
     // If Overwrite is set as save mode, we don't need to do table config validation.
     if (!isOverWriteMode) {
       val resolver = spark.sessionState.conf.resolver
@@ -267,6 +265,7 @@ object HoodieWriterUtils {
     PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME,
     RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY
   )
+
   def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, String]): Map[String, String] = {
     val includingTableConfigs = scala.collection.mutable.Map() ++ options
     sparkDatasourceConfigsToTableConfigsMap.foreach(kv => {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index f85032790dd..4eb8d2b1d1e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -28,7 +29,6 @@ import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWr
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor}
 import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.internal.Logging
@@ -96,7 +96,7 @@ trait ProvidesHoodieConfig extends Logging {
       // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL &&
-      insertMode == InsertMode.STRICT){
+      insertMode == InsertMode.STRICT) {
       // Validate duplicate key for inserts to COW table when using strict insert mode.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
@@ -108,13 +108,16 @@ trait ProvidesHoodieConfig extends Logging {
    * Deduce the sql write operation for INSERT_INTO
    */
   private def deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition: Boolean, isOverwriteTable: Boolean,
-                                                    sqlWriteOperation: String): String = {
+                                                     shouldAutoKeyGen: Boolean, preCombineField: String,
+                                                     sparkSqlInsertIntoOperationSet: Boolean, sparkSqlInsertIntoOperation: String): String = {
     if (isOverwriteTable) {
       INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
     } else if (isOverwritePartition) {
       INSERT_OVERWRITE_OPERATION_OPT_VAL
+    } else if (!sparkSqlInsertIntoOperationSet && !shouldAutoKeyGen && preCombineField.nonEmpty) {
+      UPSERT_OPERATION_OPT_VAL
     } else {
-      sqlWriteOperation
+      sparkSqlInsertIntoOperation
     }
   }
 
@@ -145,7 +148,7 @@ trait ProvidesHoodieConfig extends Logging {
       // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
       case (true, false, false, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
       // if auto record key generation is enabled, use bulk_insert
-      case (_, _, _, _, _,_,true) => BULK_INSERT_OPERATION_OPT_VAL
+      case (_, _, _, _, _, _, true) => BULK_INSERT_OPERATION_OPT_VAL
       // for the rest case, use the insert operation
       case _ => INSERT_OPERATION_OPT_VAL
     }
@@ -182,7 +185,7 @@ trait ProvidesHoodieConfig extends Logging {
     // NOTE: Here we fallback to "" to make sure that null value is not overridden with
     // default value ("ts")
     // TODO(HUDI-3456) clean up
-    val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
+    val preCombineField = combinedOpts.getOrElse(PRECOMBINE_FIELD.key, "")
 
     val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
     val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
@@ -193,14 +196,14 @@ trait ProvidesHoodieConfig extends Logging {
       DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
     val dropDuplicate = sparkSession.conf
       .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
-    val autoGenerateRecordKeys : Boolean = !combinedOpts.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    val shouldAutoKeyGen: Boolean = shouldAutoGenerateRecordKeys(combinedOpts)
 
     val insertMode = InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
       DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
     val insertModeSet = combinedOpts.contains(SQL_INSERT_MODE.key)
-    val sqlWriteOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key())
-    val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
-    val sqlWriteOperation = sqlWriteOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue())
+    val sparkSqlInsertIntoOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key())
+    val sparkSqlInsertIntoOperationSet = sparkSqlInsertIntoOperationOpt.nonEmpty
+    val sparkSqlInsertIntoOperation = sparkSqlInsertIntoOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue())
     val insertDupPolicyOpt = combinedOpts.get(INSERT_DUP_POLICY.key())
     val insertDupPolicySet = insertDupPolicyOpt.nonEmpty
     val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue())
@@ -208,19 +211,22 @@ trait ProvidesHoodieConfig extends Logging {
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty
 
-    // try to use sql write operation instead of legacy insert mode. If only insert mode is explicitly specified, w/o specifying
-    // any value for sql write operation, leagcy configs will be honored. But on all other cases (i.e when neither of the configs is set,
-    // or when both configs are set, or when only sql write operation is set), we honor sql write operation and ignore
-    // the insert mode.
-    val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
+    /*
+     * The sql write operation has higher precedence than the legacy insert mode.
+     * Only when the legacy insert mode is explicitly set, without setting sql write operation,
+     * legacy configs will be honored. On all other cases (i.e when both are set, either is set,
+     * or when only the sql write operation is set), we honor the sql write operation.
+     */
+    val useLegacyInsertModeFlow = insertModeSet && !sparkSqlInsertIntoOperationSet
     var operation = combinedOpts.getOrElse(OPERATION.key,
       if (useLegacyInsertModeFlow) {
         // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
         //       we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
         deduceOperation(enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
-          isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, autoGenerateRecordKeys)
+          isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, shouldAutoKeyGen)
       } else {
-        deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable, sqlWriteOperation)
+        deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable,
+          shouldAutoKeyGen, preCombineField, sparkSqlInsertIntoOperationSet, sparkSqlInsertIntoOperation)
       }
     )
 
@@ -233,14 +239,14 @@ trait ProvidesHoodieConfig extends Logging {
         Map()
       }
     } else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
-      if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
+      if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
         operation = BULK_INSERT_OPERATION_OPT_VAL
         Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
       } else {
         Map()
       }
     } else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
-      if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
+      if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
         operation = BULK_INSERT_OPERATION_OPT_VAL
         Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
       } else {
@@ -254,7 +260,7 @@ trait ProvidesHoodieConfig extends Logging {
     // w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
     // or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
     val useLegacyInsertDropDupFlow = insertModeSet && !insertDupPolicySet
-    val payloadClassName =  if (useLegacyInsertDropDupFlow) {
+    val payloadClassName = if (useLegacyInsertDropDupFlow) {
       deducePayloadClassNameLegacy(operation, tableType, insertMode)
     } else {
       if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
@@ -304,7 +310,7 @@ trait ProvidesHoodieConfig extends Logging {
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
-  def getDropDupsConfig(useLegacyInsertModeFlow: Boolean, incomingParams : Map[String, String]): Map[String, String] = {
+  def getDropDupsConfig(useLegacyInsertModeFlow: Boolean, incomingParams: Map[String, String]): Map[String, String] = {
     if (!useLegacyInsertModeFlow) {
       Map(DataSourceWriteOptions.INSERT_DUP_POLICY.key() -> incomingParams.getOrElse(DataSourceWriteOptions.INSERT_DUP_POLICY.key(),
         DataSourceWriteOptions.INSERT_DUP_POLICY.defaultValue()),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index b421732d270..2c592f5a815 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -417,7 +417,6 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
         spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
 
         checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
-          Seq(2, "l4", "v1", "2021", "10", "02"),
           Seq(2, "l4", "v1", "2021", "10", "02")
         )
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index ff2f58982bd..e53a4385efa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1727,25 +1727,26 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   /**
-   * When neither of strict mode nor sql.write.operation is set, sql write operation takes precedence and default value is chosen.
+   * When neither of strict mode nor sql.write.operation is set, sql write operation is deduced as UPSERT
+   * due to presence of preCombineField.
    */
   test("Test sql write operation with INSERT_INTO No explicit configs") {
     spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
     spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
     spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
-      withRecordType()(withTempDir { tmp =>
-        Seq("cow","mor").foreach {tableType =>
-          withTable(generateTableName) { tableName =>
-            ingestAndValidateData(tableType, tableName, tmp)
-          }
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT)
         }
-      })
+      }
+    })
   }
 
  test("Test sql write operation with INSERT_INTO override both strict mode and sql write operation") {
     withRecordType()(withTempDir { tmp =>
-      Seq("cow","mor").foreach { tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation =>
           withTable(generateTableName) { tableName =>
             ingestAndValidateData(tableType, tableName, tmp, operation,
@@ -1758,7 +1759,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
  test("Test sql write operation with INSERT_INTO override only sql write operation") {
     withRecordType()(withTempDir { tmp =>
-      Seq("cow","mor").foreach {tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation =>
           withTable(generateTableName) { tableName =>
             ingestAndValidateData(tableType, tableName, tmp, operation,
@@ -1772,11 +1773,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   test("Test sql write operation with INSERT_INTO override only strict mode") {
     spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
     spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
-    spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
+    spark.sessionState.conf.unsetConf(DataSourceWriteOptions.INSERT_DUP_POLICY.key())
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
     spark.sessionState.conf.unsetConf("hoodie.sql.bulk.insert.enable")
     withRecordType()(withTempDir { tmp =>
-      Seq("cow","mor").foreach {tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         withTable(generateTableName) { tableName =>
           ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT,
             List("set hoodie.sql.insert.mode = upsert"))
@@ -1786,7 +1787,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   def ingestAndValidateData(tableType: String, tableName: String, tmp: File,
-                            expectedOperationtype: WriteOperationType = WriteOperationType.INSERT,
+                            expectedOperationtype: WriteOperationType,
                             setOptions: List[String] = List.empty) : Unit = {
     setOptions.foreach(entry => {
       spark.sql(entry)
@@ -1851,14 +1852,94 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
   }
 
+  test("Test sql write operation with INSERT_INTO No explicit configs No Precombine") {
+    spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+    spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+    spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
+    spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow","mor").foreach { tableType =>
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateDataNoPrecombine(tableType, tableName, tmp, WriteOperationType.INSERT)
+        }
+      }
+    })
+  }
+
+  def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, tmp: File,
+                            expectedOperationtype: WriteOperationType,
+                            setOptions: List[String] = List.empty) : Unit = {
+    setOptions.foreach(entry => {
+      spark.sql(entry)
+    })
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | tblproperties (
+         |  type = '$tableType',
+         |  primaryKey = 'id'
+         | )
+         | partitioned by (dt)
+         | location '${tmp.getCanonicalPath}/$tableName'
+         """.stripMargin)
+
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+    assertResult(expectedOperationtype) {
+      getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType
+    }
+    checkAnswer(s"select id, name, price, dt from $tableName")(
+      Seq(1, "a1", 10.0, "2021-07-18")
+    )
+
+    // insert record again but w/ diff values but same primary key.
+    spark.sql(
+      s"""
+         | insert into $tableName values
+         | (1, 'a1_1', 10, "2021-07-18"),
+         | (2, 'a2', 20, "2021-07-18"),
+         | (2, 'a2_2', 30, "2021-07-18")
+              """.stripMargin)
+
+    assertResult(expectedOperationtype) {
+      getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType
+    }
+    if (expectedOperationtype == WriteOperationType.UPSERT) {
+      // dedup should happen within same batch being ingested and existing records on storage should get updated
+      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+        Seq(1, "a1_1", 10.0, "2021-07-18"),
+        Seq(2, "a2_2", 30.0, "2021-07-18")
+      )
+    } else {
+      // no dedup across batches
+      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+        Seq(1, "a1", 10.0, "2021-07-18"),
+        Seq(1, "a1_1", 10.0, "2021-07-18"),
+        Seq(2, "a2", 20.0, "2021-07-18"),
+        Seq(2, "a2_2", 30.0, "2021-07-18")
+      )
+    }
+    spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+    spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+    spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
+    spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
+  }
+
  test("Test insert dup policy with INSERT_INTO explicit new configs INSERT operation ") {
     withRecordType()(withTempDir { tmp =>
-      Seq("cow","mor").foreach {tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         val operation = WriteOperationType.INSERT
-          Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { dupPolicy =>
+        Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { dupPolicy =>
           withTable(generateTableName) { tableName =>
             ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
-              List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+              List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}",
+                s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"),
               dupPolicy)
           }
         }
@@ -1868,27 +1949,27 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
  test("Test insert dup policy with INSERT_INTO explicit new configs BULK_INSERT operation ") {
     withRecordType()(withTempDir { tmp =>
-      Seq("cow").foreach {tableType =>
+      Seq("cow").foreach { tableType =>
         val operation = WriteOperationType.BULK_INSERT
         val dupPolicy = NONE_INSERT_DUP_POLICY
-            withTable(generateTableName) { tableName =>
-              ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
-                List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
-                dupPolicy)
-            }
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
+            List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}",
+              s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"),
+            dupPolicy)
+        }
       }
     })
   }
 
  test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK INSERT operation") {
     withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
-      Seq("cow").foreach {tableType =>
-        val operation = WriteOperationType.BULK_INSERT
+      Seq("cow").foreach { tableType =>
         val dupPolicy = DROP_INSERT_DUP_POLICY
         withTable(generateTableName) { tableName =>
-          ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp, operation,
-            List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(),
-              "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy))
+          ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp,
+            List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${WriteOperationType.BULK_INSERT.value}",
+              s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"))
         }
       }
     })
@@ -1896,22 +1977,24 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") {
     withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
-      Seq("cow").foreach {tableType =>
+      Seq("cow").foreach { tableType =>
         val operation = WriteOperationType.UPSERT
         val dupPolicy = FAIL_INSERT_DUP_POLICY
-            withTable(generateTableName) { tableName =>
-              ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
-                List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
-                dupPolicy, true)
-            }
-          }
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
+            List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}",
+              s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"),
+            dupPolicy, true)
+        }
+      }
     })
   }
 
   def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, tmp: File,
-                            expectedOperationtype: WriteOperationType = WriteOperationType.INSERT,
-                            setOptions: List[String] = List.empty, insertDupPolicy : String = NONE_INSERT_DUP_POLICY,
-                                    expectExceptionOnSecondBatch: Boolean = false) : Unit = {
+                                     expectedOperationtype: WriteOperationType = WriteOperationType.INSERT,
+                                     setOptions: List[String] = List.empty,
+                                     insertDupPolicy : String = NONE_INSERT_DUP_POLICY,
+                                     expectExceptionOnSecondBatch: Boolean = false) : Unit = {
 
     // set additional options
     setOptions.foreach(entry => {
@@ -2010,8 +2093,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   def ingestAndValidateDropDupPolicyBulkInsert(tableType: String, tableName: String, tmp: File,
-                                     expectedOperationtype: WriteOperationType = WriteOperationType.BULK_INSERT,
-                                     setOptions: List[String] = List.empty) : Unit = {
+                                               setOptions: List[String] = List.empty) : Unit = {
 
     // set additional options
     setOptions.foreach(entry => {
@@ -2027,8 +2109,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          |) using hudi
          | tblproperties (
          |  type = '$tableType',
-         |  primaryKey = 'id',
-         |  preCombine = 'name'
+         |  primaryKey = 'id'
          | )
          | partitioned by (dt)
          | location '${tmp.getCanonicalPath}/$tableName'
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
index a2fb0c80fad..73bad3be282 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
@@ -41,24 +41,24 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
              | location '${tmp.getCanonicalPath}/$tableName1'
        """.stripMargin)
 
+        // 1st commit instant
         spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
 
         val metaClient1 = HoodieTableMetaClient.builder()
           .setBasePath(s"${tmp.getCanonicalPath}/$tableName1")
           .setConf(spark.sessionState.newHadoopConf())
           .build()
-
         val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
           .lastInstant().get().getTimestamp
 
+        // 2nd commit instant
         spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
 
         checkAnswer(s"select id, name, price, ts from $tableName1")(
-          Seq(1, "a1", 10.0, 1000),
           Seq(1, "a2", 20.0, 2000)
         )
 
-        // time travel from instant1
+        // time travel as of instant 1
         checkAnswer(
           s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1'")(
           Seq(1, "a1", 10.0, 1000)
@@ -194,11 +194,6 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
             Seq(2, "a2", 20.0, 1000)
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName1")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 20.0, 1000)
-          )
-
           spark.sql(s"insert into $tableName2 values(3, 'a3', 10, 1000)")
           spark.sql(s"insert into $tableName2 values(4, 'a4', 20, 1000)")
 
@@ -272,25 +267,26 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
              | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
 
+        // 1st commit instant
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 
         val metaClient = HoodieTableMetaClient.builder()
           .setBasePath(s"${tmp.getCanonicalPath}/$tableName")
           .setConf(spark.sessionState.newHadoopConf())
           .build()
-
-        val instant = metaClient.getActiveTimeline.getAllCommitsTimeline
+        val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline
           .lastInstant().get().getTimestamp
+
+        // 2nd commit instant
         spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
 
         checkAnswer(s"select id, name, price, ts from $tableName distribute by cast(rand() * 2 as int)")(
-          Seq(1, "a1", 10.0, 1000),
           Seq(1, "a2", 20.0, 2000)
         )
 
-        // time travel from instant
+        // time travel as of instant 1
         checkAnswer(
-          s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant' distribute by cast(rand() * 2 as int)")(
+          s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant1' distribute by cast(rand() * 2 as int)")(
           Seq(1, "a1", 10.0, 1000)
         )
       })

