This is an automated email from the ASF dual-hosted git repository. akudinkin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 13ed95a9fd2 Make most of the Spark SQL DML operations configs overridable (#7850) 13ed95a9fd2 is described below commit 13ed95a9fd2e0b330f8e16cb2b08714a432a70b4 Author: Alexey Kudinkin <alexey.kudin...@gmail.com> AuthorDate: Sat Feb 4 10:54:17 2023 -0800 Make most of the Spark SQL DML operations configs overridable (#7850) This PR makes most of the Spark SQL operations configs overridable, leaving only a few that should not and cannot be overridden by the user --- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 69 +++++++++++----------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 0c766f5135b..ffc4079824f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -61,20 +61,10 @@ trait ProvidesHoodieConfig extends Logging { val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) val defaultOpts = Map[String, String]( + OPERATION.key -> UPSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, - OPERATION.key -> UPSERT_OPERATION_OPT_VAL - ) - - val overridingOpts = Map[String, String]( - "path" -> hoodieCatalogTable.tableLocation, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), - TBL_NAME.key -> hoodieCatalogTable.tableName, - PRECOMBINE_FIELD.key -> preCombineField, - HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, - URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName, SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL, - PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()), @@ -85,6 +75,16 @@ trait ProvidesHoodieConfig extends Logging { HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString ) + val overridingOpts = Map[String, String]( + "path" -> hoodieCatalogTable.tableLocation, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + TBL_NAME.key -> hoodieCatalogTable.tableName, + PRECOMBINE_FIELD.key -> preCombineField, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, + PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp + ) + combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, defaultOpts = defaultOpts, overridingOpts = overridingOpts) } @@ -183,22 +183,10 @@ trait ProvidesHoodieConfig extends Logging { PAYLOAD_CLASS_NAME.key -> payloadClassName, // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified // for the table - HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn) - ) - - val overridingOpts = extraOptions ++ Map( - "path" -> path, - TABLE_TYPE.key -> tableType, - TBL_NAME.key -> hoodieCatalogTable.tableName, - OPERATION.key -> operation, - HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable, - URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning, + 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn), KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName, SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), - PRECOMBINE_FIELD.key -> preCombineField, - PARTITIONPATH_FIELD.key -> partitionFieldsStr, HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key), @@ -209,6 +197,18 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS) ) + val overridingOpts = extraOptions ++ Map( + "path" -> path, + TABLE_TYPE.key -> tableType, + TBL_NAME.key -> hoodieCatalogTable.tableName, + OPERATION.key -> operation, + HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + PRECOMBINE_FIELD.key -> preCombineField, + PARTITIONPATH_FIELD.key -> partitionFieldsStr + ) + combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, defaultOpts = defaultOpts, overridingOpts = overridingOpts) } @@ -257,17 +257,10 @@ trait ProvidesHoodieConfig extends Logging { val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - val overridingOpts = Map( - "path" -> path, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), - TBL_NAME.key -> tableConfig.getTableName, - HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, - URL_ENCODE_PARTITIONING.key 
-> tableConfig.getUrlEncodePartitioning, + val defaultOpts = Map( KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL, - OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, - PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()), @@ -278,8 +271,18 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS) ) + val overridingOpts = Map( + "path" -> path, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + TBL_NAME.key -> tableConfig.getTableName, + HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, + URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, + OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, + PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp + ) + combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, - defaultOpts = Map.empty, overridingOpts = overridingOpts) + defaultOpts = defaultOpts, overridingOpts = overridingOpts) } def buildHiveSyncConfig(sparkSession: SparkSession,