This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1203b215b03c feat: Use PartitionValueExtractor interface in Spark reader path (#17850)
1203b215b03c is described below

commit 1203b215b03cb5c76fffb46a360efe0c4ce21fe0
Author: Surya Prasanna <[email protected]>
AuthorDate: Sun Mar 1 21:56:36 2026 -0800

    feat: Use PartitionValueExtractor interface in Spark reader path (#17850)
    
    Users can now enable custom partition value extraction for read operations via the hoodie.datasource.read.partition.value.extractor.enabled option, enabling support for non-standard partition path formats. When enabled, the reader uses the PartitionValueExtractor class recorded in the table's hoodie.properties.
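    For illustration, a minimal read-side sketch (option key taken from this patch; the table is assumed to already carry hoodie.table.partition_extractor_class in its hoodie.properties):
    
        // Hypothetical usage: enable extractor-based partition parsing on read
        val df = spark.read.format("hudi")
          .option("hoodie.datasource.read.partition.value.extractor.enabled", "true")
          .load(basePath)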
    
    Changes:
    
    Moved the PartitionValueExtractor interface from hudi-sync-common to hudi-common for broader accessibility.
    Added `hoodie.datasource.partition_extractor_class` as a writer property and `hoodie.table.partition_extractor_class` as a table property to carry the configured extractor class.
    
    Impact
    Public API Changes:
    
    New config option: hoodie.datasource.read.partition.value.extractor.enabled for Spark reads
    PartitionValueExtractor interface relocated from the hudi-sync-common module to hudi-common; the package org.apache.hudi.sync.common.model is unchanged
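    Downstream code can keep the same import after the move (per the rename in this diff, only the owning module changed):
    
        import org.apache.hudi.sync.common.model.PartitionValueExtractor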
    
    User-Facing Changes:
    Users can now customize partition value extraction during read operations by providing a custom PartitionValueExtractor implementation.
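    A minimal sketch of such an implementation (the class name and path layout are illustrative, modeled on the test extractor added in this patch):
    
        import org.apache.hudi.sync.common.model.PartitionValueExtractor
        import java.util
        
        // Hypothetical extractor for partition paths shaped like yyyy/MM/dd
        class DaySlashExtractor extends PartitionValueExtractor {
          override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
            val parts = partitionPath.split("/")
            // Reassemble the slash-separated date into a single yyyy-MM-dd value
            util.Arrays.asList(s"${parts(0)}-${parts(1)}-${parts(2)}")
          }
        }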
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  53 ++-
 .../hudi/common/table/HoodieTableConfig.java       |  13 +
 .../hudi/common/table/HoodieTableMetaClient.java   |  12 +
 .../hudi/common/util/HoodieTableConfigUtils.java   |  40 ++-
 .../sync/common/model/PartitionValueExtractor.java |   0
 .../apache/hudi/configuration/FlinkOptions.java    |   7 +
 .../java/org/apache/hudi/util/StreamerUtil.java    |   1 +
 .../hudi/common/table/TestHoodieTableConfig.java   |   2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  23 ++
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  10 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |   9 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   8 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  10 +
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   9 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  22 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  25 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |   1 +
 .../sql/hudi/common/MockSlashKeyGenerator.scala    | 134 +++++++
 .../common/MockSlashPartitionValueExtractor.scala  |  40 +++
 .../common/TestCustomParitionValueExtractor.scala  | 386 +++++++++++++++++++++
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |  37 +-
 .../hudi/sync/common/TestHoodieSyncConfig.java     |  16 +
 .../apache/hudi/utilities/streamer/StreamSync.java |   7 +-
 23 files changed, 804 insertions(+), 61 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 08c315b0e78d..03294b83c0f2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
-import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.common.util.{StringUtils, Option => HOption}
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.storage.StoragePath
@@ -31,6 +31,8 @@ import org.apache.hudi.util.ExceptionWrappingIterator
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.sync.common.model.PartitionValueExtractor
+
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -224,11 +226,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
                                  partitionPath: String,
                                  tableBasePath: StoragePath,
                                  tableSchema: StructType,
-                                 tableConfig: java.util.Map[String, String],
+                                 tableConfig: HoodieTableConfig,
                                  timeZoneId: String,
-                                 shouldValidatePartitionColumns: Boolean): Array[Object] = {
+                                 shouldValidatePartitionColumns: Boolean,
+                                 usePartitionValueExtractorOnRead: Boolean): Array[Object] = {
     val keyGeneratorClass = KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
-    val timestampKeyGeneratorType = tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+    val timestampKeyGeneratorType = tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+    val partitionValueExtractorClass = tableConfig.getPartitionExtractorClass.orElse("")
 
     if (null != keyGeneratorClass
       && null != timestampKeyGeneratorType
@@ -238,10 +242,43 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
       // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting.
       // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
       Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+    } else if (usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
+      parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass, partitionPath,
+        partitionColumns, tableSchema)
     } else {
       doParsePartitionColumnValues(partitionColumns, partitionPath, tableBasePath, tableSchema, timeZoneId,
-        shouldValidatePartitionColumns, tableConfig.getOrDefault(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key,
-          HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue).toBoolean)
+        shouldValidatePartitionColumns, tableConfig.getSlashSeparatedDatePartitioning)
+    }
+  }
+
+  /**
+   * Parses partition values from partition path using a custom PartitionValueExtractor.
+   *
+   * @param partitionValueExtractorClass Fully qualified class name of the PartitionValueExtractor implementation
+   * @param partitionPath The partition path to extract values from
+   * @param partitionColumns Array of partition column names
+   * @param tableSchema The schema of the table
+   * @return Array of partition values as Objects, properly typed according to the schema
+   */
+  private def parsePartitionValuesBasedOnPartitionValueExtractor(
+      partitionValueExtractorClass: String,
+      partitionPath: String,
+      partitionColumns: Array[String],
+      tableSchema: StructType): Array[Object] = {
+    try {
+      val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[PartitionValueExtractor]
+      val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray
+      val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns)
+      val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) =>
+        castStringToType(stringValue, field.dataType)
+      }
+      typedValues.map(_.asInstanceOf[Object])
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to extract partition value using $partitionValueExtractorClass class", e)
     }
   }
 
@@ -336,7 +373,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     ).toSeq(partitionSchema)
   }
 
-  private def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = {
+  def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = {
     val partitionFields = partitionColumns.flatMap { partitionCol =>
       extractNestedField(schema, partitionCol)
     }
@@ -364,7 +401,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     traverseSchema(schema, pathParts.toList, fieldPath)
   }
 
-  private def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = {
+  def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = {
     import org.apache.spark.sql.types._
 
     // handling cases where the value contains path separators or is complex
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 3de45e1754ee..29d2ee9f0ca5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -50,6 +50,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.BinaryUtil;
 import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -335,6 +336,14 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Key Generator type to determine key generator 
class");
 
+  public static final ConfigProperty<String> PARTITION_EXTRACTOR_CLASS = ConfigProperty
+      .key("hoodie.table.partition_extractor_class")
+      .noDefaultValue()
+      .withInferFunction(HoodieTableConfigUtils::inferPartitionValueExtractorClass)
+      .markAdvanced()
+      .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+          + "default is inferred based on partition configuration.")
+
   // TODO: this has to be UTC. why is it not the default?
  public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty
       .key("hoodie.table.timeline.timezone")
@@ -1208,6 +1217,10 @@ public class HoodieTableConfig extends HoodieConfig {
     return KeyGeneratorType.getKeyGeneratorClassName(this);
   }
 
+  public Option<String> getPartitionExtractorClass() {
+    return Option.ofNullable(getString(PARTITION_EXTRACTOR_CLASS));
+  }
+
   public HoodieTimelineTimeZone getTimelineTimezone() {
    return HoodieTimelineTimeZone.valueOf(getStringOrDefault(TIMELINE_TIMEZONE));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 6ddd459e3d6b..e20d9d3a1220 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1070,6 +1070,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean bootstrapIndexEnable;
     private Boolean populateMetaFields;
     private String keyGeneratorClassProp;
+    private String partitionValueExtractorClass;
     private String keyGeneratorType;
     private Boolean slashSeparatedDatePartitioning;
     private Boolean hiveStylePartitioningEnable;
@@ -1243,6 +1244,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public TableBuilder setPartitionValueExtractorClass(String partitionValueExtractorClass) {
+      this.partitionValueExtractorClass = partitionValueExtractorClass;
+      return this;
+    }
+
    public TableBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) {
       this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
       return this;
@@ -1445,6 +1451,9 @@ public class HoodieTableMetaClient implements Serializable {
      if (hoodieConfig.contains(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)) {
        setSlashSeparatedDatePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING));
      }
+      if (hoodieConfig.contains(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS)) {
+        setPartitionValueExtractorClass(hoodieConfig.getString(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS));
+      }
      if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
        setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
       }
@@ -1588,6 +1597,9 @@ public class HoodieTableMetaClient implements Serializable {
      if (null != slashSeparatedDatePartitioning) {
        tableConfig.setValue(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING, Boolean.toString(slashSeparatedDatePartitioning));
      }
+      if (null != partitionValueExtractorClass) {
+        tableConfig.setValue(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS, partitionValueExtractorClass);
+      }
      if (null != hiveStylePartitioningEnable) {
        tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable));
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
index b15c97164fa2..bc4db3789093 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
@@ -21,12 +21,12 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
 public class HoodieTableConfigUtils {
-
   /**
   * This function returns the partition fields joined by BaseKeyGenerator.FIELD_SEPARATOR. It will also
   * include the key generator partition type with the field. The key generator partition type is used for
@@ -75,7 +75,7 @@ public class HoodieTableConfigUtils {
        ? partitionField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)[0]
         : partitionField;
   }
-  
+
   /**
   * This function returns the hoodie.table.version from hoodie.properties file.
    */
@@ -84,4 +84,40 @@ public class HoodieTableConfigUtils {
        ? HoodieTableVersion.fromVersionCode(config.getInt(HoodieTableConfig.VERSION))
         : HoodieTableConfig.VERSION.defaultValue();
   }
+
+  /**
+   * Infers the appropriate PartitionValueExtractor class based on table configuration.
+   * This function determines the correct extractor based on the number of partition fields
+   * and partitioning style (Hive-style vs non-Hive-style).
+   *
+   * @param cfg HoodieConfig containing table configuration
+   * @return Option containing the inferred PartitionValueExtractor class name
+   */
+  public static Option<String> inferPartitionValueExtractorClass(HoodieConfig cfg) {
+    Option<String> partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
+        .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
+    if (!partitionFieldsOpt.isPresent()) {
+      return Option.empty();
+    }
+
+    String partitionFields = partitionFieldsOpt.get();
+    if (StringUtils.nonEmpty(partitionFields)) {
+      int numOfPartFields = partitionFields.split(",").length;
+      if (numOfPartFields == 1) {
+        if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key())
+            && cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) {
+          return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
+        } else if (cfg.contains(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING)
+            && cfg.getString(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) {
+          return Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
+        } else {
+          return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
+        }
+      } else {
+        return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
+      }
+    } else {
+      return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
+    }
+  }
 }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java b/hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
similarity index 100%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
rename to hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a7b33421e72b..489a54837b82 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -650,6 +650,13 @@ public class FlinkOptions extends HoodieConfig {
           + "**Note** This is being actively worked on. Please use "
           + "`hoodie.datasource.write.keygenerator.class` instead.");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> PARTITION_VALUE_EXTRACTOR = ConfigOptions
+      .key(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Partition value extractor class helps extract the partition value from partition paths");
+
   public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
   public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
   public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index ecf7b73e312d..ffe94fcfd558 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -314,6 +314,7 @@ public class StreamerUtil {
          .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(), null))
          .setKeyGeneratorClassProp(
              conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
+          .setPartitionValueExtractorClass(conf.getString(FlinkOptions.PARTITION_VALUE_EXTRACTOR.key(), null))
          .setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING))
           .setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED))
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 8f561425cdd7..9f84887b1dab 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -384,7 +384,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
   @Test
   void testDefinedTableConfigs() {
    List<ConfigProperty<?>> configProperties = HoodieTableConfig.definedTableConfigs();
-    assertEquals(43, configProperties.size());
+    assertEquals(44, configProperties.size());
     configProperties.forEach(c -> {
       assertNotNull(c);
       assertFalse(c.doc().isEmpty());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 7e92b47f3321..111630b3694c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -290,6 +290,15 @@ object DataSourceReadOptions {
     .sinceVersion("1.1.0")
     .withDocumentation("Fully qualified class name of the catalog that is used 
by the Polaris spark client.")
 
+  val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.partition.value.extractor.enabled")
+    .defaultValue("false")
+    .markAdvanced()
+    .sinceVersion("1.2.0")
+    .withDocumentation("Controls whether the PartitionValueExtractor interface is used" +
+      " for parsing partition values from partition paths. When this config is enabled, it uses" +
+      " the PartitionValueExtractor class value stored in the hoodie.properties file.")
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
@@ -513,6 +522,14 @@ object DataSourceWriteOptions {
 
  val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
 
+  val PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.partition_extractor_class")
+    .noDefaultValue()
+    .markAdvanced()
+    .sinceVersion("1.2.0")
+    .withDocumentation("PartitionValueExtractor implementation used by Spark 
datasource write/read paths " +
+      "to parse partition values from partition paths.")
+
   val ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.row.writer.enable")
     .defaultValue("true")
@@ -1074,6 +1091,12 @@ object DataSourceOptionsHelper {
    if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) && tableConfig.getKeyGeneratorClassName != null) {
      missingWriteConfigs ++= Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> tableConfig.getKeyGeneratorClassName)
    }
+    if (!params.contains(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key())
+        && tableConfig.getPartitionExtractorClass.isPresent) {
+      missingWriteConfigs ++= Map(
+        DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key() -> tableConfig.getPartitionExtractorClass.get()
+      )
+    }
    if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
      missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 56280da8a9d7..ecd6a0188d17 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -477,6 +477,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     getPartitionColumnsAsInternalRowInternal(file,
       metaClient.getBasePath, extractPartitionValuesFromPartitionPath = true)
 
+  protected def usePartitionValueExtractorOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
+    optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+      DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue).toBoolean
+  }
+
  protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: StoragePath,
                                                         extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
@@ -489,9 +494,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
         relativePath,
         basePath,
         tableStructSchema,
-        tableConfig.propsMap,
+        tableConfig,
         timeZoneId,
-        conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
+        conf.getBoolean("spark.sql.sources.validatePartitionColumns", true),
+        usePartitionValueExtractorOnRead(optParams, sparkSession))
       if(rowValues.length != partitionColumns.length) {
         throw new HoodieException("Failed to get partition column values from 
the partition-path:"
             + s"partition column size: ${partitionColumns.length}, parsed 
partition value size: ${rowValues.length}")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 34a67cfb3a5b..81b3471c8357 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -527,6 +527,14 @@ object HoodieFileIndex extends Logging {
      properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
     }
 
+    // Check if the partition value extractor class needs to be used during reads.
+    val usePartitionValueExtractorOnRead = getConfigValue(options, sqlConf,
+      DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+      DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue())
+    properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+      usePartitionValueExtractorOnRead)
+
+    // Check if path filter optimized listing is enabled on reads.
    var pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
      DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, null)
     if (pathFilterOptimizedListingEnabled != null) {
@@ -545,6 +553,7 @@ object HoodieFileIndex extends Logging {
     if (tableConfig != null) {
      properties.setProperty(RECORDKEY_FIELD.key, tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
      properties.setProperty(PARTITIONPATH_FIELD.key, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
+      properties.setProperty(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key(), tableConfig.getPartitionExtractorClass.orElse(""))
 
      // for simple bucket index, we need to set the INDEX_TYPE, BUCKET_INDEX_HASH_FIELD, BUCKET_INDEX_NUM_BUCKETS
      val database = getDatabaseName(tableConfig, spark.catalog.currentDatabase)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f7db780e504c..eed8226183bb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -291,6 +291,9 @@ class HoodieSparkSqlWriterInternal {
          if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
            hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
          else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
+        val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(
+          DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(),
+          hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null))
        val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
         HoodieTableMetaClient.newTableBuilder()
           .setTableType(tableType)
@@ -310,6 +313,7 @@ class HoodieSparkSqlWriterInternal {
          .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
          .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
          .setKeyGeneratorClassProp(keyGenProp)
+          .setPartitionValueExtractorClass(partitionValueExtractorClassName)
          .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -749,7 +753,8 @@ class HoodieSparkSqlWriterInternal {
          String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
        ))
        val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
-
+        val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(
+          DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(), null)
         HoodieTableMetaClient.newTableBuilder()
           .setTableType(HoodieTableType.valueOf(tableType))
           .setTableName(tableName)
@@ -769,6 +774,7 @@ class HoodieSparkSqlWriterInternal {
          .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
          .setPopulateMetaFields(populateMetaFields)
          .setKeyGeneratorClassProp(keyGenProp)
+          .setPartitionValueExtractorClass(partitionValueExtractorClassName)
          .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index fdc41bcfbd51..e76af4a2733d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -323,6 +323,16 @@ object HoodieWriterUtils {
          && currentPartitionFields != tableConfigPartitionFields) {
          diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
         }
+
+        // Validate partition value extractor
+        val currentPartitionValueExtractor = params.getOrElse(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(), null)
+        if (currentPartitionValueExtractor != null) {
+          val tableConfigPartitionValueExtractor = tableConfig.getString(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS)
+          if (tableConfigPartitionValueExtractor != null &&
+            !currentPartitionValueExtractor.equals(tableConfigPartitionValueExtractor)) {
+            diffConfigs.append(s"PartitionValueExtractor:\t$currentPartitionValueExtractor\t$tableConfigPartitionValueExtractor\n")
+          }
+        }
        // The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be NULL or non-NULL.
        // The non-NULL value has been validated above in the regular code path.
        // Here we check the NULL case since if the value is NULL, the check is skipped above.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 077e1129eb5e..a159610e0b00 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -122,6 +122,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
 
  protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
 
+  protected lazy val usePartitionValueExtractorOnRead = configProperties.getBoolean(
+    DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(),
+    DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue().toBoolean)
+
   /**
    * Get the partition schema from the hoodie.properties.
    */
@@ -436,9 +440,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
       partitionPath,
       getBasePath,
       schema,
-      metaClient.getTableConfig.propsMap,
+      metaClient.getTableConfig,
      configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone),
-      shouldValidatePartitionColumns(spark))
+      shouldValidatePartitionColumns(spark),
+      usePartitionValueExtractorOnRead)
   }
 
   private def arePartitionPathsUrlEncoded: Boolean =
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index d461de145e5d..788fdbff5ca6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.hudi.{DataSourceOptionsHelper, HoodieSchemaConversionUtils}
 import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.HoodieTableConfig.{HIVE_STYLE_PARTITIONING_ENABLE, URL_ENCODE_PARTITIONING}
 import org.apache.hudi.common.table.timeline.TimelineUtils
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.common.util.ValidationUtils
-import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.common.util.{HoodieTableConfigUtils, StringUtils, ValidationUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
@@ -61,7 +59,7 @@ import scala.collection.mutable
  */
 class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {
 
-  checkArgument(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+  ValidationUtils.checkArgument(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
     || table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "org.apache.hudi",
     s" ${table.qualifiedName} is not a Hudi table")
 
@@ -268,7 +266,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         (tableSchema, options)
 
       case (_, false) =>
-        checkArgument(table.schema.nonEmpty,
+        ValidationUtils.checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
        val options = extraTableConfig(tableExists = false, globalTableConfigs, sqlOptions) ++
@@ -345,6 +343,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
     }
+    // Include partition value extractor configuration.
+    if (originTableConfig.contains(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)) {
+      extraConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key) =
+        originTableConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)
+    } else {
+      val inferredClass = HoodieTableConfigUtils.inferPartitionValueExtractorClass(
+        new HoodieConfig(TypedProperties.fromMap(originTableConfig.asJava)))
+      if (inferredClass.isPresent) {
+        extraConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key) = inferredClass.get()
+      }
+    }
+
     extraConfig.toMap
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 1cc39fa37af0..48bb7cf20f30 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -25,10 +25,10 @@ import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonCo
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
-import org.apache.hudi.common.util.{ConfigUtils, ReflectionUtils, StringUtils}
+import org.apache.hudi.common.util.{ConfigUtils, HoodieTableConfigUtils, ReflectionUtils, StringUtils}
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, NonPartitionedExtractor}
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory
 
 import java.util.Locale
 
-import scala.collection.JavaConverters
 import scala.collection.JavaConverters._
 
 trait ProvidesHoodieConfig extends Logging {
@@ -460,7 +459,25 @@ trait ProvidesHoodieConfig extends Logging {
     if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, props.getString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
    }
-    hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS, classOf[MultiPartKeysValueExtractor].getName)
+    // If a partition value extractor is present in the props, use it.
+    // It checks first for hoodie.datasource.hive_sync.partition_extractor_class and then
+    // for the hoodie.datasource.partition_extractor_class config.
+    // If neither of the above configs is provided, try inferring the partition extractor class name.
+    // If even that fails, fall back to NonPartitionedExtractor.
+    val inferredPartitionValueExtractorClass = HoodieTableConfigUtils.inferPartitionValueExtractorClass(tableConfig)
+    if (props.containsKey(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())) {
+      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+        props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()))
+    } else if (props.containsKey(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key())) {
+      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+        props.getString(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()))
+    } else if (inferredPartitionValueExtractorClass.isPresent) {
+      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+        inferredPartitionValueExtractorClass.get())
+    } else {
+      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[NonPartitionedExtractor].getName)
+    }
+
     // This is hardcoded to true to ensure consistency as Spark syncs TIMESTAMP types as TIMESTAMP by default
     // via Spark's externalCatalog API, which is used by AlterHoodieTableCommand.
     hiveSyncConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE, "true")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 90dfb4f83d72..59b2b444211b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -135,6 +135,7 @@ object CreateHoodieTableCommand {
      checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
      checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
      checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key)
+      checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala
new file mode 100644
index 000000000000..a49c1c356640
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator, KeyGenUtils}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.Arrays
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Key generator that converts partition values to slash-separated partition paths.
+ *
+ * This generator is useful when you have partition columns like:
+ * - datestr: "yyyy-mm-dd" format
+ * - country, state, city: regular string values
+ *
+ * And you want to create partition paths like: yyyy/mm/dd/country/state/city
+ *
+ * The first partition field (typically a date) will have its hyphens replaced with slashes.
+ * All partition fields are then combined with "/" as the separator.
+ */
+class MockSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {
+
+  private val complexAvroKeyGenerator: ComplexAvroKeyGenerator = new ComplexAvroKeyGenerator(props)
+
+  this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props)
+  this.partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+    .split(",")
+    .map(_.trim)
+    .filter(_.nonEmpty)
+    .toList
+    .asJava
+
+  override def getRecordKey(record: GenericRecord): String = {
+    complexAvroKeyGenerator.getRecordKey(record)
+  }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    complexAvroKeyGenerator.getPartitionPath(record).replace('-', '/')
+  }
+
+  override def getRecordKey(row: Row): String = {
+    tryInitRowAccessor(row.schema)
+    combineRecordKey(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(row): _*))
+  }
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String = {
+    tryInitRowAccessor(schema)
+    combineRecordKeyUnsafe(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(internalRow): _*))
+  }
+
+  override def getPartitionPath(row: Row): String = {
+    tryInitRowAccessor(row.schema)
+    val partitionValues = rowAccessor.getRecordPartitionPathValues(row)
+    formatPartitionPath(partitionValues)
+  }
+
+  override def getPartitionPath(row: InternalRow, schema: StructType): UTF8String = {
+    tryInitRowAccessor(schema)
+    val partitionValues = rowAccessor.getRecordPartitionPathValues(row)
+    UTF8String.fromString(formatPartitionPath(partitionValues))
+  }
+
+  /**
+   * Formats the partition path by:
+   * 1. Converting the first partition value (date) from "yyyy-mm-dd" to "yyyy/mm/dd"
+   * 2. Combining all partition values with "/" separator
+   *
+   * @param partitionValues Array of partition field values
+   * @return Formatted partition path like "yyyy/mm/dd/country/state/city"
+   */
+  private def formatPartitionPath(partitionValues: Array[Object]): String = {
+    if (partitionValues == null || partitionValues.length == 0) {
+      ""
+    } else {
+      val partitionPath = new StringBuilder()
+
+      for (i <- partitionValues.indices) {
+        if (i > 0) {
+          partitionPath.append("/")
+        }
+
+        var value = getPartitionValue(partitionValues(i))
+
+        // For the first partition field (typically the date), replace hyphens with slashes
+        if (i == 0 && value.contains("-")) {
+          value = value.replace("-", "/")
+        }
+
+        partitionPath.append(value)
+      }
+
+      partitionPath.toString()
+    }
+  }
+
+  /**
+   * Extracts the string value from a partition field value object.
+   *
+   * @param value The partition field value
+   * @return String representation of the value
+   */
+  private def getPartitionValue(value: Object): String = {
+    value match {
+      case null => ""
+      case utf8: UTF8String => utf8.toString
+      case _ => String.valueOf(value)
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala
new file mode 100644
index 000000000000..b922473c729c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor
+
+import java.util
+
+class MockSlashPartitionValueExtractor extends PartitionValueExtractor {
+
+  /**
+   * Extracts partition values from the partition path
+   * and provides the date, country, state, and city values separately.
+   * @param partitionPath Partition path in the format yyyy/mm/dd/country/state/city
+   * @return a List of strings with values such as yyyy-mm-dd, country, state, city
+   */
+  override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
+    val partitionSplitsSeq = partitionPath.split("/")
+    val date = s"${partitionSplitsSeq(0)}-${partitionSplitsSeq(1)}-${partitionSplitsSeq(2)}"
+    val country = partitionSplitsSeq(3)
+    val state = partitionSplitsSeq(4)
+    val city = partitionSplitsSeq(5)
+    java.util.Arrays.asList(date, country, state, city)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
new file mode 100644
index 000000000000..ca1aa02b1ffd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import java.util.stream.Collectors
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+
+  test("Test custom partition value extractor interface") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      // Enable partition value extractor on read
+      spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled", "true")
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  '${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+           |  '${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+      // yyyy/mm/dd/country/state/city
+      import spark.implicits._
+      val df = Seq(
+        ("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        ("2", "a2", 2000, "2024-01-01", "USA", "TX", "AU"),
+        ("3", "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+        ("4", "a4", 4000, "2024-01-02", "USA", "WA", "SEA"),
+        ("5", "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+      ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+      df.write
+        .format("hudi")
+        .mode("append")
+        .save(tablePath)
+
+      // check result after insert and merge data into target table
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable"
+        + s" where state = 'CA'")(
+        Seq("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"),
+        Seq("3", "a3", 3000L, "2024-01-02", "USA", "CA", "LA"),
+        Seq("5", "a5", 5000L, "2024-01-03", "USA", "CA", "SFO")
+      )
+
+      // Verify table config has custom partition value extractor class set
+      val metaClient = HoodieTableMetaClient.builder()
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build()
+      val tableConfig = metaClient.getTableConfig
+      val partitionExtractorClass = tableConfig.getProps.getProperty(
+        HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key())
+      assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor",
+        s"Table config should have custom partition value extractor class set to MockSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+      // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city)
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")),
+        s"Partition path 2024/01/01/USA/CA/SFO should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")),
+        s"Partition path 2024/01/01/USA/TX/AU should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")),
+        s"Partition path 2024/01/02/USA/CA/LA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")),
+        s"Partition path 2024/01/02/USA/WA/SEA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")),
+
+      val engine = new HoodieSparkEngineContext(spark.sparkContext)
+      val storage = metaClient.getStorage()
+      val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+      val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath)
+      val partitionPaths = metadataTable.getAllPartitionPaths
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+      assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+      metadataTable.close()
+    }
+  }
+
+  test("Test custom partition value extractor with partition pruning and 
filtering") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      // Enable partition value extractor on read
+      spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled", "true")
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  '${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+           |  '${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+
+      // Insert data across multiple partitions
+      import spark.implicits._
+      val df = Seq(
+        ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"),
+        ("2", "a2", 2000L, "2024-01-01", "USA", "CA", "LA"),
+        ("3", "a3", 3000L, "2024-01-01", "USA", "TX", "AU"),
+        ("4", "a4", 4000L, "2024-01-02", "USA", "CA", "SFO"),
+        ("5", "a5", 5000L, "2024-01-02", "USA", "WA", "SEA"),
+        ("6", "a6", 6000L, "2024-01-03", "USA", "CA", "LA"),
+        ("7", "a7", 7000L, "2024-01-03", "CAN", "ON", "TOR"),
+        ("8", "a8", 8000L, "2024-01-04", "USA", "NY", "NYC")
+      ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+      df.write
+        .format("hudi")
+        .mode("append")
+        .save(tablePath)
+
+      // Test partition pruning with single partition column filter (state)
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' order by id")(
+        Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+        Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+      )
+
+      // Test partition pruning with multiple partition column filters
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+        Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+      )
+
+      // Test partition pruning with date filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where datestr = '2024-01-01' order by id")(
+        Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq("3", "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+      )
+
+      // Test partition pruning with country filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where country = 'CAN' order by id")(
+        Seq("7", "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+      )
+
+      // Test partition pruning with combined date and state filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where datestr = '2024-01-02' and state = 'CA' order by id")(
+        Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+      )
+
+      // Test partition pruning with IN clause
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state IN ('CA', 'NY') order by id")(
+        Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+        Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA"),
+        Seq("8", "a8", 8000, "2024-01-04", "USA", "NY", "NYC")
+      )
+
+      // Test reading with _hoodie_partition_path to verify custom partition format
+      checkAnswer(s"select id, _hoodie_partition_path from $targetTable where state = 'CA' order by id")(
+        Seq("1", "2024/01/01/USA/CA/SFO"),
+        Seq("2", "2024/01/01/USA/CA/LA"),
+        Seq("4", "2024/01/02/USA/CA/SFO"),
+        Seq("6", "2024/01/03/USA/CA/LA")
+      )
+
+      // Verify partition pruning works by corrupting a parquet file in a partition that won't be queried
+      // We'll corrupt a file in the WA/SEA partition and query for CA - if partition pruning works, the query succeeds
+      val metaClient = HoodieTableMetaClient.builder()
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build()
+
+      // Find and corrupt a parquet file in the 2024/01/02/USA/WA/SEA partition
+      val partitionPathForCorruption = new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")
+      val storage = metaClient.getStorage
+      val parquetFiles = storage.listDirectEntries(partitionPathForCorruption).stream()
+        .filter(fileStatus => fileStatus.getPath.getName.endsWith(".parquet") && !fileStatus.getPath.getName.startsWith("."))
+        .collect(Collectors.toList())
+
+      assertTrue(!parquetFiles.isEmpty, "Should have at least one parquet file in WA/SEA partition")
+
+      // Corrupt the first parquet file by writing garbage data
+      val fileToCorrupt = parquetFiles.get(0).getPath
+      val outputStream = storage.create(fileToCorrupt, true)
+      try {
+        outputStream.write("CORRUPTED_DATA".getBytes())
+      } finally {
+        outputStream.close()
+      }
+
+      // Query for state = 'CA' should still succeed because partition pruning avoids the corrupted WA partition
+      checkAnswer(s"select id, name, state from $targetTable where state = 'CA' order by id")(
+        Seq("1", "a1", "CA"),
+        Seq("2", "a2", "CA"),
+        Seq("4", "a4", "CA"),
+        Seq("6", "a6", "CA")
+      )
+
+      // Similarly, query for state = 'TX' should succeed
+      checkAnswer(s"select id, name, state from $targetTable where state = 
'TX' order by id")(
+        Seq("3", "a3", "TX")
+      )
+
+      // Create DataFrame and analyze query plan to verify partition pruning
+      val dfWithStateFilter = spark.sql(s"select * from $targetTable where state = 'CA'")
+      val planWithStateFilter = dfWithStateFilter.queryExecution.executedPlan.toString()
+      // Verify partition filters are pushed down
+      assertTrue(planWithStateFilter.contains("PartitionFilters") || planWithStateFilter.contains("PushedFilters"),
+        s"Query plan should contain partition filters for state column")
+
+      // Test DataFrame API with multiple partition filters
+      val dfWithMultipleFilters = spark.table(targetTable)
+        .filter("state = 'CA' and datestr = '2024-01-01'")
+      val planWithMultipleFilters = dfWithMultipleFilters.queryExecution.executedPlan.toString()
+      assertTrue(planWithMultipleFilters.contains("PartitionFilters") || planWithMultipleFilters.contains("PushedFilters"),
+        s"Query plan should contain partition filters for multiple columns")
+
+      // Verify the filtered results
+      val multiFilterResults = dfWithMultipleFilters.select("id", "name", "state", "datestr").orderBy("id").collect()
+      assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got ${multiFilterResults.length}")
+      assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id should be 1")
+      assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name should be a1")
+      assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state should be CA")
+      assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First row datestr should be 2024-01-01")
+      assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id should be 2")
+      assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name should be a2")
+
+      // Test DataFrame with country filter
+      val dfWithCountryFilter = spark.table(targetTable).filter("country = 'CAN'")
+      val planWithCountryFilter = dfWithCountryFilter.queryExecution.executedPlan.toString()
+      assertTrue(planWithCountryFilter.contains("PartitionFilters") || planWithCountryFilter.contains("PushedFilters"),
+        s"Query plan should contain partition filters for country column")
+
+      val countryFilterResults = dfWithCountryFilter.select("id", "country").orderBy("id").collect()
+      assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got ${countryFilterResults.length}")
+      assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should be 7")
+      assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country should be CAN")
+
+      // Verify all partitions exist as expected
+      val engine = new HoodieSparkEngineContext(spark.sparkContext)
+      val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+      val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath)
+      val partitionPaths = metadataTable.getAllPartitionPaths
+
+      // Verify expected partition paths
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+      assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR"))
+      assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC"))
+
+      metadataTable.close()
+    }
+  }
+
+  test("Test custom partition value extractor with URL encoding") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      // Enable partition value extractor on read
+      spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled", "true")
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+           |  '${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+           |  '${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+           |  'hoodie.datasource.write.partitionpath.urlencode'='true'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+
+      // Insert data with special characters that will be URL encoded
+      import spark.implicits._
+      val df = Seq(
+        ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "San Francisco"),
+        ("2", "a2", 2000L, "2024-01-01", "USA", "TX", "Austin+Dallas"),
+        ("3", "a3", 3000L, "2024-01-02", "USA", "CA", "Los Angeles")
+      ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+      df.write
+        .format("hudi")
+        .mode("append")
+        .save(tablePath)
+
+      // Verify that data can be read back correctly with URL-encoded partition paths
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'San Francisco' order by id")(
+        Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "San Francisco")
+      )
+
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where city = 'Austin+Dallas' order by id")(
+        Seq("2", "a2", 2000, "2024-01-01", "USA", "TX", "Austin+Dallas")
+      )
+
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where city = 'Los Angeles' order by id")(
+        Seq("3", "a3", 3000, "2024-01-02", "USA", "CA", "Los Angeles")
+      )
+
+      // Verify all data can be read
+      val allData = spark.sql(s"select id, city from $targetTable order by id").collect()
+      assertTrue(allData.length == 3, s"Expected 3 rows but got ${allData.length}")
+      assertTrue(allData(0).getString(1) == "San Francisco", "First row city should be San Francisco")
+      assertTrue(allData(1).getString(1) == "Austin+Dallas", "Second row city should be Austin+Dallas")
+      assertTrue(allData(2).getString(1) == "Los Angeles", "Third row city should be Los Angeles")
+    }
+  }
+}
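
For context on the tests above: a read-side extractor only needs to implement the single PartitionValueExtractor method that maps a storage partition path back to partition column values. A minimal sketch, assuming the yyyy/MM/dd/country/state/city layout exercised above and the pre-relocation package org.apache.hudi.sync.common.model (which this patch moves); this is illustrative only, not the MockSlashPartitionValueExtractor shipped with this patch:

    import java.util.{Arrays => JArrays, List => JList}

    import org.apache.hudi.sync.common.model.PartitionValueExtractor

    // Illustrative sketch: maps "2024/01/01/USA/CA/SFO" back to
    // ["2024-01-01", "USA", "CA", "SFO"] for the layout used in the tests above.
    class SlashDatePartitionValueExtractor extends PartitionValueExtractor {
      override def extractPartitionValuesInPath(partitionPath: String): JList[String] = {
        val segments = partitionPath.split("/")
        require(segments.length == 6, s"Expected yyyy/MM/dd/country/state/city, got: $partitionPath")
        // The first three segments form the date; the rest map 1:1 to partition columns.
        JArrays.asList(s"${segments(0)}-${segments(1)}-${segments(2)}", segments(3), segments(4), segments(5))
      }
    }

The tests above enable read-side extraction via hoodie.datasource.read.partition.value.extractor.enabled.
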
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 890c9712f714..53698d492d22 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HadoopConfigUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -51,8 +52,6 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.META_SYNC_BASE_PA
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
 import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
-import static org.apache.hudi.common.table.HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING;
-import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
 import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
@@ -119,40 +118,10 @@ public class HoodieSyncConfig extends HoodieConfig {
   public static final ConfigProperty<String> 
META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
       .key("hoodie.datasource.hive_sync.partition_extractor_class")
       .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
-      .withInferFunction(cfg -> {
-        Option<String> partitionFieldsOpt;
-        if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) {
-          partitionFieldsOpt = Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS));
-        } else {
-          partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
-              .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
-        }
-        if (!partitionFieldsOpt.isPresent()) {
-          return Option.empty();
-        }
-        String partitionFields = partitionFieldsOpt.get();
-        if (StringUtils.nonEmpty(partitionFields)) {
-          int numOfPartFields = partitionFields.split(",").length;
-          if (numOfPartFields == 1) {
-            if (cfg.contains(HIVE_STYLE_PARTITIONING_ENABLE)
-                && cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
-              return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
-            } else if (cfg.contains(SLASH_SEPARATED_DATE_PARTITIONING)
-                && cfg.getString(SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) {
-              return Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
-            } else {
-              return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
-            }
-          } else {
-            return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
-          }
-        } else {
-          return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
-        }
-      })
+      .withInferFunction(HoodieTableConfigUtils::inferPartitionValueExtractorClass)
       .markAdvanced()
       .withDocumentation("Class which implements PartitionValueExtractor to 
extract the partition values, "
-          + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'.");
+          + "default is inferred based on partition configuration.");
 
   public static final ConfigProperty<Boolean> META_SYNC_DECODE_PARTITION = 
ConfigProperty
       .key("hoodie.meta.sync.decode_partition")
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
index 7d9472e82e3a..cdffa2b91c4d 100644
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
@@ -146,6 +146,22 @@ class TestHoodieSyncConfig {
     HoodieSyncConfig config5 = new HoodieSyncConfig(props5, new Configuration());
     assertEquals("org.apache.hudi.hive.SinglePartPartitionValueExtractor",
         config5.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+
+    Properties props6 = new Properties();
+    props6.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "foo,bar");
+    props6.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo");
+    HoodieSyncConfig config6 = new HoodieSyncConfig(props6, new Configuration());
+    assertEquals("org.apache.hudi.hive.SinglePartPartitionValueExtractor",
+        config6.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
+        String.format("should infer from %s when explicitly configured",
+            HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key()));
+
+    Properties props7 = new Properties();
+    props7.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo");
+    props7.setProperty(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key(), "true");
+    HoodieSyncConfig config7 = new HoodieSyncConfig(props7, new Configuration());
+    assertEquals("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor",
+        config7.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
   }
 
   @Test
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index d1db2855a6b3..98cd42b1b8f5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -91,6 +91,7 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.util.JavaScalaConverters;
@@ -452,7 +453,10 @@ public class StreamSync implements Serializable, Closeable {
       payloadClass = overridingMergeConfigs.get().getMiddle();
       mergeStrategyId = overridingMergeConfigs.get().getRight();
     }
-
+    String partitionValueExtractorClassName = props.getString(
+        DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS().key(),
+        props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+            HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.defaultValue()));
     return tableBuilder.setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
         .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
@@ -466,6 +470,7 @@ public class StreamSync implements Serializable, Closeable {
         .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
             HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
         .setKeyGeneratorClassProp(keyGenClassName)
+        .setPartitionValueExtractorClass(partitionValueExtractorClassName)
+        .setOrderingFields(cfg.sourceOrderingFields)
         .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
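
For Hudi Streamer users, the net effect of the StreamSync change above: the extractor class is resolved from the writer key first, then the hive-sync key, then the inferred default, and is persisted into table config. A hypothetical properties fragment (the extractor class name below is made up for illustration):

    # writer-level key takes precedence
    hoodie.datasource.partition_extractor_class=com.example.MySlashPartitionValueExtractor
    # fallback if the writer key is absent:
    # hoodie.datasource.hive_sync.partition_extractor_class=com.example.MySlashPartitionValueExtractor
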
