This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.0-siva-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e4fd81f1b2549baf5f51211ec11d22718e05b9c1
Author: Wechar Yu <yuwq1...@gmail.com>
AuthorDate: Sun Dec 17 11:32:30 2023 +0800

    [HUDI-7183] Fix static insert overwrite partitions issue (#10254)
---
 .../SparkInsertOverwriteCommitActionExecutor.java  | 17 ++--
 ...setBulkInsertOverwriteCommitActionExecutor.java | 18 ++--
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  7 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 83 ++++++++++--------
 .../command/InsertIntoHoodieTableCommand.scala     | 32 +------
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 98 ++++++++++++++++++++++
 6 files changed, 177 insertions(+), 78 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index d12efab229d..788e1040783 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.Partitioner;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -81,14 +81,15 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct()
           .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }
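For context: both this executor and the Dataset bulk-insert executor below now key
the replaced-file-id map by each individual partition path parsed from the
comma-separated STATIC_OVERWRITE_PARTITION_PATHS value, instead of treating the
whole string as a single partition key. A minimal standalone Scala sketch of that
mapping, where getAllExistingFileIds stands in for the executor's
file-system-view lookup:

    // Hypothetical sketch, not Hudi's executor code.
    def replacedFileIds(staticOverwritePartitions: String,
                        getAllExistingFileIds: String => List[String]): Map[String, List[String]] =
      staticOverwritePartitions.split(",").map { partitionPath =>
        // one map entry per static partition, so multi-partition overwrites
        // replace each partition's existing files
        partitionPath -> getAllExistingFileIds(partitionPath)
      }.toMap

    // e.g. replacedFileIds("dt=2023-12-06/hh=00,dt=2023-12-06/hh=01", lookup)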
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index c1fd952b106..67ba2027cbd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -33,7 +34,7 @@ import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -60,14 +61,15 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    if (writeStatuses.isEmpty()) {
-      String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (staticOverwritePartition == null || staticOverwritePartition.isEmpty()) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      table.getContext().setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(table.getContext().parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct()
           .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 3c0db3b4691..5fcc750ac5b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -335,7 +335,12 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
           nullableField
         }
       }.partition(f => partitionFields.contains(f.name))
-      StructType(dataFields ++ partFields)
+      // An insert_overwrite operation with partial partition values can mix up the
+      // order of partition columns, so we also need to reorder partition fields here.
+      val nameToField = partFields.map(field => (field.name, field)).toMap
+      val orderedPartFields = partitionFields.map(nameToField(_)).toSeq
+
+      StructType(dataFields ++ orderedPartFields)
     })
   } catch {
     case cause: Throwable =>
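The reordering above matters because a partial PARTITION clause can surface the
partition columns in a different order than the table declares them. A small
Scala sketch of the same idea, with made-up field values for illustration:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical inputs; in HoodieCatalogTable these come from the table schema.
    val partitionFields = Seq("dt", "hh")                  // declared partition order
    val partFields = Seq(StructField("hh", StringType),    // order produced by a partial
                         StructField("dt", StringType))    // PARTITION (dt='...', hh) insert
    val nameToField = partFields.map(field => (field.name, field)).toMap
    // Re-sort the fields back into the declared order before building the schema.
    val orderedPartFields = partitionFields.map(nameToField(_))
    val schema = StructType(orderedPartFields)             // dt first, then hh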
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index a34a6dfb052..22e6cfeeeb5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
@@ -32,8 +32,10 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
@@ -334,42 +336,57 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
-  def deduceIsOverwriteTable(sparkSession: SparkSession,
-                             catalogTable: HoodieCatalogTable,
-                             partitionSpec: Map[String, Option[String]],
-                             extraOptions: Map[String, String]): Boolean = {
+  /**
+   * Deduce the overwrite config based on the write operation and overwrite mode configs.
+   * If hoodie.datasource.write.operation is insert_overwrite/insert_overwrite_table, use dynamic overwrite;
+   * else if hoodie.datasource.overwrite.mode is configured, use it;
+   * else use spark.sql.sources.partitionOverwriteMode.
+   *
+   * The returned staticOverwritePartitionPathOpt is defined only in the static insert_overwrite case.
+   *
+   * @return (overwriteMode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt)
+   */
+  def deduceOverwriteConfig(sparkSession: SparkSession,
+                            catalogTable: HoodieCatalogTable,
+                            partitionSpec: Map[String, Option[String]],
+                            extraOptions: Map[String, String]): (SaveMode, Boolean, Boolean, Option[String]) = {
     val combinedOpts: Map[String, String] = combineOptions(catalogTable, catalogTable.tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = Map.empty, overridingOpts = extraOptions)
     val operation = combinedOpts.getOrElse(OPERATION.key, null)
-    operation match {
-      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
-        true
-      case INSERT_OVERWRITE_OPERATION_OPT_VAL =>
-        false
+    val isOverwriteOperation = operation != null &&
+      (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL) || operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL))
+    // If hoodie.datasource.overwrite.mode is configured, respect it; otherwise respect spark.sql.sources.partitionOverwriteMode
+    val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
+      sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
+    val isStaticOverwrite = !isOverwriteOperation && (hoodieOverwriteMode match {
+      case "STATIC" => true
+      case "DYNAMIC" => false
+      case _ => throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
+    })
+    val isOverWriteTable = operation match {
+      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL => true
+      case INSERT_OVERWRITE_OPERATION_OPT_VAL => false
       case _ =>
-        // NonPartitioned table always insert overwrite whole table
-        if (catalogTable.partitionFields.isEmpty) {
-          true
-        } else {
-          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
-          if (partitionSpec.nonEmpty) {
-            false
-          } else {
-            // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
-            val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
-              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
-
-            hoodieOverwriteMode match {
-              case "STATIC" =>
-                true
-              case "DYNAMIC" =>
-                false
-              case _ =>
-                throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
-            }
-          }
-        }
+        // There are two cases where we need to use insert_overwrite_table:
+        // 1. a non-partitioned table always insert overwrites the whole table
+        // 2. static mode with no partition values specified
+        catalogTable.partitionFields.isEmpty || (isStaticOverwrite && partitionSpec.isEmpty)
+    }
+    val overwriteMode = if (isOverWriteTable) SaveMode.Overwrite else SaveMode.Append
+    val staticPartitions = if (isStaticOverwrite && !isOverWriteTable) {
+      val fileIndex = HoodieFileIndex(sparkSession, catalogTable.metaClient, None, combinedOpts, FileStatusCache.getOrCreate(sparkSession))
+      val partitionNameToType = catalogTable.partitionSchema.fields.map(field => (field.name, field.dataType)).toMap
+      val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)
+      val predicates = staticPartitionValues.map { case (k, v) =>
+        val partition = AttributeReference(k, partitionNameToType(k))()
+        val value = Literal(v)
+        EqualTo(partition, value)
+      }.toSeq
+      Option(fileIndex.getPartitionPaths(predicates).map(_.getPath).mkString(","))
+    } else {
+      Option.empty
     }
+    (overwriteMode, isOverWriteTable, !isOverWriteTable, staticPartitions)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index b8d5be7638f..3f3d4e10ea9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -88,19 +88,11 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
                 extraOptions: Map[String, String] = Map.empty): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
 
-    var mode = SaveMode.Append
-    var isOverWriteTable = false
-    var isOverWritePartition = false
-
-    if (overwrite) {
-      if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec, extraOptions)) {
-        isOverWriteTable = true
-        mode = SaveMode.Overwrite
-      } else {
-        isOverWritePartition = true
-      }
+    val (mode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt) = if (overwrite) {
+      deduceOverwriteConfig(sparkSession, catalogTable, partitionSpec, extraOptions)
+    } else {
+      (SaveMode.Append, false, false, Option.empty)
     }
 
-    val staticOverwritePartitionPathOpt = getStaticOverwritePartitionPath(catalogTable, partitionSpec, isOverWritePartition)
     val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
@@ -118,22 +110,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     success
   }
 
-  private def getStaticOverwritePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
-                                              partitionsSpec: Map[String, Option[String]],
-                                              isOverWritePartition: Boolean): Option[String] = {
-    if (isOverWritePartition) {
-      val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
-      val isStaticOverwritePartition = staticPartitionValues.keys.size == hoodieCatalogTable.partitionFields.length
-      if (isStaticOverwritePartition) {
-        Option.apply(makePartitionPath(hoodieCatalogTable, staticPartitionValues))
-      } else {
-        Option.empty
-      }
-    } else {
-      Option.empty
-    }
-  }
-
   /**
    * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
    *
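Taken together with InsertIntoHoodieTableCommand above, the precedence is: an
explicit hoodie.datasource.write.operation wins, then
hoodie.datasource.overwrite.mode, then Spark's own
spark.sql.sources.partitionOverwriteMode. An illustrative Scala/Spark session;
the table name t1 is hypothetical and mirrors the test table below
(partitioned by dt, hh):

    // Choose static overwrite semantics for this session.
    spark.sql("set hoodie.datasource.overwrite.mode = STATIC")
    // Static mode + PARTITION clause: replaces exactly the matching partitions.
    spark.sql("insert overwrite table t1 partition (dt = '2023-12-06', hh) " +
      "values (1, 'a1', 10, 1000, '00')")
    // Static mode + no PARTITION clause: replaces the whole table.
    spark.sql("insert overwrite table t1 values (2, 'a2', 10, 1000, '2023-12-06', '01')")
    // An explicit write operation takes precedence over both mode settings.
    spark.sql("set hoodie.datasource.write.operation = insert_overwrite_table")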
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 1a925827088..9d14064f398 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -504,6 +504,104 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test insert overwrite for multi partitioned table") {
+    withRecordType()(Seq("cow", "mor").foreach { tableType =>
+      Seq("dynamic", "static").foreach { overwriteMode =>
+        withTable(generateTableName) { tableName =>
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string,
+               |  hh string
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id'
+               | )
+               | partitioned by (dt, hh)
+             """.stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into table $tableName values
+               | (0, 'a0', 10, 1000, '2023-12-05', '00'),
+               | (1, 'a1', 10, 1000, '2023-12-06', '00'),
+               | (2, 'a2', 10, 1000, '2023-12-06', '01')
+             """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+            Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+            Seq(1, "a1", 10.0, 1000, "2023-12-06", "00"),
+            Seq(2, "a2", 10.0, 1000, "2023-12-06", "01")
+          )
+
+          withSQLConf("hoodie.datasource.overwrite.mode" -> overwriteMode) {
+            // test insert overwrite partitions with partial partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                 | (3, 'a3', 10, 1000, '00'),
+                 | (4, 'a4', 10, 1000, '02')
+               """.stripMargin)
+            val expected = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected: _*)
+
+            // test insert overwrite without partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName values
+                 | (5, 'a5', 10, 1000, '2023-12-06', '02')
+               """.stripMargin)
+            val expected2 = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              // dynamic mode only overwrites the matching partitions
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              // static mode will overwrite the whole table
+              Seq(
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected2: _*)
+
+            // test insert overwrite table
+            withSQLConf("hoodie.datasource.write.operation" -> "insert_overwrite_table") {
+              spark.sql(
+                s"""
+                   | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                   | (6, 'a6', 10, 1000, '00')
+                 """.stripMargin)
+              checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+                Seq(6, "a6", 10.0, 1000, "2023-12-06", "00")
+              )
+            }
+          }
+        }
+      }
+    })
+  }
+
   test("Test Different Type of Partition Column") {
     withRecordType()(withTempDir { tmp =>
       val typeAndValue = Seq(