nsivabalan commented on code in PR #9123:
URL: https://github.com/apache/hudi/pull/9123#discussion_r1257549916


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
 
     val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
     val insertModeSet = insertModeOpt.nonEmpty
+    val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+    val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+    val sqlWriteOperation = sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+    val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue())
     val insertMode = InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty
 
-    // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
-    //       we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
+    // Try to use the sql write operation instead of the legacy insert mode. If only insert mode is explicitly specified, we use
+    // the legacy insert mode flow.
+    val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
     val operation = combinedOpts.getOrElse(OPERATION.key,
+      if (useLegacyInsertModeFlow) {
+        // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
+        //       we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
       deduceWriteOperationForInsertInfo(isPartitionedTable, isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate,
-        enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert))
+        enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert)
+      } else {
+        deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable, sqlWriteOperation)
+      }
+    )
 
-    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
-      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
-      // Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
-      // on reading.
-      // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default
-      classOf[ValidateDuplicateKeyPayload].getCanonicalName
-    } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL &&
-      insertMode == InsertMode.STRICT){
-      // Validate duplicate key for inserts to COW table when using strict insert mode.
-      classOf[ValidateDuplicateKeyPayload].getCanonicalName
+    val payloadClassName = if (useLegacyInsertModeFlow) {
+      deducePayloadClassNameLegacy(operation, tableType, insertMode)
     } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+      // should we also consider old way of doing things.

Review Comment:
   This is already taken care of in `deducePayloadClassNameLegacy`; none of the downstream methods do anything differently. It's only used to deduce the payload class.
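   
   For reference, here is a minimal sketch of the consolidation being described, reconstructed from the branch removed in the diff above. The actual `deducePayloadClassNameLegacy` in this PR may differ; this assumes the constants and imports already present in ProvidesHoodieConfig.scala:
   
   ```scala
   // Hypothetical reconstruction: folds the removed if/else chain into one helper,
   // so the strict-mode duplicate-key validation lives in a single place.
   private def deducePayloadClassNameLegacy(operation: String,
                                            tableType: String,
                                            insertMode: InsertMode): String = {
     if (insertMode == InsertMode.STRICT && tableType == COW_TABLE_TYPE_OPT_VAL &&
       (operation == UPSERT_OPERATION_OPT_VAL || operation == INSERT_OPERATION_OPT_VAL)) {
       // Validate duplicate keys for COW upserts/inserts in strict mode; for MOR
       // the merge happens on read with DefaultHoodieRecordPayload.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
       classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
   }
   ```
   
   Since the helper keys off `operation`, `tableType`, and `insertMode` alone, any "old way" handling downstream of this call would be redundant.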


