This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 27ac42818bc [HUDI-7183] Fix static insert overwrite partitions issue (#10254)
27ac42818bc is described below

commit 27ac42818bcf3768c2a6742ac0edfcb79c253e52
Author: Wechar Yu <yuwq1...@gmail.com>
AuthorDate: Sun Dec 17 11:32:30 2023 +0800

    [HUDI-7183] Fix static insert overwrite partitions issue (#10254)
---
 .../SparkInsertOverwriteCommitActionExecutor.java  | 17 ++--
 ...setBulkInsertOverwriteCommitActionExecutor.java | 18 ++--
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  7 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 83 ++++++++++--------
 .../command/InsertIntoHoodieTableCommand.scala     | 32 +------
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 98 ++++++++++++++++++++++
 6 files changed, 177 insertions(+), 78 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index d12efab229d..788e1040783 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.Partitioner;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -81,14 +81,15 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
           Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }
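
A minimal, non-Spark Scala sketch of the replaced-file-id lookup above, for reference only: the STATIC_OVERWRITE_PARTITION_PATHS value is now treated as a comma-separated list of partition paths, and every listed partition has its existing file ids marked for replacement, instead of the whole string being used as a single partition path (the same change is applied to DatasetBulkInsertOverwriteCommitActionExecutor below). The lookupExistingFileIds helper and the partition paths are made up for illustration.

object StaticOverwriteSketch {
  // hypothetical stand-in for getAllExistingFileIds, returning placeholder file ids
  def lookupExistingFileIds(partitionPath: String): List[String] =
    List(s"$partitionPath/file-1", s"$partitionPath/file-2")

  // split the configured value on commas and pair each partition path with its file ids
  def partitionToReplacedFileIds(staticOverwritePartitions: String): Map[String, List[String]] =
    staticOverwritePartitions.split(",").toList
      .filter(_.nonEmpty)
      .map(p => p -> lookupExistingFileIds(p))
      .toMap

  def main(args: Array[String]): Unit =
    println(partitionToReplacedFileIds("dt=2023-12-06/hh=00,dt=2023-12-06/hh=01"))
}
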
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index c1fd952b106..67ba2027cbd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -33,7 +34,7 @@ import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -60,14 +61,15 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    if (writeStatuses.isEmpty()) {
-      String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (staticOverwritePartition == null || staticOverwritePartition.isEmpty()) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      table.getContext().setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(table.getContext().parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
           Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index b766e0d315e..c1414fe77fe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -339,7 +339,12 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
           nullableField
         }
       }.partition(f => partitionFields.contains(f.name))
-      StructType(dataFields ++ partFields)
+      // insert_overwrite operation with partial partition values will mix up the order
+      // of partition columns, so we also need to reorder partition fields here.
+      val nameToField = partFields.map(field => (field.name, field)).toMap
+      val orderedPartFields = partitionFields.map(nameToField(_)).toSeq
+
+      StructType(dataFields ++ orderedPartFields)
     })
     catch {
       case cause: Throwable =>
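
The reordering added above is needed because an insert overwrite with a partial PARTITION clause can surface the partition columns in clause order rather than in the table's declared order. A standalone Scala sketch of the reordering, using a toy Field type instead of Spark's StructField:

object PartitionReorderSketch extends App {
  case class Field(name: String)

  val partitionFields = Seq("dt", "hh")                     // declared partition column order
  val partFields = Seq(Field("hh"), Field("dt"))            // order as produced by the partition() split
  val nameToField = partFields.map(f => f.name -> f).toMap
  val orderedPartFields = partitionFields.map(nameToField)  // restores Seq(Field(dt), Field(hh))
  println(orderedPartFields)
}
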
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 5492d12d5fb..250067d4b84 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
@@ -32,8 +32,10 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
@@ -334,42 +336,57 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
-  def deduceIsOverwriteTable(sparkSession: SparkSession,
-                             catalogTable: HoodieCatalogTable,
-                             partitionSpec: Map[String, Option[String]],
-                             extraOptions: Map[String, String]): Boolean = {
+  /**
+   * Deduce the overwrite config based on writeOperation and overwriteMode config.
+   * If hoodie.datasource.write.operation is insert_overwrite/insert_overwrite_table, use dynamic overwrite;
+   * else if hoodie.datasource.overwrite.mode is configured, use it;
+   * else use spark.sql.sources.partitionOverwriteMode.
+   *
+   * The returned staticOverwritePartitionPathOpt is defined only in static insert_overwrite case.
+   *
+   * @return (overwriteMode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt)
+   */
+  def deduceOverwriteConfig(sparkSession: SparkSession,
+                            catalogTable: HoodieCatalogTable,
+                            partitionSpec: Map[String, Option[String]],
+                            extraOptions: Map[String, String]): (SaveMode, Boolean, Boolean, Option[String]) = {
     val combinedOpts: Map[String, String] = combineOptions(catalogTable, catalogTable.tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = Map.empty, overridingOpts = extraOptions)
     val operation = combinedOpts.getOrElse(OPERATION.key, null)
-    operation match {
-      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
-        true
-      case INSERT_OVERWRITE_OPERATION_OPT_VAL =>
-        false
+    val isOverwriteOperation = operation != null &&
+      (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL) || operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL))
+    // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
+    val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
+      sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
+    val isStaticOverwrite = !isOverwriteOperation && (hoodieOverwriteMode match {
+      case "STATIC" => true
+      case "DYNAMIC" => false
+      case _ => throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
+    })
+    val isOverWriteTable = operation match {
+      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL => true
+      case INSERT_OVERWRITE_OPERATION_OPT_VAL => false
       case _ =>
-        // NonPartitioned table always insert overwrite whole table
-        if (catalogTable.partitionFields.isEmpty) {
-          true
-        } else {
-          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
-          if (partitionSpec.nonEmpty) {
-            false
-          } else {
-            // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
-            val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
-              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
-
-            hoodieOverwriteMode match {
-              case "STATIC" =>
-                true
-              case "DYNAMIC" =>
-                false
-              case _ =>
-                throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
-            }
-          }
-        }
+        // There are two cases where we need to use insert_overwrite_table
+        // 1. NonPartitioned table always insert overwrite whole table
+        // 2. static mode and no partition values specified
+        catalogTable.partitionFields.isEmpty || (isStaticOverwrite && partitionSpec.isEmpty)
+    }
+    val overwriteMode = if (isOverWriteTable) SaveMode.Overwrite else SaveMode.Append
+    val staticPartitions = if (isStaticOverwrite && !isOverWriteTable) {
+      val fileIndex = HoodieFileIndex(sparkSession, catalogTable.metaClient, None, combinedOpts, FileStatusCache.getOrCreate(sparkSession))
+      val partitionNameToType = catalogTable.partitionSchema.fields.map(field => (field.name, field.dataType)).toMap
+      val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)
+      val predicates = staticPartitionValues.map { case (k, v) =>
+        val partition = AttributeReference(k, partitionNameToType(k))()
+        val value = Literal(v)
+        EqualTo(partition, value)
+      }.toSeq
+      Option(fileIndex.getPartitionPaths(predicates).map(_.getPath).mkString(","))
+    } else {
+      Option.empty
     }
+    (overwriteMode, isOverWriteTable, !isOverWriteTable, staticPartitions)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
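
As a quick reference for the precedence documented on deduceOverwriteConfig above, a simplified, non-authoritative restatement in plain Scala; the method and argument names below are invented for illustration, and the real method additionally derives the SaveMode, the overwrite-table flag and the static partition paths.

object OverwriteModePrecedenceSketch extends App {
  def effectiveOverwriteBehaviour(operation: Option[String],
                                  hoodieOverwriteMode: Option[String],
                                  sparkPartitionOverwriteMode: String): String =
    operation match {
      // an explicitly configured insert_overwrite / insert_overwrite_table operation behaves as dynamic overwrite
      case Some("insert_overwrite") | Some("insert_overwrite_table") => "DYNAMIC"
      // otherwise hoodie.datasource.overwrite.mode wins over spark.sql.sources.partitionOverwriteMode
      case _ => hoodieOverwriteMode.getOrElse(sparkPartitionOverwriteMode).toUpperCase
    }

  println(effectiveOverwriteBehaviour(None, Some("static"), "dynamic"))  // STATIC
  println(effectiveOverwriteBehaviour(None, None, "dynamic"))            // DYNAMIC
}
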
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 008118bfcce..5a7aec53b63 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -88,19 +88,11 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
           extraOptions: Map[String, String] = Map.empty): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
 
-    var mode = SaveMode.Append
-    var isOverWriteTable = false
-    var isOverWritePartition = false
-
-    if (overwrite) {
-      if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec, extraOptions)) {
-        isOverWriteTable = true
-        mode = SaveMode.Overwrite
-      } else {
-        isOverWritePartition = true
-      }
+    val (mode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt) = if (overwrite) {
+      deduceOverwriteConfig(sparkSession, catalogTable, partitionSpec, extraOptions)
+    } else {
+      (SaveMode.Append, false, false, Option.empty)
     }
-    val staticOverwritePartitionPathOpt = getStaticOverwritePartitionPath(catalogTable, partitionSpec, isOverWritePartition)
     val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
@@ -118,22 +110,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     success
   }
 
-  private def getStaticOverwritePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
-                                              partitionsSpec: Map[String, Option[String]],
-                                              isOverWritePartition: Boolean): Option[String] = {
-    if (isOverWritePartition) {
-      val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
-      val isStaticOverwritePartition = staticPartitionValues.keys.size == hoodieCatalogTable.partitionFields.length
-      if (isStaticOverwritePartition) {
-        Option.apply(makePartitionPath(hoodieCatalogTable, staticPartitionValues))
-      } else {
-        Option.empty
-      }
-    } else {
-      Option.empty
-    }
-  }
-
   /**
    * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a62317d8920..b1b7353c2bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -504,6 +504,104 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test insert overwrite for multi partitioned table") {
+    withRecordType()(Seq("cow", "mor").foreach { tableType =>
+      Seq("dynamic", "static").foreach { overwriteMode =>
+        withTable(generateTableName) { tableName =>
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string,
+               |  hh string
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id'
+               | )
+               | partitioned by (dt, hh)
+          """.stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into table $tableName values
+               | (0, 'a0', 10, 1000, '2023-12-05', '00'),
+               | (1, 'a1', 10, 1000, '2023-12-06', '00'),
+               | (2, 'a2', 10, 1000, '2023-12-06', '01')
+          """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+            Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+            Seq(1, "a1", 10.0, 1000, "2023-12-06", "00"),
+            Seq(2, "a2", 10.0, 1000, "2023-12-06", "01")
+          )
+
+          withSQLConf("hoodie.datasource.overwrite.mode" -> overwriteMode) {
+            // test insert overwrite partitions with partial partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                 | (3, 'a3', 10, 1000, '00'),
+                 | (4, 'a4', 10, 1000, '02')
+            """.stripMargin)
+            val expected = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected: _*)
+
+            // test insert overwrite without partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName values
+                 | (5, 'a5', 10, 1000, '2023-12-06', '02')
+            """.stripMargin)
+            val expected2 = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              // dynamic mode only overwrites the matching partitions
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              // static mode will overwrite the table
+              Seq(
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected2: _*)
+
+            // test insert overwrite table
+            withSQLConf("hoodie.datasource.write.operation" -> "insert_overwrite_table") {
+              spark.sql(
+                s"""
+                   | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                   | (6, 'a6', 10, 1000, '00')
+              """.stripMargin)
+              checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+                Seq(6, "a6", 10.0, 1000, "2023-12-06", "00")
+              )
+            }
+          }
+        }
+      }
+    })
+  }
+
   test("Test Different Type of Partition Column") {
    withRecordType()(withTempDir { tmp =>
      val typeAndValue = Seq(
