nsivabalan commented on code in PR #9123: URL: https://github.com/apache/hudi/pull/9123#discussion_r1257549916
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ########## @@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging { val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key) val insertModeSet = insertModeOpt.nonEmpty + val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key()) + val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty + val sqlWriteOperation = sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue()) + val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue()) val insertMode = InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue())) val isNonStrictMode = insertMode == InsertMode.NON_STRICT val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty - // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input - // we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type + // try to use sql write operation instead of legacy insert mode. If only insert mode is explicitly specified, we will use + // the legacy insert mode flow. + val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet val operation = combinedOpts.getOrElse(OPERATION.key, + if (useLegacyInsertModeFlow) { + // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input + // we'd prefer that value over auto-deduced operation. 
Otherwise, we deduce target operation type deduceWriteOperationForInsertInfo(isPartitionedTable, isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate, - enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert)) + enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert) + } else { + deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable, sqlWriteOperation) + } + ) - val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL && - tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) { - // Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload - // on reading. - // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default - classOf[ValidateDuplicateKeyPayload].getCanonicalName - } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && - insertMode == InsertMode.STRICT){ - // Validate duplicate key for inserts to COW table when using strict insert mode. - classOf[ValidateDuplicateKeyPayload].getCanonicalName + val payloadClassName = if (useLegacyInsertModeFlow) { + deducePayloadClassNameLegacy(operation, tableType, insertMode) } else { - classOf[OverwriteWithLatestAvroPayload].getCanonicalName + // should we also consider old way of doing things. Review Comment: this is already taken care of in deducePayloadClassNameLegacy; none of the downstream methods do anything differently. It's only used to deduce the payload class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org