This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1203b215b03c feat: Use PartitionValueExtractor interface in Spark
reader path (#17850)
1203b215b03c is described below
commit 1203b215b03cb5c76fffb46a360efe0c4ce21fe0
Author: Surya Prasanna <[email protected]>
AuthorDate: Sun Mar 1 21:56:36 2026 -0800
feat: Use PartitionValueExtractor interface in Spark reader path (#17850)
Users can now have custom partition value extractors applied during read
operations by enabling the
hoodie.datasource.read.partition.value.extractor.enabled option, which makes
the Spark reader use the PartitionValueExtractor class recorded in the table
properties, enabling support for non-standard partition path formats.
Changes:
Moved PartitionValueExtractor interface from hudi-sync-common to
hudi-common for broader accessibility
Added `hoodie.datasource.partition_extractor_class` as a writer property and
`hoodie.table.partition_extractor_class` as a table property to record which
extractor class the table uses.
Impact
Public API Changes:
New config option: hoodie.datasource.read.partition.value.extractor.enabled
to turn on extractor-based partition value parsing for Spark reads
PartitionValueExtractor interface relocated from the hudi-sync-common module
to hudi-common (its package, org.apache.hudi.sync.common.model, is unchanged)
User-Facing Changes:
Users can now customize partition value extraction during read operations
by providing a custom PartitionValueExtractor implementation
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 53 ++-
.../hudi/common/table/HoodieTableConfig.java | 13 +
.../hudi/common/table/HoodieTableMetaClient.java | 12 +
.../hudi/common/util/HoodieTableConfigUtils.java | 40 ++-
.../sync/common/model/PartitionValueExtractor.java | 0
.../apache/hudi/configuration/FlinkOptions.java | 7 +
.../java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../hudi/common/table/TestHoodieTableConfig.java | 2 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 23 ++
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 10 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 9 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 8 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 10 +
.../apache/hudi/SparkHoodieTableFileIndex.scala | 9 +-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 22 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 25 +-
.../hudi/command/CreateHoodieTableCommand.scala | 1 +
.../sql/hudi/common/MockSlashKeyGenerator.scala | 134 +++++++
.../common/MockSlashPartitionValueExtractor.scala | 40 +++
.../common/TestCustomParitionValueExtractor.scala | 386 +++++++++++++++++++++
.../apache/hudi/sync/common/HoodieSyncConfig.java | 37 +-
.../hudi/sync/common/TestHoodieSyncConfig.java | 16 +
.../apache/hudi/utilities/streamer/StreamSync.java | 7 +-
23 files changed, 804 insertions(+), 61 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 08c315b0e78d..03294b83c0f2 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
-import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.common.util.{StringUtils, Option => HOption}
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.storage.StoragePath
@@ -31,6 +31,8 @@ import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.sync.common.model.PartitionValueExtractor
+
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -224,11 +226,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
partitionPath: String,
tableBasePath: StoragePath,
tableSchema: StructType,
- tableConfig: java.util.Map[String, String],
+ tableConfig: HoodieTableConfig,
timeZoneId: String,
- shouldValidatePartitionColumns: Boolean):
Array[Object] = {
+ shouldValidatePartitionColumns: Boolean,
+ usePartitionValueExtractorOnRead: Boolean):
Array[Object] = {
val keyGeneratorClass =
KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
- val timestampKeyGeneratorType =
tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+ val timestampKeyGeneratorType =
tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+ val partitionValueExtractorClass =
tableConfig.getPartitionExtractorClass.orElse("")
if (null != keyGeneratorClass
&& null != timestampKeyGeneratorType
@@ -238,10 +242,43 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
// we couldn't reconstruct initial partition column values from
partition paths due to lost data after formatting.
// But the output for these cases is in a string format, so we can pass
partitionPath as UTF8String
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+ } else if(usePartitionValueExtractorOnRead &&
!StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
+
parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass,
partitionPath,
+ partitionColumns, tableSchema)
} else {
doParsePartitionColumnValues(partitionColumns, partitionPath,
tableBasePath, tableSchema, timeZoneId,
- shouldValidatePartitionColumns,
tableConfig.getOrDefault(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key,
-
HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue).toBoolean)
+ shouldValidatePartitionColumns,
tableConfig.getSlashSeparatedDatePartitioning)
+ }
+ }
+
+ /**
+ * Parses partition values from partition path using a custom
PartitionValueExtractor.
+ *
+ * @param partitionValueExtractorClass Fully qualified class name of the
PartitionValueExtractor implementation
+ * @param partitionPath The partition path to extract values from
+ * @param partitionColumns Array of partition column names
+ * @param tableSchema The schema of the table
+ * @return Array of partition values as Objects, properly typed according to
the schema
+ */
+ private def parsePartitionValuesBasedOnPartitionValueExtractor(
+ partitionValueExtractorClass: String,
+ partitionPath: String,
+ partitionColumns: Array[String],
+ tableSchema: StructType): Array[Object] = {
+ try {
+ val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
+ .getDeclaredConstructor()
+ .newInstance()
+ .asInstanceOf[PartitionValueExtractor]
+ val partitionValues =
partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray
+ val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema,
partitionColumns)
+ val typedValues = partitionValues.zip(partitionSchema.fields).map { case
(stringValue, field) =>
+ castStringToType(stringValue, field.dataType)
+ }
+ typedValues.map(_.asInstanceOf[Object])
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(s"Failed to extract partition value using
$partitionValueExtractorClass class", e)
}
}
@@ -336,7 +373,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
).toSeq(partitionSchema)
}
- private def buildPartitionSchemaForNestedFields(schema: StructType,
partitionColumns: Array[String]): StructType = {
+ def buildPartitionSchemaForNestedFields(schema: StructType,
partitionColumns: Array[String]): StructType = {
val partitionFields = partitionColumns.flatMap { partitionCol =>
extractNestedField(schema, partitionCol)
}
@@ -364,7 +401,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
traverseSchema(schema, pathParts.toList, fieldPath)
}
- private def castStringToType(value: String, dataType:
org.apache.spark.sql.types.DataType): Any = {
+ def castStringToType(value: String, dataType:
org.apache.spark.sql.types.DataType): Any = {
import org.apache.spark.sql.types._
// handling cases where the value contains path separators or is complex
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 3de45e1754ee..29d2ee9f0ca5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -50,6 +50,7 @@ import
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -335,6 +336,14 @@ public class HoodieTableConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Key Generator type to determine key generator
class");
+ public static final ConfigProperty<String> PARTITION_EXTRACTOR_CLASS =
ConfigProperty
+ .key("hoodie.table.partition_extractor_class")
+ .noDefaultValue()
+
.withInferFunction(HoodieTableConfigUtils::inferPartitionValueExtractorClass)
+ .markAdvanced()
+ .withDocumentation("Class which implements PartitionValueExtractor to
extract the partition values, "
+ + "default is inferred based on partition configuration.");
+
// TODO: this has to be UTC. why is it not the default?
public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE
= ConfigProperty
.key("hoodie.table.timeline.timezone")
@@ -1208,6 +1217,10 @@ public class HoodieTableConfig extends HoodieConfig {
return KeyGeneratorType.getKeyGeneratorClassName(this);
}
+ public Option<String> getPartitionExtractorClass() {
+ return Option.ofNullable(getString(PARTITION_EXTRACTOR_CLASS));
+ }
+
public HoodieTimelineTimeZone getTimelineTimezone() {
return
HoodieTimelineTimeZone.valueOf(getStringOrDefault(TIMELINE_TIMEZONE));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 6ddd459e3d6b..e20d9d3a1220 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1070,6 +1070,7 @@ public class HoodieTableMetaClient implements
Serializable {
private Boolean bootstrapIndexEnable;
private Boolean populateMetaFields;
private String keyGeneratorClassProp;
+ private String partitionValueExtractorClass;
private String keyGeneratorType;
private Boolean slashSeparatedDatePartitioning;
private Boolean hiveStylePartitioningEnable;
@@ -1243,6 +1244,11 @@ public class HoodieTableMetaClient implements
Serializable {
return this;
}
+ public TableBuilder setPartitionValueExtractorClass(String
partitionValueExtractorClass) {
+ this.partitionValueExtractorClass = partitionValueExtractorClass;
+ return this;
+ }
+
public TableBuilder setHiveStylePartitioningEnable(Boolean
hiveStylePartitioningEnable) {
this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
return this;
@@ -1445,6 +1451,9 @@ public class HoodieTableMetaClient implements
Serializable {
if
(hoodieConfig.contains(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)) {
setSlashSeparatedDatePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING));
}
+ if (hoodieConfig.contains(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS)) {
+
setPartitionValueExtractorClass(hoodieConfig.getString(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS));
+ }
if
(hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}
@@ -1588,6 +1597,9 @@ public class HoodieTableMetaClient implements
Serializable {
if (null != slashSeparatedDatePartitioning) {
tableConfig.setValue(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING,
Boolean.toString(slashSeparatedDatePartitioning));
}
+ if (null != partitionValueExtractorClass) {
+ tableConfig.setValue(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS,
partitionValueExtractorClass);
+ }
if (null != hiveStylePartitioningEnable) {
tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE,
Boolean.toString(hiveStylePartitioningEnable));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
index b15c97164fa2..bc4db3789093 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTableConfigUtils.java
@@ -21,12 +21,12 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.Arrays;
import java.util.stream.Collectors;
public class HoodieTableConfigUtils {
-
/**
* This function returns the partition fields joined by
BaseKeyGenerator.FIELD_SEPARATOR. It will also
* include the key generator partition type with the field. The key
generator partition type is used for
@@ -75,7 +75,7 @@ public class HoodieTableConfigUtils {
?
partitionField.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)[0]
: partitionField;
}
-
+
/**
* This function returns the hoodie.table.version from hoodie.properties
file.
*/
@@ -84,4 +84,40 @@ public class HoodieTableConfigUtils {
?
HoodieTableVersion.fromVersionCode(config.getInt(HoodieTableConfig.VERSION))
: HoodieTableConfig.VERSION.defaultValue();
}
+
+ /**
+ * Infers the appropriate PartitionValueExtractor class based on table
configuration.
+ * This function determines the correct extractor based on the number of
partition fields
+ * and partitioning style (Hive-style vs non-Hive-style).
+ *
+ * @param cfg HoodieConfig containing table configuration
+ * @return Option containing the inferred PartitionValueExtractor class name
+ */
+ public static Option<String> inferPartitionValueExtractorClass(HoodieConfig
cfg) {
+ Option<String> partitionFieldsOpt =
HoodieTableConfig.getPartitionFieldProp(cfg)
+ .or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
+ if (!partitionFieldsOpt.isPresent()) {
+ return Option.empty();
+ }
+
+ String partitionFields = partitionFieldsOpt.get();
+ if (StringUtils.nonEmpty(partitionFields)) {
+ int numOfPartFields = partitionFields.split(",").length;
+ if (numOfPartFields == 1) {
+ if
(cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key())
+ &&
cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true"))
{
+ return
Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
+ } else if
(cfg.contains(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING)
+ &&
cfg.getString(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING).equals("true"))
{
+ return
Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
+ } else {
+ return
Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
+ }
+ } else {
+ return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
+ }
+ } else {
+ return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
+ }
+ }
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
b/hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
similarity index 100%
rename from
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
rename to
hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a7b33421e72b..489a54837b82 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -650,6 +650,13 @@ public class FlinkOptions extends HoodieConfig {
+ "**Note** This is being actively worked on. Please use "
+ "`hoodie.datasource.write.keygenerator.class` instead.");
+ @AdvancedConfig
+ public static final ConfigOption<String> PARTITION_VALUE_EXTRACTOR =
ConfigOptions
+ .key(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key())
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Partition value extractor class helps extract the
partition value from partition paths");
+
public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index ecf7b73e312d..ffe94fcfd558 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -314,6 +314,7 @@ public class StreamerUtil {
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(),
null))
.setKeyGeneratorClassProp(
conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
+
.setPartitionValueExtractorClass(conf.getString(FlinkOptions.PARTITION_VALUE_EXTRACTOR.key(),
null))
.setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING))
.setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED))
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 8f561425cdd7..9f84887b1dab 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -384,7 +384,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
@Test
void testDefinedTableConfigs() {
List<ConfigProperty<?>> configProperties =
HoodieTableConfig.definedTableConfigs();
- assertEquals(43, configProperties.size());
+ assertEquals(44, configProperties.size());
configProperties.forEach(c -> {
assertNotNull(c);
assertFalse(c.doc().isEmpty());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 7e92b47f3321..111630b3694c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -290,6 +290,15 @@ object DataSourceReadOptions {
.sinceVersion("1.1.0")
.withDocumentation("Fully qualified class name of the catalog that is used
by the Polaris spark client.")
+ val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] =
ConfigProperty
+ .key("hoodie.datasource.read.partition.value.extractor.enabled")
+ .defaultValue("false")
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("This config helps whether PartitionValueExtractor
interface can be used" +
+ " for parsing partition values from partition path. When this config is
enabled, it uses" +
+ " PartitionValueExtractor class value stored in the hoodie.properties
file.")
+
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
@@ -513,6 +522,14 @@ object DataSourceWriteOptions {
val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED:
ConfigProperty[String] =
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
+ val PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.partition_extractor_class")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("PartitionValueExtractor implementation used by Spark
datasource write/read paths " +
+ "to parse partition values from partition paths.")
+
val ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.row.writer.enable")
.defaultValue("true")
@@ -1074,6 +1091,12 @@ object DataSourceOptionsHelper {
if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key())
&& tableConfig.getKeyGeneratorClassName != null) {
missingWriteConfigs ++=
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() ->
tableConfig.getKeyGeneratorClassName)
}
+ if
(!params.contains(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key())
+ && tableConfig.getPartitionExtractorClass.isPresent) {
+ missingWriteConfigs ++= Map(
+ DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key() ->
tableConfig.getPartitionExtractorClass.get()
+ )
+ }
if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) &&
tableConfig.getPayloadClass != null) {
missingWriteConfigs ++=
Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() ->
tableConfig.getPayloadClass)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 56280da8a9d7..ecd6a0188d17 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -477,6 +477,11 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
getPartitionColumnsAsInternalRowInternal(file,
metaClient.getBasePath, extractPartitionValuesFromPartitionPath = true)
+ protected def usePartitionValueExtractorOnRead(optParams: Map[String,
String], sparkSession: SparkSession): Boolean = {
+
optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+
DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue).toBoolean
+ }
+
protected def getPartitionColumnsAsInternalRowInternal(file:
StoragePathInfo, basePath: StoragePath,
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
if (extractPartitionValuesFromPartitionPath) {
@@ -489,9 +494,10 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
relativePath,
basePath,
tableStructSchema,
- tableConfig.propsMap,
+ tableConfig,
timeZoneId,
- conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
+ conf.getBoolean("spark.sql.sources.validatePartitionColumns", true),
+ usePartitionValueExtractorOnRead(optParams, sparkSession))
if(rowValues.length != partitionColumns.length) {
throw new HoodieException("Failed to get partition column values from
the partition-path:"
+ s"partition column size: ${partitionColumns.length}, parsed
partition value size: ${rowValues.length}")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 34a67cfb3a5b..81b3471c8357 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -527,6 +527,14 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
}
+ // Check if partition value extractor class need to be used during reads.
+ val usePartitionValueExtractorOnRead = getConfigValue(options, sqlConf,
+ DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+
DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue())
+
properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
+ usePartitionValueExtractorOnRead)
+
+ // Check if path filter optimized listing is enabled on reads.
var pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
null)
if (pathFilterOptimizedListingEnabled != null) {
@@ -545,6 +553,7 @@ object HoodieFileIndex extends Logging {
if (tableConfig != null) {
properties.setProperty(RECORDKEY_FIELD.key,
tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
properties.setProperty(PARTITIONPATH_FIELD.key,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
+
properties.setProperty(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key(),
tableConfig.getPartitionExtractorClass.orElse(""))
// for simple bucket index, we need to set the INDEX_TYPE,
BUCKET_INDEX_HASH_FIELD, BUCKET_INDEX_NUM_BUCKETS
val database = getDatabaseName(tableConfig,
spark.catalog.currentDatabase)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f7db780e504c..eed8226183bb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -291,6 +291,9 @@ class HoodieSparkSqlWriterInternal {
if
(StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
+ val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(
+ DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(),
+
hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
null))
val tableFormat =
hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
HoodieTableMetaClient.newTableBuilder()
.setTableType(tableType)
@@ -310,6 +313,7 @@ class HoodieSparkSqlWriterInternal {
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(keyGenProp)
+ .setPartitionValueExtractorClass(partitionValueExtractorClassName)
.set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String,
Object]])
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -749,7 +753,8 @@ class HoodieSparkSqlWriterInternal {
String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
))
val tableFormat =
hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
-
+ val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(
+ DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(), null)
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
@@ -769,6 +774,7 @@ class HoodieSparkSqlWriterInternal {
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
+ .setPartitionValueExtractorClass(partitionValueExtractorClassName)
.set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String,
Object]])
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index fdc41bcfbd51..e76af4a2733d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -323,6 +323,16 @@ object HoodieWriterUtils {
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}
+
+ // Validate partition value extractor
+ val currentPartitionValueExtractor =
params.getOrElse(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key(), null)
+ if (currentPartitionValueExtractor != null) {
+ val tableConfigPartitionValueExtractor =
tableConfig.getString(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS)
+ if (tableConfigPartitionValueExtractor != null &&
+
!currentPartitionValueExtractor.equals(tableConfigPartitionValueExtractor)) {
+
diffConfigs.append(s"PartitionValueExtractor:\t$currentPartitionValueExtractor\t$tableConfigPartitionValueExtractor\n")
+ }
+ }
// The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be
NULL or non-NULL.
// The non-NULL value has been validated above in the regular code
path.
// Here we check the NULL case since if the value is NULL, the check
is skipped above.
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 077e1129eb5e..a159610e0b00 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -122,6 +122,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
protected lazy val shouldFastBootstrap =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
+ protected lazy val usePartitionValueExtractorOnRead =
configProperties.getBoolean(
+ DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(),
+
DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue().toBoolean)
+
/**
* Get the partition schema from the hoodie.properties.
*/
@@ -436,9 +440,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
partitionPath,
getBasePath,
schema,
- metaClient.getTableConfig.propsMap,
+ metaClient.getTableConfig,
configProperties.getString(DateTimeUtils.TIMEZONE_OPTION,
SQLConf.get.sessionLocalTimeZone),
- shouldValidatePartitionColumns(spark))
+ shouldValidatePartitionColumns(spark),
+ usePartitionValueExtractorOnRead)
}
private def arePartitionPathsUrlEncoded: Boolean =
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index d461de145e5d..788fdbff5ca6 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.hudi.{DataSourceOptionsHelper, HoodieSchemaConversionUtils}
import org.apache.hudi.DataSourceWriteOptions.OPERATION
import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
TypedProperties}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import
org.apache.hudi.common.table.HoodieTableConfig.{HIVE_STYLE_PARTITIONING_ENABLE,
URL_ENCODE_PARTITIONING}
import org.apache.hudi.common.table.timeline.TimelineUtils
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.common.util.ValidationUtils
-import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.common.util.{HoodieTableConfigUtils, StringUtils,
ValidationUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
@@ -61,7 +59,7 @@ import scala.collection.mutable
*/
class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable)
extends Logging {
- checkArgument(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+
ValidationUtils.checkArgument(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull
== "hudi"
|| table.provider.map(_.toLowerCase(Locale.ROOT)).orNull ==
"org.apache.hudi",
s" ${table.qualifiedName} is not a Hudi table")
@@ -268,7 +266,7 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
(tableSchema, options)
case (_, false) =>
- checkArgument(table.schema.nonEmpty,
+ ValidationUtils.checkArgument(table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = table.schema
val options = extraTableConfig(tableExists = false,
globalTableConfigs, sqlOptions) ++
@@ -345,6 +343,18 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
}
+ // Include Partition value extractor configuration.
+ if
(originTableConfig.contains(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)) {
+ extraConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key) =
+ originTableConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)
+ } else {
+ val inferredClass =
HoodieTableConfigUtils.inferPartitionValueExtractorClass(
+ new HoodieConfig(TypedProperties.fromMap(originTableConfig.asJava)))
+ if (inferredClass.isPresent) {
+ extraConfig(HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key) =
inferredClass.get()
+ }
+ }
+
extraConfig.toMap
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 1cc39fa37af0..48bb7cf20f30 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -25,10 +25,10 @@ import
org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonCo
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
-import org.apache.hudi.common.util.{ConfigUtils, ReflectionUtils, StringUtils}
+import org.apache.hudi.common.util.{ConfigUtils, HoodieTableConfigUtils,
ReflectionUtils, StringUtils}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig,
HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder,
MultiPartKeysValueExtractor}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder,
NonPartitionedExtractor}
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory
import java.util.Locale
-import scala.collection.JavaConverters
import scala.collection.JavaConverters._
trait ProvidesHoodieConfig extends Logging {
@@ -460,7 +459,25 @@ trait ProvidesHoodieConfig extends Logging {
if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS,
props.getString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
}
-
hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS,
classOf[MultiPartKeysValueExtractor].getName)
+ // If a partition value extractor is present in the props object we use it.
+ // It first checks for hoodie.datasource.hive_sync.partition_extractor_class
+ // and then for the hoodie.datasource.partition_extractor_class config.
+ // If neither of the above configs is provided, we try inferring the
+ // partition extractor class name.
+ // If that also fails, we fall back to NonPartitionedExtractor.
+ val inferredPartitionValueExtractorClass =
HoodieTableConfigUtils.inferPartitionValueExtractorClass(tableConfig)
+ if
(props.containsKey(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()))
{
+
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+
props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()))
+ } else if
(props.containsKey(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key())) {
+
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+
props.getString(DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()))
+ } else if (inferredPartitionValueExtractorClass.isPresent) {
+
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+ inferredPartitionValueExtractorClass.get())
+ } else {
+
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
classOf[NonPartitionedExtractor].getName)
+ }
+
// This is hardcoded to true to ensure consistency as Spark syncs
TIMESTAMP types as TIMESTAMP by default
// via Spark's externalCatalog API, which is used by
AlterHoodieTableCommand.
hiveSyncConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE,
"true")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 90dfb4f83d72..59b2b444211b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -135,6 +135,7 @@ object CreateHoodieTableCommand {
checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key)
+ checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala
new file mode 100644
index 000000000000..a49c1c356640
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator,
KeyGenUtils}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.Arrays
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Key generator that converts partition values to slash-separated partition
paths.
+ *
+ * This generator is useful when you have partition columns like:
+ * - datestr: "yyyy-mm-dd" format
+ * - country, state, city: regular string values
+ *
+ * And you want to create partition paths like: yyyy/mm/dd/country/state/city
+ *
+ * The first partition field (typically a date) will have its hyphens replaced
with slashes.
+ * All partition fields are then combined with "/" as the separator.
+ */
+class MockSlashKeyGenerator(props: TypedProperties) extends
BuiltinKeyGenerator(props) {
+
+ private val complexAvroKeyGenerator: ComplexAvroKeyGenerator = new
ComplexAvroKeyGenerator(props)
+
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props)
+ this.partitionPathFields =
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ .split(",")
+ .map(_.trim)
+ .filter(_.nonEmpty)
+ .toList
+ .asJava
+
+ override def getRecordKey(record: GenericRecord): String = {
+ complexAvroKeyGenerator.getRecordKey(record)
+ }
+
+ override def getPartitionPath(record: GenericRecord): String = {
+ complexAvroKeyGenerator.getPartitionPath(record).replace('-', '/')
+ }
+
+ override def getRecordKey(row: Row): String = {
+ tryInitRowAccessor(row.schema)
+ combineRecordKey(getRecordKeyFieldNames,
Arrays.asList(rowAccessor.getRecordKeyParts(row): _*))
+ }
+
+ override def getRecordKey(internalRow: InternalRow, schema: StructType):
UTF8String = {
+ tryInitRowAccessor(schema)
+ combineRecordKeyUnsafe(getRecordKeyFieldNames,
Arrays.asList(rowAccessor.getRecordKeyParts(internalRow): _*))
+ }
+
+ override def getPartitionPath(row: Row): String = {
+ tryInitRowAccessor(row.schema)
+ val partitionValues = rowAccessor.getRecordPartitionPathValues(row)
+ formatPartitionPath(partitionValues)
+ }
+
+ override def getPartitionPath(row: InternalRow, schema: StructType):
UTF8String = {
+ tryInitRowAccessor(schema)
+ val partitionValues = rowAccessor.getRecordPartitionPathValues(row)
+ UTF8String.fromString(formatPartitionPath(partitionValues))
+ }
+
+ /**
+ * Formats the partition path by:
+ * 1. Converting the first partition value (date) from "yyyy-mm-dd" to
"yyyy/mm/dd"
+ * 2. Combining all partition values with "/" separator
+ *
+ * @param partitionValues Array of partition field values
+ * @return Formatted partition path like "yyyy/mm/dd/country/state/city"
+ */
+ private def formatPartitionPath(partitionValues: Array[Object]): String = {
+ if (partitionValues == null || partitionValues.length == 0) {
+ ""
+ } else {
+ val partitionPath = new StringBuilder()
+
+ for (i <- partitionValues.indices) {
+ if (i > 0) {
+ partitionPath.append("/")
+ }
+
+ var value = getPartitionValue(partitionValues(i))
+
+ // For the first partition field (typically the date), replace hyphens
with slashes
+ if (i == 0 && value.contains("-")) {
+ value = value.replace("-", "/")
+ }
+
+ partitionPath.append(value)
+ }
+
+ partitionPath.toString()
+ }
+ }
+
+ /**
+ * Extracts the string value from a partition field value object.
+ *
+ * @param value The partition field value
+ * @return String representation of the value
+ */
+ private def getPartitionValue(value: Object): String = {
+ value match {
+ case null => ""
+ case utf8: UTF8String => utf8.toString
+ case _ => String.valueOf(value)
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala
new file mode 100644
index 000000000000..b922473c729c
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor
+
+import java.util
+
+class MockSlashPartitionValueExtractor extends PartitionValueExtractor {
+
+ /**
+ *
+ * This method will extract Partition values from the partition path
+ * and provide date, country, state, city values separately.
+   * @param partitionPath partition path provided in the format
yyyy/mm/dd/country/state/city
+   * @return a List of String values, e.g. yyyy-mm-dd,
country, state, city
+ */
+ override def extractPartitionValuesInPath(partitionPath: String):
util.List[String] = {
+ val partitionSplitsSeq = partitionPath.split("/")
+ val date =
s"${partitionSplitsSeq(0)}-${partitionSplitsSeq(1)}-${partitionSplitsSeq(2)}"
+ val country = partitionSplitsSeq(3)
+ val state = partitionSplitsSeq(4)
+ val city = partitionSplitsSeq(5)
+ java.util.Arrays.asList(date, country, state, city)
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
new file mode 100644
index 000000000000..ca1aa02b1ffd
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import java.util.stream.Collectors
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+
+ test("Test custom partition value extractor interface") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ // Enable partition value extractor on read
+
spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled",
"true")
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ |
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+ |
'${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+ // yyyy/mm/dd/country/state/city
+ import spark.implicits._
+ val df = Seq(
+ ("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ ("2", "a2", 2000, "2024-01-01", "USA", "TX", "AU"),
+ ("3", "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+ ("4", "a4", 4000, "2024-01-02", "USA", "WA", "SEA"),
+ ("5", "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+ ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+ df.write
+ .format("hudi")
+ .mode("append")
+ .save(tablePath)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable"
+ + s" where state = 'CA'")(
+ Seq("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"),
+ Seq("3", "a3", 3000L, "2024-01-02", "USA", "CA", "LA"),
+ Seq("5", "a5", 5000L, "2024-01-03", "USA", "CA", "SFO")
+ )
+
+ // Verify table config has custom partition value extractor class set
+ val metaClient = HoodieTableMetaClient.builder()
+
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build()
+ val tableConfig = metaClient.getTableConfig
+ val partitionExtractorClass = tableConfig.getProps.getProperty(
+ HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key())
+ assertTrue(partitionExtractorClass ==
"org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor",
+ s"Table config should have custom partition value extractor class set
to MockSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+ // Verify that partition paths are created with slash separated format
(yyyy/MM/dd/country/state/city)
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath,
"2024/01/01/USA/CA/SFO")),
+ s"Partition path 2024/01/01/USA/CA/SFO should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath,
"2024/01/01/USA/TX/AU")),
+ s"Partition path 2024/01/01/USA/TX/AU should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath,
"2024/01/02/USA/CA/LA")),
+ s"Partition path 2024/01/02/USA/CA/LA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath,
"2024/01/02/USA/WA/SEA")),
+ s"Partition path 2024/01/02/USA/WA/SEA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath,
"2024/01/03/USA/CA/SFO")),
+ s"Partition path 2024/01/03/USA/CA/SFO should exist")
+
+ val engine = new HoodieSparkEngineContext(spark.sparkContext)
+ val storage = metaClient.getStorage()
+ val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+ val metadataTable = new HoodieBackedTableMetadata(engine, storage,
metadataConfig, tablePath)
+ val partitionPaths = metadataTable.getAllPartitionPaths
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+ assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+ metadataTable.close()
+ }
+ }
+
+ test("Test custom partition value extractor with partition pruning and
filtering") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ // Enable partition value extractor on read
+
spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled",
"true")
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ |
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+ |
'${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+
+ // Insert data across multiple partitions
+ import spark.implicits._
+ val df = Seq(
+ ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"),
+ ("2", "a2", 2000L, "2024-01-01", "USA", "CA", "LA"),
+ ("3", "a3", 3000L, "2024-01-01", "USA", "TX", "AU"),
+ ("4", "a4", 4000L, "2024-01-02", "USA", "CA", "SFO"),
+ ("5", "a5", 5000L, "2024-01-02", "USA", "WA", "SEA"),
+ ("6", "a6", 6000L, "2024-01-03", "USA", "CA", "LA"),
+ ("7", "a7", 7000L, "2024-01-03", "CAN", "ON", "TOR"),
+ ("8", "a8", 8000L, "2024-01-04", "USA", "NY", "NYC")
+ ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+ df.write
+ .format("hudi")
+ .mode("append")
+ .save(tablePath)
+
+ // Test partition pruning with single partition column filter (state)
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' order by id")(
+ Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+ Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+ )
+
+ // Test partition pruning with multiple partition column filters
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+ Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+ )
+
+ // Test partition pruning with date filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where datestr = '2024-01-01' order by id")(
+ Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq("3", "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+ )
+
+ // Test partition pruning with country filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where country = 'CAN' order by id")(
+ Seq("7", "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+ )
+
+ // Test partition pruning with combined date and state filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where datestr = '2024-01-02' and state = 'CA' order by id")(
+ Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+ )
+
+ // Test partition pruning with IN clause
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state IN ('CA', 'NY') order by id")(
+ Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+ Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA"),
+ Seq("8", "a8", 8000, "2024-01-04", "USA", "NY", "NYC")
+ )
+
+ // Test reading with _hoodie_partition_path to verify custom partition
format
+ checkAnswer(s"select id, _hoodie_partition_path from $targetTable where
state = 'CA' order by id")(
+ Seq("1", "2024/01/01/USA/CA/SFO"),
+ Seq("2", "2024/01/01/USA/CA/LA"),
+ Seq("4", "2024/01/02/USA/CA/SFO"),
+ Seq("6", "2024/01/03/USA/CA/LA")
+ )
+
+ // Verify partition pruning works by corrupting a parquet file in a
partition that won't be queried
+ // We'll corrupt a file in the WA/SEA partition and query for CA - if
partition pruning works, the query succeeds
+ val metaClient = HoodieTableMetaClient.builder()
+
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build()
+
+ // Find and corrupt a parquet file in the 2024/01/02/USA/WA/SEA partition
+ val partitionPathForCorruption = new StoragePath(tablePath,
"2024/01/02/USA/WA/SEA")
+ val storage = metaClient.getStorage
+ val parquetFiles =
storage.listDirectEntries(partitionPathForCorruption).stream()
+ .filter(fileStatus => fileStatus.getPath.getName.endsWith(".parquet")
&& !fileStatus.getPath.getName.startsWith("."))
+ .collect(Collectors.toList())
+
+ assertTrue(!parquetFiles.isEmpty, "Should have at least one parquet file
in WA/SEA partition")
+
+ // Corrupt the first parquet file by writing garbage data
+ val fileToCorrupt = parquetFiles.get(0).getPath
+ val outputStream = storage.create(fileToCorrupt, true)
+ try {
+ outputStream.write("CORRUPTED_DATA".getBytes())
+ } finally {
+ outputStream.close()
+ }
+
+ // Query for state = 'CA' should still succeed because partition pruning
avoids the corrupted WA partition
+ checkAnswer(s"select id, name, state from $targetTable where state =
'CA' order by id")(
+ Seq("1", "a1", "CA"),
+ Seq("2", "a2", "CA"),
+ Seq("4", "a4", "CA"),
+ Seq("6", "a6", "CA")
+ )
+
+ // Similarly, query for state = 'TX' should succeed
+ checkAnswer(s"select id, name, state from $targetTable where state =
'TX' order by id")(
+ Seq("3", "a3", "TX")
+ )
+
+ // Create DataFrame and analyze query plan to verify partition pruning
+ val dfWithStateFilter = spark.sql(s"select * from $targetTable where
state = 'CA'")
+ val planWithStateFilter =
dfWithStateFilter.queryExecution.executedPlan.toString()
+ // Verify partition filters are pushed down
+ assertTrue(planWithStateFilter.contains("PartitionFilters") ||
planWithStateFilter.contains("PushedFilters"),
+ s"Query plan should contain partition filters for state column")
+
+ // Test DataFrame API with multiple partition filters
+ val dfWithMultipleFilters = spark.table(targetTable)
+ .filter("state = 'CA' and datestr = '2024-01-01'")
+ val planWithMultipleFilters =
dfWithMultipleFilters.queryExecution.executedPlan.toString()
+ assertTrue(planWithMultipleFilters.contains("PartitionFilters") ||
planWithMultipleFilters.contains("PushedFilters"),
+ s"Query plan should contain partition filters for multiple columns")
+
+ // Verify the filtered results
+ val multiFilterResults = dfWithMultipleFilters.select("id", "name",
"state", "datestr").orderBy("id").collect()
+ assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got
${multiFilterResults.length}")
+ assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id
should be 1")
+ assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name
should be a1")
+ assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state
should be CA")
+ assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First
row datestr should be 2024-01-01")
+ assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id
should be 2")
+ assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name
should be a2")
+
+ // Test DataFrame with country filter
+ val dfWithCountryFilter = spark.table(targetTable).filter("country =
'CAN'")
+ val planWithCountryFilter =
dfWithCountryFilter.queryExecution.executedPlan.toString()
+ assertTrue(planWithCountryFilter.contains("PartitionFilters") ||
planWithCountryFilter.contains("PushedFilters"),
+ s"Query plan should contain partition filters for country column")
+
+ val countryFilterResults = dfWithCountryFilter.select("id",
"country").orderBy("id").collect()
+ assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got
${countryFilterResults.length}")
+ assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should
be 7")
+ assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country
should be CAN")
+
+ // Verify all partitions exist as expected
+ val engine = new HoodieSparkEngineContext(spark.sparkContext)
+ val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+ val metadataTable = new HoodieBackedTableMetadata(engine, storage,
metadataConfig, tablePath)
+ val partitionPaths = metadataTable.getAllPartitionPaths
+
+ // Verify expected partition paths
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+ assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR"))
+ assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC"))
+
+ metadataTable.close()
+ }
+ }
+
+ test("Test custom partition value extractor with URL encoding") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ // Enable partition value extractor on read
+
spark.conf.set("hoodie.datasource.read.partition.value.extractor.enabled",
"true")
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ |
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator',
+ |
'${DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+ |
'${HoodieTableConfig.PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor',
+ | 'hoodie.datasource.write.partitionpath.urlencode'='true'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+
+ // Insert data with special characters that will be URL encoded
+ import spark.implicits._
+ val df = Seq(
+ ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "San Francisco"),
+ ("2", "a2", 2000L, "2024-01-01", "USA", "TX", "Austin+Dallas"),
+ ("3", "a3", 3000L, "2024-01-02", "USA", "CA", "Los Angeles")
+ ).toDF("id", "name", "ts", "datestr", "country", "state", "city")
+
+ df.write
+ .format("hudi")
+ .mode("append")
+ .save(tablePath)
+
+ // Verify that data can be read back correctly with URL-encoded
partition paths
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where city = 'San Francisco' order by id")(
+ Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "San Francisco")
+ )
+
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where city = 'Austin+Dallas' order by id")(
+ Seq("2", "a2", 2000, "2024-01-01", "USA", "TX", "Austin+Dallas")
+ )
+
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where city = 'Los Angeles' order by id")(
+ Seq("3", "a3", 3000, "2024-01-02", "USA", "CA", "Los Angeles")
+ )
+
+ // Verify all data can be read
+ val allData = spark.sql(s"select id, city from $targetTable order by
id").collect()
+ assertTrue(allData.length == 3, s"Expected 3 rows but got
${allData.length}")
+ assertTrue(allData(0).getString(1) == "San Francisco", "First row city
should be San Francisco")
+ assertTrue(allData(1).getString(1) == "Austin+Dallas", "Second row city
should be Austin+Dallas")
+ assertTrue(allData(2).getString(1) == "Los Angeles", "Third row city
should be Los Angeles")
+ }
+ }
+}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 890c9712f714..53698d492d22 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HadoopConfigUtils;
+import org.apache.hudi.common.util.HoodieTableConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -51,8 +52,6 @@ import static
org.apache.hudi.common.config.HoodieCommonConfig.META_SYNC_BASE_PA
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
-import static
org.apache.hudi.common.table.HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING;
-import static
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static
org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
import static
org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
@@ -119,40 +118,10 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<String>
META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
- .withInferFunction(cfg -> {
- Option<String> partitionFieldsOpt;
- if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) {
- partitionFieldsOpt =
Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS));
- } else {
- partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
- .or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
- }
- if (!partitionFieldsOpt.isPresent()) {
- return Option.empty();
- }
- String partitionFields = partitionFieldsOpt.get();
- if (StringUtils.nonEmpty(partitionFields)) {
- int numOfPartFields = partitionFields.split(",").length;
- if (numOfPartFields == 1) {
- if (cfg.contains(HIVE_STYLE_PARTITIONING_ENABLE)
- &&
cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
- return
Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
- } else if (cfg.contains(SLASH_SEPARATED_DATE_PARTITIONING)
- &&
cfg.getString(SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) {
- return
Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
- } else {
- return
Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
- }
- } else {
- return
Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
- }
- } else {
- return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
- }
- })
+
.withInferFunction(HoodieTableConfigUtils::inferPartitionValueExtractorClass)
.markAdvanced()
.withDocumentation("Class which implements PartitionValueExtractor to
extract the partition values, "
- + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'.");
+ + "default is inferred based on partition configuration.");
public static final ConfigProperty<Boolean> META_SYNC_DECODE_PARTITION =
ConfigProperty
.key("hoodie.meta.sync.decode_partition")
diff --git
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
index 7d9472e82e3a..cdffa2b91c4d 100644
---
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
+++
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncConfig.java
@@ -146,6 +146,22 @@ class TestHoodieSyncConfig {
HoodieSyncConfig config5 = new HoodieSyncConfig(props5, new
Configuration());
assertEquals("org.apache.hudi.hive.SinglePartPartitionValueExtractor",
config5.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+
+ Properties props6 = new Properties();
+ props6.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(),
"foo,bar");
+ props6.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo");
+ HoodieSyncConfig config6 = new HoodieSyncConfig(props6, new
Configuration());
+ assertEquals("org.apache.hudi.hive.SinglePartPartitionValueExtractor",
+ config6.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS),
+ String.format("should infer from %s when explicitly configured",
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key()));
+
+ Properties props7 = new Properties();
+ props7.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "foo");
+
props7.setProperty(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key(),
"true");
+ HoodieSyncConfig config7 = new HoodieSyncConfig(props7, new
Configuration());
+ assertEquals("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor",
+ config7.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
}
@Test
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index d1db2855a6b3..98cd42b1b8f5 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -91,6 +91,7 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.JavaScalaConverters;
@@ -452,7 +453,10 @@ public class StreamSync implements Serializable, Closeable
{
payloadClass = overridingMergeConfigs.get().getMiddle();
mergeStrategyId = overridingMergeConfigs.get().getRight();
}
-
+ String partitionValueExtractorClassName = props.getString(
+ DataSourceWriteOptions.PARTITION_EXTRACTOR_CLASS().key(),
+
props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.defaultValue()));
return tableBuilder.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
@@ -466,6 +470,7 @@ public class StreamSync implements Serializable, Closeable {
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
.setKeyGeneratorClassProp(keyGenClassName)
+ .setPartitionValueExtractorClass(partitionValueExtractorClassName)
.setOrderingFields(cfg.sourceOrderingFields)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))