hudi-agent commented on code in PR #18938:
URL: https://github.com/apache/hudi/pull/18938#discussion_r3376957028
##########
hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark41ParquetReader.scala:
##########
@@ -339,6 +340,12 @@ object Spark41ParquetReader extends SparkParquetReaderBuilder {
)
hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled)
+ hadoopConf.setBoolean(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key,
Review Comment:
🤖 This only reads from `options`, falling back to the Hudi default — it
never consults `sqlConf`. A user who runs `set
spark.sql.variant.allowReadingShredded = false` (or the hoodie key) via SQL has
that explicit choice silently overwritten with Hudi's default. The companion
`HoodieBaseHadoopFsRelationFactory` does a proper precedence resolve (options >
hoodie session > spark session > Hudi default); could the propagation here use
the same precedence, e.g. by falling back to
`sqlConf.getConfString(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key,
hudiDefault)` instead of the raw default? The same issue applies to
Spark40ParquetReader.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
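A minimal sketch of the precedence described in the comment above (options > hoodie session > spark session > Hudi default), written against plain maps rather than Spark's `SQLConf`. The class name, the `resolve` method, and the `HOODIE_KEY` string are hypothetical placeholders for illustration; only the Spark key is taken from the comment.

```java
import java.util.Map;

/** Hypothetical helper: illustrates the lookup order only, not Hudi's actual API. */
final class ShreddedReadFlagResolver {
  // Spark session key, as quoted in the review comment.
  static final String SPARK_KEY = "spark.sql.variant.allowReadingShredded";
  // Placeholder for the Hudi key; the real key name is not spelled out in the review.
  static final String HOODIE_KEY = "hoodie.example.variant.allow.reading.shredded";

  /**
   * Returns the first explicitly set value, checked in this order:
   * read options, hoodie key in the session, spark key in the session, Hudi default.
   */
  static boolean resolve(Map<String, String> options,
                         Map<String, String> sessionConf,
                         boolean hudiDefault) {
    String[] candidates = {
        options.get(HOODIE_KEY),
        sessionConf.get(HOODIE_KEY),
        sessionConf.get(SPARK_KEY)
    };
    for (String candidate : candidates) {
      if (candidate != null) {
        return Boolean.parseBoolean(candidate.trim());
      }
    }
    // Nothing was set explicitly, so the Hudi default applies.
    return hudiDefault;
  }
}
```

With this ordering, a session-level `set spark.sql.variant.allowReadingShredded = false` is only replaced by the default when no layer set the flag at all.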
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala:
##########
@@ -276,6 +277,12 @@ object Spark40ParquetReader extends SparkParquetReaderBuilder {
)
hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled)
+ hadoopConf.setBoolean(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key,
Review Comment:
🤖 Same issue as the matching block in `Spark41ParquetReader.build`: this
reads only from `options` and bypasses the precedence resolution that
`HoodieBaseHadoopFsRelationFactory` already wrote into `sqlConf`. A user who
set `spark.sql.variant.allowReadingShredded` (or the Hudi key) via `set` in SQL
will have it silently overwritten with the Hudi default. Could the fallback go
through `sqlConf.getConfString(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, …)`
instead?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +274,136 @@ public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}
+ /**
+ * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a shredded variant field,
+ * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+ */
+ private static final class ShreddedVariantField {
+ private final Schema avroSchema;
+ private final HoodieSchema.Variant hoodieSchema;
+
+ ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema) {
+ this.avroSchema = avroSchema;
+ this.hoodieSchema = hoodieSchema;
+ }
+ }
+
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+ "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+ /**
+ * Applies a forced shredding schema to all variant fields in the given schema.
+ * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value
+ * fields that will be added to each variant column.
+ */
+ private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc(),
+ shreddedFields);
+ HoodieSchema replacement = wasNullable
+ ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+ changed = true;
+ } else {
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+ }
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+
+ /**
+ * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"})
+ * into a map of field names to their HoodieSchema types.
+ */
+ private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+ Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+ for (String fieldDef : ddl.split(",")) {
+ String trimmed = fieldDef.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ String[] parts = trimmed.split("\\s+", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid shredding DDL field definition (expected 'name type'): " + trimmed);
+ }
+ fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+ }
Review Comment:
🤖 `ddl.split(",")` also splits inside `decimal(p, s)`, since the type itself
contains a comma. The Javadoc example `"a int, b string, c decimal(15,1)"` would
break here: the third and fourth pieces become `" c decimal(15"` and `"1)"`, so
`parseSimpleType("decimal(15")` throws on the DECIMAL_PATTERN miss, and the
leftover `"1)"` is not a valid 'name type' pair either. Could you either
tokenize with parenthesis awareness or drop the decimal example from the docs?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
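One way the parenthesis-aware tokenizing suggested above could look, as a rough sketch: split on commas only at nesting depth zero, so `decimal(15,1)` stays in one piece. `ShreddingDdlSplitter` and `splitTopLevel` are hypothetical names, not part of the PR; the resulting pieces would still go through the existing 'name type' split and `parseSimpleType`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a shredding DDL string on top-level commas only. */
final class ShreddingDdlSplitter {

  static List<String> splitTopLevel(String ddl) {
    List<String> parts = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    int depth = 0; // current parenthesis nesting level
    for (int i = 0; i < ddl.length(); i++) {
      char c = ddl.charAt(i);
      if (c == '(') {
        depth++;
      } else if (c == ')') {
        if (depth == 0) {
          throw new IllegalArgumentException("Unbalanced ')' in shredding DDL: " + ddl);
        }
        depth--;
      }
      if (c == ',' && depth == 0) {
        // A comma outside parentheses ends the current field definition.
        addIfNotBlank(parts, current.toString());
        current.setLength(0);
      } else {
        current.append(c);
      }
    }
    if (depth != 0) {
      throw new IllegalArgumentException("Unbalanced '(' in shredding DDL: " + ddl);
    }
    addIfNotBlank(parts, current.toString());
    return parts;
  }

  // Skips empty definitions, matching the isEmpty() check in the existing loop.
  private static void addIfNotBlank(List<String> parts, String piece) {
    String trimmed = piece.trim();
    if (!trimmed.isEmpty()) {
      parts.add(trimmed);
    }
  }
}
```

For the Javadoc example, `splitTopLevel("a int, b string, c decimal(15,1)")` yields three definitions, with the decimal type intact.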
##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared helpers for converting between shredded and unshredded variant schemas.
+ * Used by both the write path ({@link HoodieAvroWriteSupport}) and the read path
+ * (variant reconstruction in the parquet reader).
+ */
+public class VariantSchemaUtils {
Review Comment:
🤖 nit: could you rename this to `HoodieVariantSchemaUtils`? Hudi's public
utility classes consistently carry the `Hoodie` prefix (see `HoodieAvroUtils`,
`HoodieSchemaUtils`, `HoodieParquetUtils`), so `VariantSchemaUtils` stands out
as an exception that future contributors might trip over.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/VariantReconstruction.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class VariantReconstruction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VariantReconstruction.class);
+
+ // Classpath fallback when the provider class config is not set; mirrors HoodieAvroFileWriterFactory.
+ private static final String[] PROVIDER_CANDIDATES = {
+ "org.apache.hudi.variant.Spark4VariantShreddingProvider"
+ };
+
+ private final HoodieSchema intermediateSchema;
+ private final Schema outputAvroSchema;
+ private final VariantShreddingProvider provider;
+ // Indexed by field position in the (requested == output) record. For target fields, the file's
+ // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets.
+ private final boolean[] isTarget;
+ private final Schema[] shreddedSubSchemas;
+ private final Schema[] unshreddedSubSchemas;
+
+ private VariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema,
+ VariantShreddingProvider provider, boolean[] isTarget,
+ Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) {
+ this.intermediateSchema = intermediateSchema;
+ this.outputAvroSchema = outputAvroSchema;
+ this.provider = provider;
+ this.isTarget = isTarget;
+ this.shreddedSubSchemas = shreddedSubSchemas;
+ this.unshreddedSubSchemas = unshreddedSubSchemas;
+ }
+
+ /**
+ * Schema to read the parquet file with: the requested schema, but with shredded variant columns
+ * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}.
+ */
+ HoodieSchema intermediateSchema() {
+ return intermediateSchema;
+ }
+
+ /**
+ * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+ * none is needed (no shredded variant columns in the file, reading shredded variants disabled, or
+ * no provider available - in which case the read proceeds unchanged).
+ */
+ static VariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) {
+ if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) {
+ return null;
+ }
+ if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+ HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+ return null;
+ }
+
+ List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+ List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+ boolean[] isTarget = new boolean[requestedFields.size()];
+ boolean anyTarget = false;
+ for (int i = 0; i < requestedFields.size(); i++) {
+ HoodieSchemaField requestedField = requestedFields.get(i);
+ Option<HoodieSchemaField> fileField = fileSchema.getField(requestedField.name());
+ if (fileField.isPresent() && isShreddedVariant(fileField.get().schema())) {
+ isTarget[i] = true;
+ anyTarget = true;
+ // Read this column in its on-disk shredded shape.
+ intermediateFields.add(requestedField.withSchema(fileField.get().schema()));
+ } else {
+ intermediateFields.add(requestedField);
+ }
+ }
+ if (!anyTarget) {
+ return null;
+ }
+
+ VariantShreddingProvider provider = loadProvider(storage);
+ if (provider == null) {
+ LOG.warn("Base file has shredded variant column(s) but no VariantShreddingProvider is available; "
+ + "variants will not be reconstructed. Set {} or add a provider implementation to the classpath.",
+ HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+ return null;
+ }
+
+ HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+ requestedSchema.getAvroSchema().getName(),
+ requestedSchema.getAvroSchema().getNamespace(),
+ requestedSchema.getAvroSchema().getDoc(),
+ intermediateFields);
+ // Records leave this reader unshredded; output field order matches the requested/intermediate order.
+ HoodieSchema outputSchema = VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+ Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+ Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+ for (int i = 0; i < requestedFields.size(); i++) {
+ if (isTarget[i]) {
+ shreddedSubSchemas[i] = nonNull(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();
+ unshreddedSubSchemas[i] = nonNull(outputSchema.getFields().get(i).schema()).getAvroSchema();
+ }
+ }
+
+ return new VariantReconstruction(intermediateSchema, outputSchema.toAvroSchema(), provider,
+ isTarget, shreddedSubSchemas, unshreddedSubSchemas);
+ }
+
+ /**
+ * Rebuilds shredded variant columns of {@code in} (read in the intermediate shredded shape) into
+ * a record conforming to the unshredded output schema.
+ */
+ IndexedRecord reconstruct(IndexedRecord in) {
+ GenericRecord out = new GenericData.Record(outputAvroSchema);
+ for (int i = 0; i < isTarget.length; i++) {
+ Object value = in.get(i);
+ if (isTarget[i] && value instanceof GenericRecord) {
+ out.put(i, provider.rebuildVariantRecord((GenericRecord) value, shreddedSubSchemas[i], unshreddedSubSchemas[i]));
+ } else {
+ // Non-variant column, or a null variant column: pass through unchanged.
+ out.put(i, value);
+ }
+ }
+ return out;
+ }
+
+ private static boolean isShreddedVariant(HoodieSchema schema) {
+ HoodieSchema unwrapped = nonNull(schema);
+ return unwrapped.getType() == HoodieSchemaType.VARIANT
+ && ((HoodieSchema.Variant) unwrapped).isShredded();
+ }
+
+ private static HoodieSchema nonNull(HoodieSchema schema) {
Review Comment:
🤖 nit: `nonNull` reads like a null assertion (cf. `Objects.requireNonNull`)
but it actually unwraps a nullable HoodieSchema. The sibling
`Spark4VariantShreddingProvider` already uses `unwrapNullable` for the same
operation — could you align the name here to reduce surprise?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and produce
+ * a shredded {@link GenericRecord} with typed_value columns populated according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
Review Comment:
🤖 nit: same convention note as `VariantSchemaUtils` — could this be
`HoodieVariantShreddingProvider`? Existing Hudi interfaces like
`HoodieRecordMerger` and `HoodieBloomFilter` follow the `Hoodie` prefix, so the
current name is an outlier.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>