hudi-agent commented on code in PR #18938: URL: https://github.com/apache/hudi/pull/18938#discussion_r3373486278
########## hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/VariantReconstruction.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage.hadoop; + +import org.apache.hudi.avro.VariantSchemaUtils; +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.storage.HoodieStorage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reconstructs unshredded variants when reading an already-shredded base file on the Avro + * ({@code HoodieRecordType.AVRO}) read path. 
+ * + * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as + * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded + * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}} + * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the + * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this. + * + * <p>See https://github.com/apache/hudi/issues/18931. + */ +final class VariantReconstruction { + + private static final Logger LOG = LoggerFactory.getLogger(VariantReconstruction.class); + + // Classpath fallback when the provider class config is not set; mirrors HoodieAvroFileWriterFactory. + private static final String[] PROVIDER_CANDIDATES = { + "org.apache.hudi.variant.Spark4VariantShreddingProvider" + }; + + private final HoodieSchema intermediateSchema; + private final Schema outputAvroSchema; + private final VariantShreddingProvider provider; + // Indexed by field position in the (requested == output) record. For target fields, the file's + // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets. 
+ private final boolean[] isTarget; + private final Schema[] shreddedSubSchemas; + private final Schema[] unshreddedSubSchemas; + + private VariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema, + VariantShreddingProvider provider, boolean[] isTarget, + Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) { + this.intermediateSchema = intermediateSchema; + this.outputAvroSchema = outputAvroSchema; + this.provider = provider; + this.isTarget = isTarget; + this.shreddedSubSchemas = shreddedSubSchemas; + this.unshreddedSubSchemas = unshreddedSubSchemas; + } + + /** + * Schema to read the parquet file with: the requested schema, but with shredded variant columns + * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}. + */ + HoodieSchema intermediateSchema() { + return intermediateSchema; + } + + /** + * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when + * none is needed (no shredded variant columns in the file, reading shredded variants disabled, or + * no provider available - in which case the read proceeds unchanged). + */ + static VariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) { + if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) { + return null; + } Review Comment: 🤖 When the file has shredded variants but no provider can be loaded, we log a warning and return null — the reader then proceeds with the unshredded requested schema against a file that actually has a `typed_value` column and a sparse/optional `value`. Won't this either fail loudly downstream or silently produce variants whose `value` is null for rows where data was carried in `typed_value`? Have you considered failing fast here so compaction/clustering doesn't silently mangle data? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java: ########## @@ -74,6 +274,136 @@ public void addFooterMetadata(String key, String value) { footerMetadata.put(key, value); } + /** + * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a shredded variant field, + * keyed by effective-schema field index in {@link #shreddedVariantFields}. + */ + private static final class ShreddedVariantField { + private final Schema avroSchema; + private final HoodieSchema.Variant hoodieSchema; + + ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema) { + this.avroSchema = avroSchema; + this.hoodieSchema = hoodieSchema; + } + } + + private static final Pattern DECIMAL_PATTERN = Pattern.compile( + "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)"); + + /** + * Applies a forced shredding schema to all variant fields in the given schema. + * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value + * fields that will be added to each variant column. + */ + private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) { + if (schema.getType() != HoodieSchemaType.RECORD) { + return schema; + } + + Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl); + + List<HoodieSchemaField> fields = schema.getFields(); + List<HoodieSchemaField> newFields = new ArrayList<>(); + boolean changed = false; + + for (HoodieSchemaField field : fields) { + HoodieSchema fieldSchema = field.schema(); + boolean wasNullable = fieldSchema.isNullable(); + HoodieSchema unwrapped = wasNullable ? 
fieldSchema.getNonNullType() : fieldSchema; + + if (unwrapped.getType() == HoodieSchemaType.VARIANT) { + HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject( + unwrapped.getAvroSchema().getName(), + unwrapped.getAvroSchema().getNamespace(), + unwrapped.getAvroSchema().getDoc(), + shreddedFields); + HoodieSchema replacement = wasNullable + ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant; + newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement))); Review Comment: 🤖 `field.makeNullable().withSchema(replacement)` — since `withSchema(...)` recreates the field from the replacement schema (and `replacement` already wraps in nullable when `wasNullable` was true), the intermediate `makeNullable()` looks like a no-op in both cases: when `wasNullable` is true the replacement is already nullable, and when it is false `withSchema(...)` appears to discard the nullability that `makeNullable()` just added. Is the intent that forced-shredding always makes the variant field nullable, or was this meant to mirror the existing nullability? If the former, the implementation doesn't quite achieve that. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +/** + * Interface for shredding variant values at write time. + * <p> + * Implementations parse variant binary data (value + metadata bytes) and produce + * a shredded {@link GenericRecord} with typed_value columns populated according + * to the shredding schema. + * <p> + * This interface allows the variant binary parsing logic (which may depend on + * engine-specific libraries like Spark's variant module) to be loaded via reflection, + * keeping the core write support free of engine-specific dependencies. + */ +public interface VariantShreddingProvider { + + /** Review Comment: 🤖 nit: same naming convention issue as `VariantSchemaUtils` — have you considered `HoodieVariantShreddingProvider`? This is a public interface in `hudi-common` that engine-specific modules implement and reference by name (e.g. `Spark4VariantShreddingProvider implements VariantShreddingProvider`), so aligning with the `Hoodie` prefix now avoids a rename later. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java: ########## @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.variant; + +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.types.variant.ShreddingUtils; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantSchema; +import org.apache.spark.types.variant.VariantShreddingWriter; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library. + * + * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter} + * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. 
It converts + * the shredded output into Avro {@link GenericRecord}s that can be written via + * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p> + * + * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded}, + * which handles scalar, object, and array shredding including residual value construction + * for non-matching fields. This class implements the {@link ShreddedResult} and + * {@link ShreddedResultBuilder} interfaces to collect the shredded components into + * Avro GenericRecords.</p> + */ +public class Spark4VariantShreddingProvider implements VariantShreddingProvider { + + private static final String VALUE_FIELD = "value"; + private static final String METADATA_FIELD = "metadata"; + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Override + public GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema) { + + ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD); + ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD); + + if (valueBuf == null || metadataBuf == null) { + return null; + } + + byte[] valueBytes = toByteArray(valueBuf); + byte[] metadataBytes = toByteArray(metadataBuf); + + Variant variant = new Variant(valueBytes, metadataBytes); + + // Build VariantSchema from the Avro shredded schema, registering + // Avro schemas at each level for GenericRecord construction. + AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder(); + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder); + + // Delegate to Spark's VariantShreddingWriter for the actual shredding logic. 
+ AvroShreddedResult result = (AvroShreddedResult) + VariantShreddingWriter.castShredded(variant, sparkSchema, builder); + + return result.toGenericRecord(); + } + + @Override + public GenericRecord rebuildVariantRecord( + GenericRecord shreddedVariant, Review Comment: 🤖 If the input `shreddedVariant` is non-null but `metadataBuf` happens to be null, we return null and the caller in `VariantReconstruction.reconstruct` puts that null into the output record — silently losing the variant data. Since `metadata` is REQUIRED in the shredded parquet schema, hitting this branch likely indicates a malformed file. Would a thrown exception (or at minimum a warn-log) be safer than silent data drop? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.avro; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import java.util.ArrayList; +import java.util.List; + +/** + * Shared helpers for converting between shredded and unshredded variant schemas. + * Used by both the write path ({@link HoodieAvroWriteSupport}) and the read path + * (variant reconstruction in the parquet reader). + */ +public class VariantSchemaUtils { + + private VariantSchemaUtils() { Review Comment: 🤖 nit: could you rename this to `HoodieVariantSchemaUtils`? Every other public utility class in this package (e.g. `HoodieAvroUtils`, `HoodieSchemaUtils`) carries the `Hoodie` prefix, so this one stands out as inconsistent. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
