This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new df5d7c8fd262 feat(spark-datasource): support spark.hoodie.* read 
config overrides (#18205)
df5d7c8fd262 is described below

commit df5d7c8fd262c42afbc65ca6a83f0ff4075863cc
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 24 11:09:41 2026 -0800

    feat(spark-datasource): support spark.hoodie.* read config overrides 
(#18205)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 17 ++++++-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 15 +++++-
 .../org/apache/hudi/TestDataSourceOptions.scala    | 54 +++++++++++++++++++++-
 3 files changed, 82 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 237afaf4379f..8f10823cb870 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -991,6 +991,10 @@ object DataSourceOptionsHelper {
 
   private val log = LoggerFactory.getLogger(DataSourceOptionsHelper.getClass)
 
+  // Prefix constants for config normalization
+  private val SPARK_HOODIE_PREFIX = "spark.hoodie."
+  private val SPARK_PREFIX = "spark."
+
   // put all the configs with alternatives here
   private val allConfigsWithAlternatives = List(
     DataSourceReadOptions.QUERY_TYPE,
@@ -1076,7 +1080,18 @@ object DataSourceOptionsHelper {
   def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, 
String] = {
     // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by 
HiveSyncTool,
     // or else use query type from QUERY_TYPE.
-    val paramsWithGlobalProps = 
DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ parameters
+    // Config precedence (low -> high):
+    // 1) global DFS props
+    // 2) spark.hoodie.* (normalized to hoodie.*)
+    // 3) hoodie.* / explicit data source options
+    // NOTE: If both spark.hoodie.X and hoodie.X are set, hoodie.X wins.
+    val normalizedSparkHoodieConfigs = parameters.collect {
+      case (key, value) if key.startsWith(SPARK_HOODIE_PREFIX) => 
(key.stripPrefix(SPARK_PREFIX), value)
+    }
+    val paramsWithoutSparkHoodie = 
parameters.filterNot(_._1.startsWith(SPARK_HOODIE_PREFIX))
+    val paramsWithGlobalProps = 
DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+      normalizedSparkHoodieConfigs ++
+      paramsWithoutSparkHoodie
     val queryType = paramsWithGlobalProps.get(IS_QUERY_AS_RO_TABLE)
       .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else 
QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key, 
QUERY_TYPE.defaultValue()))
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 2706f2a7c387..bf8dee324f4f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -107,8 +107,17 @@ class DefaultSource extends RelationProvider
       throw new HoodieException("Glob paths are not supported for read paths 
as of Hudi 1.2.0")
     }
 
+    val hoodieAndSparkHoodieSqlConfs = sqlContext.getAllConfs.filter {
+      case (key, _) => key.startsWith("hoodie.") || 
key.startsWith("spark.hoodie.")
+    }
     // Add default options for unspecified read options keys.
-    val parameters = 
DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k
 => k._1.startsWith("hoodie.")) ++ optParams)
+    // Effective precedence (low -> high):
+    // 1) global DFS props
+    // 2) spark.hoodie.* SQL confs (normalized in parametersWithReadDefaults)
+    // 3) hoodie.* SQL confs
+    // 4) explicit DataFrame/DataSource options
+    val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(
+      hoodieAndSparkHoodieSqlConfs ++ optParams)
 
     // Get the table base path
     val tablePath = DataSourceUtils.getTablePath(storage, Seq(new 
StoragePath(path.get)).asJava)
@@ -125,7 +134,9 @@ class DefaultSource extends RelationProvider
       parameters
     }
 
-    DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
+    val relation = DefaultSource.createRelation(sqlContext, metaClient, 
schema, options.toMap)
+    log.info(s"Created relation ${relation.getClass.getSimpleName} with 
${options.size} resolved options")
+    relation
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
index 5f2f7dada0ed..20d61973f213 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
@@ -19,9 +19,10 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig}
 import org.apache.hudi.common.table.HoodieTableConfig
 
+import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
@@ -43,4 +44,55 @@ class TestDataSourceOptions {
       HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue(),
       DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
   }
+
+  @Test
+  def testReadDefaultsSupportSparkHoodieConfigs(): Unit = {
+    val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+      "spark.hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+    ))
+
+    assertEquals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, 
params(DataSourceReadOptions.QUERY_TYPE.key))
+    assertTrue(!params.contains("spark.hoodie.datasource.query.type"))
+  }
+
+  @Test
+  def testReadDefaultsPreferHoodieOverSparkHoodieWhenBothSet(): Unit = {
+    val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+      "spark.hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      "hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+    ))
+
+    assertEquals(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, 
params(DataSourceReadOptions.QUERY_TYPE.key))
+  }
+
+  @Test
+  def testReadDefaultsConfigHierarchyWithGlobalDFSProps(): Unit = {
+    // Set a config in global DFS props (lowest priority)
+    DFSPropertiesConfiguration.addToGlobalProps(
+      DataSourceReadOptions.QUERY_TYPE.key,
+      DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
+    )
+
+    // Test 1: Global DFS props are used when no other configs are set
+    val params1 = DataSourceOptionsHelper.parametersWithReadDefaults(Map.empty)
+    assertEquals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, 
params1(DataSourceReadOptions.QUERY_TYPE.key))
+
+    // Test 2: spark.hoodie.* overrides global DFS props
+    val params2 = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+      "spark.hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+    ))
+    assertEquals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, 
params2(DataSourceReadOptions.QUERY_TYPE.key))
+
+    // Test 3: hoodie.* overrides both spark.hoodie.* and global DFS props
+    val params3 = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+      "spark.hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      "hoodie.datasource.query.type" -> 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+    ))
+    assertEquals(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, 
params3(DataSourceReadOptions.QUERY_TYPE.key))
+  }
+
+  @AfterEach
+  def cleanup(): Unit = {
+    DFSPropertiesConfiguration.clearGlobalProps()
+  }
 }
Reply via email to the commits mailing list.