This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7adeccfbb5e feat(schema): Config path implemented for spark record type (#18062)
b7adeccfbb5e is described below
commit b7adeccfbb5ea6af5f41369bd452fdb326a37b04
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 20:06:30 2026 +0800
feat(schema): Config path implemented for spark record type (#18062)
- Address comments
---
.../storage/row/HoodieRowParquetWriteSupport.java | 53 ++++++++++++++++++++--
.../hudi/common/config/HoodieStorageConfig.java | 45 ++++++++++++++++++
2 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 0c6f25e3e50c..f8204169d56c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -84,6 +84,9 @@ import scala.Enumeration;
import scala.Function1;
import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
import static
org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
import static org.apache.hudi.config.HoodieWriteConfig.INTERNAL_SCHEMA_STRING;
@@ -116,6 +119,8 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
private static final String MAP_VALUE_NAME = "value";
+ private static final String SPARK_VARIANT_WRITE_SHREDDING_ENABLED =
"spark.sql.variant.writeShredding.enabled";
+ private static final String SPARK_VARIANT_ALLOW_READING_SHREDDED =
"spark.sql.variant.allowReadingShredded";
private static final String SESSION_LOCAL_TIME_ZONE_KEY =
"spark.sql.session.timeZone";
private static final String PARQUET_METADATA_TIME_ZONE_KEY =
"org.apache.spark.timeZone";
@@ -137,6 +142,8 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
* For non-shredded cases, this is identical to structType.
*/
private final StructType shreddedSchema;
+ private final boolean variantWriteShreddingEnabled;
+ private final String variantForceShreddingSchemaForTest;
private RecordConsumer recordConsumer;
public HoodieRowParquetWriteSupport(Configuration conf, StructType
structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
@@ -145,6 +152,16 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
hadoopConf.set("spark.sql.parquet.writeLegacyFormat",
writeLegacyFormatEnabled);
hadoopConf.set("spark.sql.parquet.outputTimestampType",
config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled",
config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
+
+ // Variant shredding configs
+ this.variantWriteShreddingEnabled =
config.getBooleanOrDefault(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED);
+ this.variantForceShreddingSchemaForTest =
config.getString(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST);
+ hadoopConf.setBoolean(SPARK_VARIANT_WRITE_SHREDDING_ENABLED,
variantWriteShreddingEnabled);
+ hadoopConf.setBoolean(SPARK_VARIANT_ALLOW_READING_SHREDDED,
config.getBooleanOrDefault(PARQUET_VARIANT_ALLOW_READING_SHREDDED));
+ if (variantForceShreddingSchemaForTest != null &&
!variantForceShreddingSchemaForTest.isEmpty()) {
+ hadoopConf.set("spark.sql.variant.forceShreddingSchemaForTest",
variantForceShreddingSchemaForTest);
+ }
+
this.writeLegacyListFormat = Boolean.parseBoolean(writeLegacyFormatEnabled)
||
Boolean.parseBoolean(config.getStringOrDefault(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
"false"));
this.structType = structType;
@@ -168,15 +185,34 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
/**
* Generates a shredded schema from the given structType and hoodieSchema.
* <p>
- * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()),
- * the VariantType is replaced with a shredded struct schema. This method
recursively processes
- * nested struct fields to handle Variant fields at any depth.
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded
struct schema.
+ * <p>
+ * Shredding behavior is controlled by:
+ * <ul>
+ * <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master
switch for shredding (default: true).
+ * When false, no shredding happens regardless of schema
configuration.</li>
+ * <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} -
When set, forces this DDL schema
+ * as the typed_value schema for ALL variant columns, overriding
schema-driven shredding.</li>
+ * </ul>
+ *
+ * This method recursively processes nested struct fields to handle Variant
fields at any depth.
*
* @param structType The original Spark StructType
* @param hoodieSchema The HoodieSchema containing shredding information
* @return A StructType with shredded Variant fields replaced by their
shredded schemas
*/
private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
+ // If write shredding is disabled, skip shredding entirely
+ if (!variantWriteShreddingEnabled) {
+ return structType;
+ }
+
+ // Parse forced shredding schema if configured
+ StructType forcedShreddingSchema = null;
+ if (variantForceShreddingSchemaForTest != null &&
!variantForceShreddingSchemaForTest.isEmpty()) {
+ forcedShreddingSchema =
StructType.fromDDL(variantForceShreddingSchemaForTest);
+ }
+
StructField[] fields = structType.fields();
StructField[] shreddedFields = new StructField[fields.length];
boolean hasShredding = false;
@@ -185,6 +221,16 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
StructField field = fields[i];
DataType dataType = field.dataType();
+ // If a forced shredding schema is configured, use it for all variant
columns
+ if (forcedShreddingSchema != null
+ &&
SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ StructType markedShreddedStruct =
SparkAdapterSupport$.MODULE$.sparkAdapter()
+ .generateVariantWriteShreddingSchema(forcedShreddingSchema, true,
false);
+ shreddedFields[i] = new StructField(field.name(),
markedShreddedStruct, field.nullable(), field.metadata());
+ hasShredding = true;
+ continue;
+ }
+
// Get the HoodieSchema for this field (if available)
// Use getNonNullType() to unwrap nullable unions (e.g., ["null",
"string"] -> "string")
HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
@@ -193,7 +239,6 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
.map(HoodieSchemaField::schema)
.orElse(null);
- // Check if this is a Variant field that should be shredded
if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType))
{
if (fieldHoodieSchema != null && fieldHoodieSchema.getType() ==
HoodieSchemaType.VARIANT) {
HoodieSchema.Variant variantSchema = (HoodieSchema.Variant)
fieldHoodieSchema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 95947e6092c2..17f2bc6a9b89 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -213,6 +213,36 @@ public class HoodieStorageConfig extends HoodieConfig {
.withDocumentation("Control whether to write bloom filter or not.
Default true. "
+ "We can set to false in non bloom index cases for CPU resource
saving.");
+ public static final ConfigProperty<Boolean>
PARQUET_VARIANT_WRITE_SHREDDING_ENABLED = ConfigProperty
+ .key("hoodie.parquet.variant.write.shredding.enabled")
+ .defaultValue(true)
+ .sinceVersion("1.1.0")
+ .withDocumentation("Controls whether variant columns are written in
shredded format. "
+ + "When enabled (default), variant columns with shredding
information in the schema will be written "
+ + "in shredded format with typed_value columns. When disabled,
variant columns are always written "
+ + "in unshredded format regardless of the schema. "
+ + "Equivalent to Spark's spark.sql.variant.writeShredding.enabled.");
+
+ public static final ConfigProperty<String>
PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST = ConfigProperty
+ .key("hoodie.parquet.variant.force.shredding.schema.for.test")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Forces a specific shredding schema for all variant
columns, intended for testing. "
+ + "The value should be a DDL-format schema string (e.g., 'a int, b
string, c decimal(15, 1)'). "
+ + "When set and write shredding is enabled, this schema overrides
the schema-driven shredding "
+ + "configuration for all variant columns. "
+ + "Equivalent to Spark's
spark.sql.variant.forceShreddingSchemaForTest.");
+
+ public static final ConfigProperty<Boolean>
PARQUET_VARIANT_ALLOW_READING_SHREDDED = ConfigProperty
+ .key("hoodie.parquet.variant.allow.reading.shredded")
+ .defaultValue(true)
+ .sinceVersion("1.1.0")
+ .withDocumentation("Controls whether shredded variant data can be read. "
+ + "When enabled (default), the reader will reconstruct variant
values from shredded components. "
+ + "When disabled, only unshredded variant data can be read. "
+ + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
+
public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE =
ConfigProperty
.key("hoodie.parquet.write.utc-timezone.enabled")
.defaultValue(true)
@@ -549,6 +579,21 @@ public class HoodieStorageConfig extends HoodieConfig {
return this;
}
+ public Builder parquetVariantWriteShreddingEnabled(boolean enabled) {
+ storageConfig.setValue(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED,
String.valueOf(enabled));
+ return this;
+ }
+
+ public Builder parquetVariantForceShreddingSchemaForTest(String
schemaString) {
+ storageConfig.setValue(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST,
schemaString);
+ return this;
+ }
+
+ public Builder parquetVariantAllowReadingShredded(boolean allowed) {
+ storageConfig.setValue(PARQUET_VARIANT_ALLOW_READING_SHREDDED,
String.valueOf(allowed));
+ return this;
+ }
+
public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm)
{
storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME,
hfileCompressionAlgorithm);
return this;