This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5af6d70399496ff7b11d574e34b3691f3ab3d034
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Sep 12 05:52:20 2023 -0500

    [HUDI-6478] Deduce op as upsert for INSERT INTO (#9665)

    When users explicitly define primaryKey and preCombineField in CREATE TABLE,
    subsequent INSERT INTO statements will deduce the operation as UPSERT
    (see the usage sketches after the patch).

    ---------

    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../apache/hudi/AutoRecordKeyGenerationUtils.scala | 11 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala | 31 ++--
 .../spark/sql/hudi/ProvidesHoodieConfig.scala | 48 +++---
 .../sql/hudi/TestAlterTableDropPartition.scala | 1 -
 .../apache/spark/sql/hudi/TestInsertTable.scala | 161 ++++++++++++++++-----
 .../spark/sql/hudi/TestTimeTravelTable.scala | 22 ++-
 6 files changed, 177 insertions(+), 97 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala index 6c1b828f3be..f5bbfbf7fef 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala @@ -20,7 +20,6 @@ package org.apache.hudi import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PRECOMBINE_FIELD} -import org.apache.hudi.HoodieSparkSqlWriter.getClass import org.apache.hudi.common.config.HoodieConfig import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig @@ -32,9 +31,7 @@ object AutoRecordKeyGenerationUtils { private val log = LoggerFactory.getLogger(getClass) def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, String], hoodieConfig: HoodieConfig): Unit = { - val autoGenerateRecordKeys = isAutoGenerateRecordKeys(parameters) - // hudi will auto generate.
- if (autoGenerateRecordKeys) { + if (shouldAutoGenerateRecordKeys(parameters)) { // de-dup is not supported with auto generation of record keys if (parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean) { @@ -54,7 +51,9 @@ object AutoRecordKeyGenerationUtils { } } - def isAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = { - !parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if record key is not configured, + def shouldAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = { + val recordKeyFromTableConfig = parameters.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "") + val recordKeyFromWriterConfig = parameters.getOrElse(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "") + recordKeyFromTableConfig.isEmpty && recordKeyFromWriterConfig.isEmpty } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 3d043569835..5230c34984f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -17,8 +17,9 @@ package org.apache.hudi +import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys import org.apache.hudi.DataSourceOptionsHelper.allAlternatives -import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType} @@ -29,11 +30,10 @@ import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.util.SparkKeyGenUtils -import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator, SqlKeyGenerator} +import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.slf4j.LoggerFactory -import java.util.Properties import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters._ @@ -43,12 +43,10 @@ import scala.collection.JavaConverters._ object HoodieWriterUtils { private val log = LoggerFactory.getLogger(getClass) + /** - * Add default options for unspecified write options keys. - * - * @param parameters - * @return - */ + * Add default options for unspecified write options keys. + */ def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = { val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala val props = TypedProperties.fromMap(parameters) @@ -94,15 +92,16 @@ object HoodieWriterUtils { * Determines whether writes need to take prepped path or regular non-prepped path. * - For spark-sql writes (UPDATES, DELETES), we could use prepped flow due to the presences of meta fields. * - For pkless tables, if incoming df has meta fields, we could use prepped flow. + * * @param hoodieConfig hoodie config of interest. - * @param parameters raw parameters. - * @param operation operation type. - * @param df incoming dataframe + * @param parameters raw parameters. 
+ * @param operation operation type. + * @param df incoming dataframe * @return true if prepped writes, false otherwise. */ - def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String, String], operation : WriteOperationType, df: Dataset[Row]): Boolean = { + def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String, String], operation: WriteOperationType, df: Dataset[Row]): Boolean = { var isPrepped = false - if (AutoRecordKeyGenerationUtils.isAutoGenerateRecordKeys(parameters) + if (shouldAutoGenerateRecordKeys(parameters) && parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY, "false").equals("false") && parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").equals("false") && df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { @@ -121,6 +120,7 @@ object HoodieWriterUtils { /** * Fetch params by translating alternatives if any. Do not set any default as this method is intended to be called * before validation. + * * @param parameters hash map of parameters. * @return hash map of raw with translated parameters. */ @@ -134,8 +134,6 @@ object HoodieWriterUtils { /** * Get the partition columns to stored to hoodie.properties. - * @param parameters - * @return */ def getPartitionColumns(parameters: Map[String, String]): String = { SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(parameters)) @@ -164,7 +162,7 @@ object HoodieWriterUtils { * Detects conflicts between new parameters and existing table configurations */ def validateTableConfig(spark: SparkSession, params: Map[String, String], - tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = { + tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = { // If Overwrite is set as save mode, we don't need to do table config validation. 
if (!isOverWriteMode) { val resolver = spark.sessionState.conf.resolver @@ -267,6 +265,7 @@ object HoodieWriterUtils { PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME, RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY ) + def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, String]): Map[String, String] = { val includingTableConfigs = scala.collection.mutable.Map() ++ options sparkDatasourceConfigsToTableConfigsMap.foreach(kv => { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index f85032790dd..4eb8d2b1d1e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.toProperties @@ -28,7 +29,6 @@ import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWr import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor} import org.apache.hudi.keygen.ComplexKeyGenerator -import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.spark.internal.Logging @@ -96,7 +96,7 @@ trait ProvidesHoodieConfig extends Logging { // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default classOf[ValidateDuplicateKeyPayload].getCanonicalName } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && - insertMode == InsertMode.STRICT){ + insertMode == InsertMode.STRICT) { // Validate duplicate key for inserts to COW table when using strict insert mode. classOf[ValidateDuplicateKeyPayload].getCanonicalName } else { @@ -108,13 +108,16 @@ trait ProvidesHoodieConfig extends Logging { * Deduce the sql write operation for INSERT_INTO */ private def deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition: Boolean, isOverwriteTable: Boolean, - sqlWriteOperation: String): String = { + shouldAutoKeyGen: Boolean, preCombineField: String, + sparkSqlInsertIntoOperationSet: Boolean, sparkSqlInsertIntoOperation: String): String = { if (isOverwriteTable) { INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL } else if (isOverwritePartition) { INSERT_OVERWRITE_OPERATION_OPT_VAL + } else if (!sparkSqlInsertIntoOperationSet && !shouldAutoKeyGen && preCombineField.nonEmpty) { + UPSERT_OPERATION_OPT_VAL } else { - sqlWriteOperation + sparkSqlInsertIntoOperation } } @@ -145,7 +148,7 @@ trait ProvidesHoodieConfig extends Logging { // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode. 
case (true, false, false, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL // if auto record key generation is enabled, use bulk_insert - case (_, _, _, _, _,_,true) => BULK_INSERT_OPERATION_OPT_VAL + case (_, _, _, _, _, _, true) => BULK_INSERT_OPERATION_OPT_VAL // for the rest case, use the insert operation case _ => INSERT_OPERATION_OPT_VAL } @@ -182,7 +185,7 @@ trait ProvidesHoodieConfig extends Logging { // NOTE: Here we fallback to "" to make sure that null value is not overridden with // default value ("ts") // TODO(HUDI-3456) clean up - val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("") + val preCombineField = combinedOpts.getOrElse(PRECOMBINE_FIELD.key, "") val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true") val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false") @@ -193,14 +196,14 @@ trait ProvidesHoodieConfig extends Logging { DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean val dropDuplicate = sparkSession.conf .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean - val autoGenerateRecordKeys : Boolean = !combinedOpts.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()); + val shouldAutoKeyGen: Boolean = shouldAutoGenerateRecordKeys(combinedOpts) val insertMode = InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key, DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue())) val insertModeSet = combinedOpts.contains(SQL_INSERT_MODE.key) - val sqlWriteOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key()) - val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty - val sqlWriteOperation = sqlWriteOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue()) + val sparkSqlInsertIntoOperationOpt = combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key()) + val sparkSqlInsertIntoOperationSet = sparkSqlInsertIntoOperationOpt.nonEmpty + val sparkSqlInsertIntoOperation = sparkSqlInsertIntoOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue()) val insertDupPolicyOpt = combinedOpts.get(INSERT_DUP_POLICY.key()) val insertDupPolicySet = insertDupPolicyOpt.nonEmpty val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), INSERT_DUP_POLICY.defaultValue()) @@ -208,19 +211,22 @@ trait ProvidesHoodieConfig extends Logging { val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && hoodieCatalogTable.primaryKeys.nonEmpty - // try to use sql write operation instead of legacy insert mode. If only insert mode is explicitly specified, w/o specifying - // any value for sql write operation, leagcy configs will be honored. But on all other cases (i.e when neither of the configs is set, - // or when both configs are set, or when only sql write operation is set), we honor sql write operation and ignore - // the insert mode. - val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet + /* + * The sql write operation has higher precedence than the legacy insert mode. + * Only when the legacy insert mode is explicitly set, without setting sql write operation, + * legacy configs will be honored. On all other cases (i.e when both are set, either is set, + * or when only the sql write operation is set), we honor the sql write operation. 
+ */ + val useLegacyInsertModeFlow = insertModeSet && !sparkSqlInsertIntoOperationSet var operation = combinedOpts.getOrElse(OPERATION.key, if (useLegacyInsertModeFlow) { // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input // we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type deduceOperation(enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, - isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, autoGenerateRecordKeys) + isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode, shouldAutoKeyGen) } else { - deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable, sqlWriteOperation) + deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition, isOverwriteTable, + shouldAutoKeyGen, preCombineField, sparkSqlInsertIntoOperationSet, sparkSqlInsertIntoOperation) } ) @@ -233,14 +239,14 @@ trait ProvidesHoodieConfig extends Logging { Map() } } else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) { - if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) { + if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) { operation = BULK_INSERT_OPERATION_OPT_VAL Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value()) } else { Map() } } else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) { - if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) { + if (sparkSqlInsertIntoOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) { operation = BULK_INSERT_OPERATION_OPT_VAL Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value()) } else { @@ -254,7 +260,7 @@ trait ProvidesHoodieConfig extends Logging { // w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set, // or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode. 
val useLegacyInsertDropDupFlow = insertModeSet && !insertDupPolicySet - val payloadClassName = if (useLegacyInsertDropDupFlow) { + val payloadClassName = if (useLegacyInsertDropDupFlow) { deducePayloadClassNameLegacy(operation, tableType, insertMode) } else { if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) { @@ -304,7 +310,7 @@ trait ProvidesHoodieConfig extends Logging { defaultOpts = defaultOpts, overridingOpts = overridingOpts) } - def getDropDupsConfig(useLegacyInsertModeFlow: Boolean, incomingParams : Map[String, String]): Map[String, String] = { + def getDropDupsConfig(useLegacyInsertModeFlow: Boolean, incomingParams: Map[String, String]): Map[String, String] = { if (!useLegacyInsertModeFlow) { Map(DataSourceWriteOptions.INSERT_DUP_POLICY.key() -> incomingParams.getOrElse(DataSourceWriteOptions.INSERT_DUP_POLICY.key(), DataSourceWriteOptions.INSERT_DUP_POLICY.defaultValue()), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index b421732d270..2c592f5a815 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -417,7 +417,6 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""") checkAnswer(s"select id, name, ts, year, month, day from $tableName")( - Seq(2, "l4", "v1", "2021", "10", "02"), Seq(2, "l4", "v1", "2021", "10", "02") ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index ff2f58982bd..e53a4385efa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -1727,25 +1727,26 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } /** - * When neither of strict mode nor sql.write.operation is set, sql write operation takes precedence and default value is chosen. + * When neither of strict mode nor sql.write.operation is set, sql write operation is deduced as UPSERT + * due to presence of preCombineField. 
*/ test("Test sql write operation with INSERT_INTO No explicit configs") { spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") - withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach {tableType => - withTable(generateTableName) { tableName => - ingestAndValidateData(tableType, tableName, tmp) - } + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { tableName => + ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT) } - }) + } + }) } test("Test sql write operation with INSERT_INTO override both strict mode and sql write operation") { withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach { tableType => + Seq("cow", "mor").foreach { tableType => Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation => withTable(generateTableName) { tableName => ingestAndValidateData(tableType, tableName, tmp, operation, @@ -1758,7 +1759,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test sql write operation with INSERT_INTO override only sql write operation") { withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach {tableType => + Seq("cow", "mor").foreach { tableType => Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation => withTable(generateTableName) { tableName => ingestAndValidateData(tableType, tableName, tmp, operation, @@ -1772,11 +1773,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test sql write operation with INSERT_INTO override only strict mode") { spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") - spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") + spark.sessionState.conf.unsetConf(DataSourceWriteOptions.INSERT_DUP_POLICY.key()) spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") spark.sessionState.conf.unsetConf("hoodie.sql.bulk.insert.enable") withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach {tableType => + Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT, List("set hoodie.sql.insert.mode = upsert")) @@ -1786,7 +1787,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } def ingestAndValidateData(tableType: String, tableName: String, tmp: File, - expectedOperationtype: WriteOperationType = WriteOperationType.INSERT, + expectedOperationtype: WriteOperationType, setOptions: List[String] = List.empty) : Unit = { setOptions.foreach(entry => { spark.sql(entry) @@ -1851,14 +1852,94 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") } + test("Test sql write operation with INSERT_INTO No explicit configs No Precombine") { + spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) + spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") + spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") + spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") + withRecordType()(withTempDir { tmp => + Seq("cow","mor").foreach { tableType => + 
withTable(generateTableName) { tableName => + ingestAndValidateDataNoPrecombine(tableType, tableName, tmp, WriteOperationType.INSERT) + } + } + }) + } + + def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, tmp: File, + expectedOperationtype: WriteOperationType, + setOptions: List[String] = List.empty) : Unit = { + setOptions.foreach(entry => { + spark.sql(entry) + }) + + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + + spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") + + assertResult(expectedOperationtype) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } + checkAnswer(s"select id, name, price, dt from $tableName")( + Seq(1, "a1", 10.0, "2021-07-18") + ) + + // insert record again but w/ diff values but same primary key. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1_1', 10, "2021-07-18"), + | (2, 'a2', 20, "2021-07-18"), + | (2, 'a2_2', 30, "2021-07-18") + """.stripMargin) + + assertResult(expectedOperationtype) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } + if (expectedOperationtype == WriteOperationType.UPSERT) { + // dedup should happen within same batch being ingested and existing records on storage should get updated + checkAnswer(s"select id, name, price, dt from $tableName order by id")( + Seq(1, "a1_1", 10.0, "2021-07-18"), + Seq(2, "a2_2", 30.0, "2021-07-18") + ) + } else { + // no dedup across batches + checkAnswer(s"select id, name, price, dt from $tableName order by id")( + Seq(1, "a1", 10.0, "2021-07-18"), + Seq(1, "a1_1", 10.0, "2021-07-18"), + Seq(2, "a2", 20.0, "2021-07-18"), + Seq(2, "a2_2", 30.0, "2021-07-18") + ) + } + spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) + spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") + spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") + spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") + } + test("Test insert dup policy with INSERT_INTO explicit new configs INSERT operation ") { withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach {tableType => + Seq("cow", "mor").foreach { tableType => val operation = WriteOperationType.INSERT - Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { dupPolicy => + Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { dupPolicy => withTable(generateTableName) { tableName => ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation, - List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy), + List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}", + s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"), dupPolicy) } } @@ -1868,27 +1949,27 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test insert dup policy with INSERT_INTO explicit new configs BULK_INSERT operation ") { withRecordType()(withTempDir { tmp => - Seq("cow").foreach {tableType => + Seq("cow").foreach { tableType => val operation = WriteOperationType.BULK_INSERT val dupPolicy = NONE_INSERT_DUP_POLICY - withTable(generateTableName) { tableName => - ingestAndValidateDataDupPolicy(tableType, 
tableName, tmp, operation, - List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy), - dupPolicy) - } + withTable(generateTableName) { tableName => + ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation, + List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}", + s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"), + dupPolicy) + } } }) } test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK INSERT operation") { withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp => - Seq("cow").foreach {tableType => - val operation = WriteOperationType.BULK_INSERT + Seq("cow").foreach { tableType => val dupPolicy = DROP_INSERT_DUP_POLICY withTable(generateTableName) { tableName => - ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp, operation, - List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), - "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy)) + ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp, + List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${WriteOperationType.BULK_INSERT.value}", + s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy")) } } }) @@ -1896,22 +1977,24 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") { withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp => - Seq("cow").foreach {tableType => + Seq("cow").foreach { tableType => val operation = WriteOperationType.UPSERT val dupPolicy = FAIL_INSERT_DUP_POLICY - withTable(generateTableName) { tableName => - ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation, - List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " + operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy), - dupPolicy, true) - } - } + withTable(generateTableName) { tableName => + ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation, + List(s"set ${SPARK_SQL_INSERT_INTO_OPERATION.key}=${operation.value}", + s"set ${DataSourceWriteOptions.INSERT_DUP_POLICY.key}=$dupPolicy"), + dupPolicy, true) + } + } }) } def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, tmp: File, - expectedOperationtype: WriteOperationType = WriteOperationType.INSERT, - setOptions: List[String] = List.empty, insertDupPolicy : String = NONE_INSERT_DUP_POLICY, - expectExceptionOnSecondBatch: Boolean = false) : Unit = { + expectedOperationtype: WriteOperationType = WriteOperationType.INSERT, + setOptions: List[String] = List.empty, + insertDupPolicy : String = NONE_INSERT_DUP_POLICY, + expectExceptionOnSecondBatch: Boolean = false) : Unit = { // set additional options setOptions.foreach(entry => { @@ -2010,8 +2093,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } def ingestAndValidateDropDupPolicyBulkInsert(tableType: String, tableName: String, tmp: File, - expectedOperationtype: WriteOperationType = WriteOperationType.BULK_INSERT, - setOptions: List[String] = List.empty) : Unit = { + setOptions: List[String] = List.empty) : Unit = { // set additional options setOptions.foreach(entry => { @@ -2027,8 +2109,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |) using hudi | tblproperties ( | type = '$tableType', - | primaryKey = 'id', - | preCombine = 'name' + | primaryKey = 'id' | ) | partitioned by (dt) | location 
'${tmp.getCanonicalPath}/$tableName' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala index a2fb0c80fad..73bad3be282 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala @@ -41,24 +41,24 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { | location '${tmp.getCanonicalPath}/$tableName1' """.stripMargin) + // 1st commit instant spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)") val metaClient1 = HoodieTableMetaClient.builder() .setBasePath(s"${tmp.getCanonicalPath}/$tableName1") .setConf(spark.sessionState.newHadoopConf()) .build() - val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline .lastInstant().get().getTimestamp + // 2nd commit instant spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)") checkAnswer(s"select id, name, price, ts from $tableName1")( - Seq(1, "a1", 10.0, 1000), Seq(1, "a2", 20.0, 2000) ) - // time travel from instant1 + // time travel as of instant 1 checkAnswer( s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1'")( Seq(1, "a1", 10.0, 1000) @@ -194,11 +194,6 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 20.0, 1000) ) - checkAnswer(s"select id, name, price, ts from $tableName1")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 20.0, 1000) - ) - spark.sql(s"insert into $tableName2 values(3, 'a3', 10, 1000)") spark.sql(s"insert into $tableName2 values(4, 'a4', 20, 1000)") @@ -272,25 +267,26 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) + // 1st commit instant spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") val metaClient = HoodieTableMetaClient.builder() .setBasePath(s"${tmp.getCanonicalPath}/$tableName") .setConf(spark.sessionState.newHadoopConf()) .build() - - val instant = metaClient.getActiveTimeline.getAllCommitsTimeline + val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline .lastInstant().get().getTimestamp + + // 2nd commit instant spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)") checkAnswer(s"select id, name, price, ts from $tableName distribute by cast(rand() * 2 as int)")( - Seq(1, "a1", 10.0, 1000), Seq(1, "a2", 20.0, 2000) ) - // time travel from instant + // time travel as of instant 1 checkAnswer( - s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant' distribute by cast(rand() * 2 as int)")( + s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant1' distribute by cast(rand() * 2 as int)")( Seq(1, "a1", 10.0, 1000) ) })
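
Usage sketch 1 (illustration only, not part of the patch): the new default behavior, modeled on the spark-sql tests above. It assumes a Spark session with the Hudi 0.14.0 spark bundle on the classpath; the table name and location are hypothetical.

    // With primaryKey and preCombineField declared and no operation-related configs set,
    // INSERT INTO is now deduced as UPSERT, so re-inserting a key updates the existing row.
    spark.sql(
      """
        |create table hudi_upsert_demo (
        |  id int,
        |  name string,
        |  price double,
        |  ts long,
        |  dt string
        |) using hudi
        | tblproperties (
        |   type = 'cow',
        |   primaryKey = 'id',
        |   preCombineField = 'ts'
        | )
        | partitioned by (dt)
        | location '/tmp/hudi_upsert_demo'
      """.stripMargin)

    spark.sql("insert into hudi_upsert_demo values (1, 'a1', 10.0, 1000, '2021-07-18')")
    spark.sql("insert into hudi_upsert_demo values (1, 'a1_1', 11.0, 1001, '2021-07-18')")

    // Deduced operation is UPSERT: a single row (1, 'a1_1', 11.0, 1001) remains.
    spark.sql("select id, name, price, ts from hudi_upsert_demo").show()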
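
Usage sketch 2: the deduction only applies when a preCombineField is declared. Mirroring the new "No Precombine" test, a table with just a primaryKey keeps the plain INSERT path (again illustrative; names are hypothetical).

    // primaryKey without preCombineField: the operation stays INSERT, so a second batch
    // with the same key adds another row instead of updating the existing one.
    spark.sql(
      """
        |create table hudi_insert_demo (
        |  id int,
        |  name string,
        |  price double,
        |  dt string
        |) using hudi
        | tblproperties (
        |   type = 'cow',
        |   primaryKey = 'id'
        | )
        | partitioned by (dt)
        | location '/tmp/hudi_insert_demo'
      """.stripMargin)

    spark.sql("insert into hudi_insert_demo values (1, 'a1', 10.0, '2021-07-18')")
    spark.sql("insert into hudi_insert_demo values (1, 'a1_1', 11.0, '2021-07-18')")

    // Two rows for id = 1, matching the "no dedup across batches" expectation in the test.
    spark.sql("select id, name, price from hudi_insert_demo").show()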
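
Usage sketch 3: explicit configuration still wins over the deduction; if the sql write operation is set, it is honored even when preCombineField exists. This reuses the table from sketch 1 and assumes SPARK_SQL_INSERT_INTO_OPERATION from the patch resolves to hoodie.spark.sql.insert.into.operation, as in 0.14.x.

    // Pinning the spark-sql INSERT INTO operation bypasses the upsert deduction.
    spark.sql("set hoodie.spark.sql.insert.into.operation = insert")

    spark.sql("insert into hudi_upsert_demo values (2, 'a2', 20.0, 1000, '2021-07-18')")
    spark.sql("insert into hudi_upsert_demo values (2, 'a2_2', 30.0, 1001, '2021-07-18')")

    // The plain insert path does not update by key, so with the default dup policy both
    // rows for id = 2 are retained, as in the INSERT-path expectations of the tests above.
    spark.sql("select id, name, price, ts from hudi_upsert_demo where id = 2").show()

    // Clear the session override afterwards, as the tests do.
    spark.sessionState.conf.unsetConf("hoodie.spark.sql.insert.into.operation")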