This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0fe1612eb [HUDI-5160] Fix data source write save as table (#7448)
e0fe1612eb is described below

commit e0fe1612ebf40caf4e588c249b6a06c0287a46fd
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 6 01:14:33 2023 +0800

    [HUDI-5160] Fix data source write save as table (#7448)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 20 ++++----
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala | 59 ++++++++++++----------
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  2 +-
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |  2 +-
 4 files changed, 45 insertions(+), 38 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7bd6dd2244..826c0090b2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -23,14 +23,15 @@ import 
org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
+import org.apache.spark.sql.hudi.HoodieOptionConfig._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -216,20 +217,21 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
     val globalTableConfigs = 
mappingSparkDatasourceConfigsToTableConfigs(globalProps)
-    val globalSqlOptions = 
HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)
+    val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)
 
-    val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions 
++ catalogProperties)
+    val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
+      mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++ 
catalogProperties)
 
     // get final schema and parameters
     val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) 
match {
       case (CatalogTableType.EXTERNAL, true) =>
         val existingTableConfig = tableConfig.getProps.asScala.toMap
         val currentTableConfig = globalTableConfigs ++ existingTableConfig
-        val catalogTableProps = 
HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
+        val catalogTableProps = mapSqlOptionsToTableConfigs(catalogProperties)
         validateTableConfig(spark, catalogTableProps, 
convertMapToHoodieConfig(existingTableConfig))
 
         val options = extraTableConfig(hoodieTableExists, currentTableConfig) 
++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ 
currentTableConfig
+          mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig
 
         val schemaFromMetaOpt = loadTableSchemaByMetaClient()
         val schema = if (schemaFromMetaOpt.nonEmpty) {
@@ -243,11 +245,11 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
         (schema, options)
 
       case (_, false) =>
-        ValidationUtils.checkArgument(table.schema.nonEmpty,
+        checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
         val options = extraTableConfig(tableExists = false, 
globalTableConfigs) ++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+          mapSqlOptionsToTableConfigs(sqlOptions)
         (addMetaFields(schema), options)
 
       case (CatalogTableType.MANAGED, true) =>
@@ -255,7 +257,7 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
           s". The associated location('$tableLocation') already exists.")
     }
     HoodieOptionConfig.validateTable(spark, finalSchema,
-      HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))
+      mapTableConfigsToSqlOptions(tableConfigs))
 
     val resolver = spark.sessionState.conf.resolver
     val dataSchema = finalSchema.filterNot { f =>
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index df75b60f54..7219af6c4a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -79,7 +79,7 @@ object HoodieOptionConfig {
   /**
    * The mapping of the sql short name key to the hoodie's config key.
    */
-  private lazy val keyMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToWriteConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
         .filter(f => f.getType == classOf[HoodieSQLOption[_]])
         .map(f => {f.setAccessible(true); 
f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -87,11 +87,14 @@ object HoodieOptionConfig {
         .toMap
   }
 
+  private lazy val writeConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToWriteConfigKey.map(f => f._2 -> f._1)
+
   /**
    * The mapping of the sql short name key to the hoodie table config key
    * defined in HoodieTableConfig.
    */
-  private lazy val keyTableConfigMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToTableConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
       .map(f => {f.setAccessible(true); 
f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -100,41 +103,43 @@ object HoodieOptionConfig {
       .toMap
   }
 
-  private lazy val tableConfigKeyToSqlKey: Map[String, String] =
-    keyTableConfigMapping.map(f => f._2 -> f._1)
+  private lazy val tableConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToTableConfigKey.map(f => f._2 -> f._1)
 
   /**
    * Mapping of the short sql value to the hoodie's config value
    */
-  private val valueMapping: Map[String, String] = Map (
+  private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
     SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
   )
 
-  private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+  private lazy val writeConfigValueToSqlOptionValue = 
sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)
 
   def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] 
= defaultSqlOptions ++ options
 
   /**
-   * Mapping the sql's short name key/value in the options to the hoodie's 
config key/value.
-   * @param options
-   * @return
+   * Map SQL options to data source write configs.
    */
-  def mappingSqlOptionToHoodieParam(options: Map[String, String]): Map[String, 
String] = {
+  def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]): 
Map[String, String] = {
     options.map (kv =>
-      keyMapping.getOrElse(kv._1, kv._1) -> valueMapping.getOrElse(kv._2, 
kv._2))
+      sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) -> 
sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
   }
 
   /**
-   * Mapping the sql options to the hoodie table config which used to store to 
the hoodie
-   * .properties when create the table.
-   * @param options
-   * @return
+   * Mapping the data source write configs to SQL options.
+   */
+  def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]): 
Map[String, String] = {
+    options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> 
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+  }
+
+  /**
+   * Map SQL options to table configs.
    */
-  def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, 
String] = {
+  def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String, 
String] = {
     options.map { case (k, v) =>
-      if (keyTableConfigMapping.contains(k)) {
-        keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+      if (sqlOptionKeyToTableConfigKey.contains(k)) {
+        sqlOptionKeyToTableConfigKey(k) -> 
sqlOptionValueToWriteConfigValue.getOrElse(v, v)
       } else {
         k -> v
       }
@@ -142,10 +147,10 @@ object HoodieOptionConfig {
   }
 
   /**
-   * Mapping the table config (loaded from the hoodie.properties) to the sql 
options.
+   * Map table configs to SQL options.
    */
-  def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, 
String] = {
-    options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> 
reverseValueMapping.getOrElse(kv._2, kv._2))
+  def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String, 
String] = {
+    options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> 
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
   }
 
   val defaultSqlOptions: Map[String, String] = {
@@ -163,7 +168,7 @@ object HoodieOptionConfig {
    * @return
    */
   def getPrimaryColumns(options: Map[String, String]): Array[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
       .map(_.split(",").filter(_.nonEmpty))
       .getOrElse(Array.empty)
@@ -175,24 +180,24 @@ object HoodieOptionConfig {
    * @return
    */
   def getTableType(options: Map[String, String]): String = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key,
       DataSourceWriteOptions.TABLE_TYPE.defaultValue)
   }
 
   def getPreCombineField(options: Map[String, String]): Option[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
   }
 
   def deleteHoodieOptions(options: Map[String, String]): Map[String, String] = 
{
-    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => 
keyMapping.contains(kv._1))
+    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => 
sqlOptionKeyToWriteConfigKey.contains(kv._1))
   }
 
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
-    val sqlOptions = mappingTableConfigToSqlOption(options)
-    val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) 
-- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+    val sqlOptions = mapTableConfigsToSqlOptions(options)
+    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index aff65672c5..eb26ef52d3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -263,7 +263,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
   def withSparkConf(spark: SparkSession, options: Map[String, String])
                    (baseConfig: Map[String, String] = Map.empty): Map[String, 
String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // 
Table options has the highest priority
-      (spark.sessionState.conf.getAllConfs ++ 
HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
+      (spark.sessionState.conf.getAllConfs ++ 
HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(options))
         .filterKeys(isHoodieConfigKey)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index ef84eb2c89..2cb9c98878 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -56,7 +56,7 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
       "hoodie.index.type" -> "INMEMORY",
       "hoodie.compact.inline" -> "true"
     )
-    val tableConfigs = 
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+    val tableConfigs = 
HoodieOptionConfig.mapSqlOptionsToTableConfigs(sqlOptions)
 
     assertTrue(tableConfigs.size == 5)
     assertTrue(tableConfigs(HoodieTableConfig.RECORDKEY_FIELDS.key) == 
"id,addr")

Reply via email to