voonhous commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3419680935
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,289 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
/**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider}
loaded via reflection.</p>
*/
public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
private final Option<HoodieBloomFilterWriteSupport<String>>
bloomFilterWriteSupportOpt;
private final Map<String, String> footerMetadata = new HashMap<>();
protected final Properties properties;
+ /**
+ * Whether variant write shredding is enabled via config.
+ */
+ private final boolean variantWriteShreddingEnabled;
+
+ /**
+ * The effective (possibly shredded) HoodieSchema used for writing.
+ */
+ private final HoodieSchema effectiveHoodieSchema;
+
+ /**
+ * The effective Avro schema (derived from effectiveHoodieSchema).
+ */
+ private final Schema effectiveAvroSchema;
+
+ /**
+ * Variant fields that need shredding, keyed by their index in the effective
schema.
+ * Empty if no shredding is needed.
+ */
+ private final Map<Integer, ShreddedVariantField> shreddedVariantFields;
+
+ /**
+ * Provider for variant shredding (loaded via reflection). Null if no
shredding is needed.
+ */
+ private final VariantShreddingProvider shreddingProvider;
+
+ /**
+ * Names of all variant-typed top-level fields, regardless of shredding.
Used to fail fast on the
+ * not-yet-supported read-then-reshred path (compaction/clustering over an
already-shredded base
+ * file). See https://github.com/apache/hudi/issues/18931.
+ */
+ private final String[] variantFieldNames;
+
public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema,
Option<BloomFilter> bloomFilterOpt,
Properties properties) {
- super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+ this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema,
properties), bloomFilterOpt, properties);
+ }
+
+ private HoodieAvroWriteSupport(MessageType schema, HoodieSchema
hoodieSchema, HoodieSchema effectiveSchema,
+ Option<BloomFilter> bloomFilterOpt,
Properties properties) {
+ super(schema, effectiveSchema.toAvroSchema(),
ConvertingGenericData.INSTANCE);
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
this.properties = properties;
String vectorMeta =
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
if (!vectorMeta.isEmpty()) {
footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
}
+
+ this.effectiveHoodieSchema = effectiveSchema;
+ this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+ this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ // Identify variant fields that need shredding
+ Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
+
+ if (variantWriteShreddingEnabled && effectiveSchema.getType() ==
HoodieSchemaType.RECORD) {
+ List<HoodieSchemaField> fields = effectiveSchema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField field = fields.get(i);
+ HoodieSchema fieldSchema = field.schema();
+ // Unwrap nullable union to get the underlying type
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullType();
+ }
+ if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+ if (variant.isShredded() &&
variant.getTypedValueField().isPresent()) {
+ // Get the Avro sub-schema for this variant field from the
effective schema
+ Schema fieldAvroSchema =
effectiveAvroSchema.getFields().get(i).schema();
+ // Unwrap nullable union
+ if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+ fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+ }
+ shreddedFields.put(i, new ShreddedVariantField(fieldAvroSchema,
variant));
+ }
+ }
+ }
+ }
+
+ this.shreddedVariantFields = shreddedFields;
+
+ // Collect every variant-typed field name (independent of shredding) for
the read-then-reshred guard.
+ List<String> variantNames = new ArrayList<>();
+ if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
+ for (HoodieSchemaField field : effectiveSchema.getFields()) {
+ HoodieSchema fieldSchema = field.schema();
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullType();
Review Comment:
Resolved in c14ae042 - extracted the shredding branch into a private
`shredRecord(IndexedRecord)`, so `write()` is now just the guard plus
shred-or-passthrough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]