This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e0fe1612eb [HUDI-5160] Fix data source write save as table (#7448)
e0fe1612eb is described below
commit e0fe1612ebf40caf4e588c249b6a06c0287a46fd
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 6 01:14:33 2023 +0800
[HUDI-5160] Fix data source write save as table (#7448)
---
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 20 ++++----
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 59 ++++++++++++----------
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 2 +-
.../spark/sql/hudi/TestHoodieOptionConfig.scala | 2 +-
4 files changed, 45 insertions(+), 38 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7bd6dd2244..826c0090b2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -23,14 +23,15 @@ import
org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
+import org.apache.spark.sql.hudi.HoodieOptionConfig._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -216,20 +217,21 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
val globalTableConfigs =
mappingSparkDatasourceConfigsToTableConfigs(globalProps)
- val globalSqlOptions =
HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)
+ val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)
- val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions
++ catalogProperties)
+ val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
+ mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++
catalogProperties)
// get final schema and parameters
val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists)
match {
case (CatalogTableType.EXTERNAL, true) =>
val existingTableConfig = tableConfig.getProps.asScala.toMap
val currentTableConfig = globalTableConfigs ++ existingTableConfig
- val catalogTableProps =
HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
+ val catalogTableProps = mapSqlOptionsToTableConfigs(catalogProperties)
validateTableConfig(spark, catalogTableProps,
convertMapToHoodieConfig(existingTableConfig))
val options = extraTableConfig(hoodieTableExists, currentTableConfig)
++
- HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++
currentTableConfig
+ mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig
val schemaFromMetaOpt = loadTableSchemaByMetaClient()
val schema = if (schemaFromMetaOpt.nonEmpty) {
@@ -243,11 +245,11 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
(schema, options)
case (_, false) =>
- ValidationUtils.checkArgument(table.schema.nonEmpty,
+ checkArgument(table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = table.schema
val options = extraTableConfig(tableExists = false,
globalTableConfigs) ++
- HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+ mapSqlOptionsToTableConfigs(sqlOptions)
(addMetaFields(schema), options)
case (CatalogTableType.MANAGED, true) =>
@@ -255,7 +257,7 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
s". The associated location('$tableLocation') already exists.")
}
HoodieOptionConfig.validateTable(spark, finalSchema,
- HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))
+ mapTableConfigsToSqlOptions(tableConfigs))
val resolver = spark.sessionState.conf.resolver
val dataSchema = finalSchema.filterNot { f =>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index df75b60f54..7219af6c4a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -79,7 +79,7 @@ object HoodieOptionConfig {
/**
* The mapping of the sql short name key to the hoodie's config key.
*/
- private lazy val keyMapping: Map[String, String] = {
+ private lazy val sqlOptionKeyToWriteConfigKey: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
.map(f => {f.setAccessible(true);
f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -87,11 +87,14 @@ object HoodieOptionConfig {
.toMap
}
+ private lazy val writeConfigKeyToSqlOptionKey: Map[String, String] =
+ sqlOptionKeyToWriteConfigKey.map(f => f._2 -> f._1)
+
/**
* The mapping of the sql short name key to the hoodie table config key
* defined in HoodieTableConfig.
*/
- private lazy val keyTableConfigMapping: Map[String, String] = {
+ private lazy val sqlOptionKeyToTableConfigKey: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
.map(f => {f.setAccessible(true);
f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -100,41 +103,43 @@ object HoodieOptionConfig {
.toMap
}
- private lazy val tableConfigKeyToSqlKey: Map[String, String] =
- keyTableConfigMapping.map(f => f._2 -> f._1)
+ private lazy val tableConfigKeyToSqlOptionKey: Map[String, String] =
+ sqlOptionKeyToTableConfigKey.map(f => f._2 -> f._1)
/**
* Mapping of the short sql value to the hoodie's config value
*/
- private val valueMapping: Map[String, String] = Map (
+ private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
)
- private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+ private lazy val writeConfigValueToSqlOptionValue =
sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)
def withDefaultSqlOptions(options: Map[String, String]): Map[String, String]
= defaultSqlOptions ++ options
/**
- * Mapping the sql's short name key/value in the options to the hoodie's
config key/value.
- * @param options
- * @return
+ * Map SQL options to data source write configs.
*/
- def mappingSqlOptionToHoodieParam(options: Map[String, String]): Map[String,
String] = {
+ def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]):
Map[String, String] = {
options.map (kv =>
- keyMapping.getOrElse(kv._1, kv._1) -> valueMapping.getOrElse(kv._2,
kv._2))
+ sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) ->
sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
}
/**
- * Mapping the sql options to the hoodie table config which used to store to
the hoodie
- * .properties when create the table.
- * @param options
- * @return
+ * Map data source write configs to SQL options.
+ */
+ def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]):
Map[String, String] = {
+ options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) ->
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+ }
+
+ /**
+ * Map SQL options to table configs.
*/
- def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String,
String] = {
+ def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String,
String] = {
options.map { case (k, v) =>
- if (keyTableConfigMapping.contains(k)) {
- keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+ if (sqlOptionKeyToTableConfigKey.contains(k)) {
+ sqlOptionKeyToTableConfigKey(k) ->
sqlOptionValueToWriteConfigValue.getOrElse(v, v)
} else {
k -> v
}
@@ -142,10 +147,10 @@ object HoodieOptionConfig {
}
/**
- * Mapping the table config (loaded from the hoodie.properties) to the sql
options.
+ * Map table configs to SQL options.
*/
- def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String,
String] = {
- options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) ->
reverseValueMapping.getOrElse(kv._2, kv._2))
+ def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String,
String] = {
+ options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) ->
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
}
val defaultSqlOptions: Map[String, String] = {
@@ -163,7 +168,7 @@ object HoodieOptionConfig {
* @return
*/
def getPrimaryColumns(options: Map[String, String]): Array[String] = {
- val params = mappingSqlOptionToHoodieParam(options)
+ val params = mapSqlOptionsToDataSourceWriteConfigs(options)
params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.map(_.split(",").filter(_.nonEmpty))
.getOrElse(Array.empty)
@@ -175,24 +180,24 @@ object HoodieOptionConfig {
* @return
*/
def getTableType(options: Map[String, String]): String = {
- val params = mappingSqlOptionToHoodieParam(options)
+ val params = mapSqlOptionsToDataSourceWriteConfigs(options)
params.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.TABLE_TYPE.defaultValue)
}
def getPreCombineField(options: Map[String, String]): Option[String] = {
- val params = mappingSqlOptionToHoodieParam(options)
+ val params = mapSqlOptionsToDataSourceWriteConfigs(options)
params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
}
def deleteHoodieOptions(options: Map[String, String]): Map[String, String] =
{
- options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv =>
keyMapping.contains(kv._1))
+ options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv =>
sqlOptionKeyToWriteConfigKey.contains(kv._1))
}
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
- val sqlOptions = mappingTableConfigToSqlOption(options)
- val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName)
-- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+ val sqlOptions = mapTableConfigsToSqlOptions(options)
+ val targetOptions = sqlOptionKeyToWriteConfigKey.keySet --
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
sqlOptions.filterKeys(targetOptions.contains)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index aff65672c5..eb26ef52d3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -263,7 +263,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def withSparkConf(spark: SparkSession, options: Map[String, String])
(baseConfig: Map[String, String] = Map.empty): Map[String,
String] = {
baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ //
Table options has the highest priority
- (spark.sessionState.conf.getAllConfs ++
HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
+ (spark.sessionState.conf.getAllConfs ++
HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(options))
.filterKeys(isHoodieConfigKey)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index ef84eb2c89..2cb9c98878 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -56,7 +56,7 @@ class TestHoodieOptionConfig extends
SparkClientFunctionalTestHarness {
"hoodie.index.type" -> "INMEMORY",
"hoodie.compact.inline" -> "true"
)
- val tableConfigs =
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+ val tableConfigs =
HoodieOptionConfig.mapSqlOptionsToTableConfigs(sqlOptions)
assertTrue(tableConfigs.size == 5)
assertTrue(tableConfigs(HoodieTableConfig.RECORDKEY_FIELDS.key) ==
"id,addr")