This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new df5d7c8fd262 feat(spark-datasource): support spark.hoodie.* read
config overrides (#18205)
df5d7c8fd262 is described below
commit df5d7c8fd262c42afbc65ca6a83f0ff4075863cc
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 24 11:09:41 2026 -0800
feat(spark-datasource): support spark.hoodie.* read config overrides
(#18205)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 17 ++++++-
.../main/scala/org/apache/hudi/DefaultSource.scala | 15 +++++-
.../org/apache/hudi/TestDataSourceOptions.scala | 54 +++++++++++++++++++++-
3 files changed, 82 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 237afaf4379f..8f10823cb870 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -991,6 +991,10 @@ object DataSourceOptionsHelper {
private val log = LoggerFactory.getLogger(DataSourceOptionsHelper.getClass)
+ // Prefix constants for config normalization
+ private val SPARK_HOODIE_PREFIX = "spark.hoodie."
+ private val SPARK_PREFIX = "spark."
+
// put all the configs with alternatives here
private val allConfigsWithAlternatives = List(
DataSourceReadOptions.QUERY_TYPE,
@@ -1076,7 +1080,18 @@ object DataSourceOptionsHelper {
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String,
String] = {
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has been set by
HiveSyncTool,
// or else use query type from QUERY_TYPE.
- val paramsWithGlobalProps =
DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ parameters
+ // Config precedence (low -> high):
+ // 1) global DFS props
+ // 2) spark.hoodie.* (normalized to hoodie.*)
+ // 3) hoodie.* / explicit data source options
+ // NOTE: If both spark.hoodie.X and hoodie.X are set, hoodie.X wins.
+ val normalizedSparkHoodieConfigs = parameters.collect {
+ case (key, value) if key.startsWith(SPARK_HOODIE_PREFIX) =>
(key.stripPrefix(SPARK_PREFIX), value)
+ }
+ val paramsWithoutSparkHoodie =
parameters.filterNot(_._1.startsWith(SPARK_HOODIE_PREFIX))
+ val paramsWithGlobalProps =
DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+ normalizedSparkHoodieConfigs ++
+ paramsWithoutSparkHoodie
val queryType = paramsWithGlobalProps.get(IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else
QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key,
QUERY_TYPE.defaultValue()))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 2706f2a7c387..bf8dee324f4f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -107,8 +107,17 @@ class DefaultSource extends RelationProvider
throw new HoodieException("Glob paths are not supported for read paths
as of Hudi 1.2.0")
}
+ val hoodieAndSparkHoodieSqlConfs = sqlContext.getAllConfs.filter {
+ case (key, _) => key.startsWith("hoodie.") ||
key.startsWith("spark.hoodie.")
+ }
// Add default options for unspecified read options keys.
- val parameters =
DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k
=> k._1.startsWith("hoodie.")) ++ optParams)
+ // Effective precedence (low -> high):
+ // 1) global DFS props
+ // 2) spark.hoodie.* SQL confs (normalized in parametersWithReadDefaults)
+ // 3) hoodie.* SQL confs
+ // 4) explicit DataFrame/DataSource options
+ val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(
+ hoodieAndSparkHoodieSqlConfs ++ optParams)
// Get the table base path
val tablePath = DataSourceUtils.getTablePath(storage, Seq(new
StoragePath(path.get)).asJava)
@@ -125,7 +134,9 @@ class DefaultSource extends RelationProvider
parameters
}
- DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
+ val relation = DefaultSource.createRelation(sqlContext, metaClient,
schema, options.toMap)
+ log.info(s"Created relation ${relation.getClass.getSimpleName} with
${options.size} resolved options")
+ relation
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
index 5f2f7dada0ed..20d61973f213 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
@@ -19,9 +19,10 @@
package org.apache.hudi
-import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig}
import org.apache.hudi.common.table.HoodieTableConfig
+import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
@@ -43,4 +44,55 @@ class TestDataSourceOptions {
HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue(),
DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
}
+
+ @Test
+ def testReadDefaultsSupportSparkHoodieConfigs(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ "spark.hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+ ))
+
+ assertEquals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
params(DataSourceReadOptions.QUERY_TYPE.key))
+ assertTrue(!params.contains("spark.hoodie.datasource.query.type"))
+ }
+
+ @Test
+ def testReadDefaultsPreferHoodieOverSparkHoodieWhenBothSet(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ "spark.hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ "hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+ ))
+
+ assertEquals(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
params(DataSourceReadOptions.QUERY_TYPE.key))
+ }
+
+ @Test
+ def testReadDefaultsConfigHierarchyWithGlobalDFSProps(): Unit = {
+ // Set a config in global DFS props (lowest priority)
+ DFSPropertiesConfiguration.addToGlobalProps(
+ DataSourceReadOptions.QUERY_TYPE.key,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
+ )
+
+ // Test 1: Global DFS props are used when no other configs are set
+ val params1 = DataSourceOptionsHelper.parametersWithReadDefaults(Map.empty)
+ assertEquals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL,
params1(DataSourceReadOptions.QUERY_TYPE.key))
+
+ // Test 2: spark.hoodie.* overrides global DFS props
+ val params2 = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ "spark.hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+ ))
+ assertEquals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
params2(DataSourceReadOptions.QUERY_TYPE.key))
+
+ // Test 3: hoodie.* overrides both spark.hoodie.* and global DFS props
+ val params3 = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ "spark.hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ "hoodie.datasource.query.type" ->
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+ ))
+ assertEquals(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
params3(DataSourceReadOptions.QUERY_TYPE.key))
+ }
+
+ @AfterEach
+ def cleanup(): Unit = {
+ DFSPropertiesConfiguration.clearGlobalProps()
+ }
}