hudi-agent commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3180295231
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:
##########
@@ -315,4 +388,58 @@ public static int computeMinBytesForDecimalPrecision(int precision) {
}
return numBytes;
}
+
+  /**
+   * Converts a Variant column to the canonical unshredded Parquet layout:
+   * a group with required binary {@code metadata} and required binary {@code value}.
+   */
+  private static Type convertVariantToParquetType(String name, Type.Repetition repetition) {
+    // TODO: add .as(LogicalTypeAnnotation.variantType()) once parquet-java is bumped to 1.16.0
+    return Types.buildGroup(repetition)
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+            .named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
+        .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+            .named(HoodieSchema.Variant.VARIANT_VALUE_FIELD))
+        .named(name);
+  }
+
+  /**
+   * Detects whether a Parquet group represents a Variant (shredded or unshredded).
+   *
+   * <p>Detection follows three rules, checked in priority order:
+   * <ol>
+   *   <li><b>Annotation:</b> parquet-java 1.15.2+ annotates variant groups with
+   *       {@code VARIANT(specVersion)}. Detected by class name (reflection-safe).</li>
+   *   <li><b>Schema hint:</b> if the HoodieSchema says the field is {@code VARIANT},
+   *       we trust it; this covers Spark 4.0 files that lack the annotation.</li>
+   *   <li><b>No structural guessing:</b> without an annotation or schema hint, a group
+   *       is never treated as variant, even if its fields are named metadata/value.</li>
+   * </ol>
+   */
+  public static boolean isVariantGroup(
+      GroupType groupType, LogicalTypeAnnotation logicalType, HoodieSchemaType schemaHint) {
+    if (hasVariantAnnotation(logicalType)) {
+      return true;
+    }
Review Comment:
🤖 nit: `groupType` is unused in `isVariantGroup` — only `logicalType` and
`schemaHint` are read. Could you either drop the parameter or wire it into a
structural check, since right now its presence implies it's part of the
detection logic?
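If you go the structural-check route, a minimal sketch of what that could look like (hypothetical; it reuses this PR's field-name constants and assumes parquet-java's `GroupType.containsField(String)`):
```java
// Hypothetical: use groupType as a sanity check on top of the schema hint,
// still honoring the "no structural guessing" rule when no hint is present.
if (schemaHint == HoodieSchemaType.VARIANT) {
  return groupType.containsField(HoodieSchema.Variant.VARIANT_METADATA_FIELD)
      && groupType.containsField(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
}
return false;
```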
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -51,6 +52,36 @@
*/
public class HoodieSchemaConverter {
+ private static Constructor<?> variantTypeCtor;
+ private static boolean variantTypeResolved;
+
+  /**
+   * Returns a Flink {@code VariantType} DataType if the runtime Flink version is 2.1+,
+   * or {@code null} if the class is not on the classpath (pre-2.1 Flink).
+   * The reflection result is cached so the class lookup happens at most once per JVM.
+   * Only called during schema conversion (cold path), never per row.
+   */
Review Comment:
🤖 nit: could you use the lazy-holder idiom here (like `GetVariantHolder` in
`AbstractHoodieRowData`)? That avoids the `synchronized` on every call and
keeps the two reflection-cache patterns in this PR consistent.
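A minimal sketch of the idiom here (the holder class name is hypothetical, and `org.apache.flink.table.types.logical.VariantType` is assumed to be the Flink 2.1+ class being probed):
```java
// Lazy-holder idiom: the JVM loads VariantTypeHolder on first use, and
// class-initialization locking makes this thread-safe with no `synchronized`.
private static final class VariantTypeHolder {
  static final Constructor<?> CTOR = resolve();

  private static Constructor<?> resolve() {
    try {
      // Assumed FQCN of the Flink 2.1+ variant type.
      return Class.forName("org.apache.flink.table.types.logical.VariantType")
          .getConstructor();
    } catch (ReflectiveOperationException e) {
      return null; // class not on the classpath (pre-2.1 Flink)
    }
  }
}
```
Callers would then read `VariantTypeHolder.CTOR` directly; the lookup still happens at most once per JVM, but without any lock on the hot path.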
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:
##########
@@ -58,15 +63,58 @@ public class ParquetSchemaConverter {
static final String LIST_REPEATED_NAME = "list";
static final String LIST_ELEMENT_NAME = "element";
+  /**
+   * Converts a Parquet schema to a Flink RowType without a HoodieSchema reference.
+   * Variant detection relies solely on the Parquet {@code VARIANT} annotation; no structural
+   * fallback is applied, so a plain ROW with fields named metadata/value will NOT be
+   * mis-identified as a variant.
+   *
+   * <p>Prefer {@link #convertToRowType(MessageType, HoodieSchema)} when a HoodieSchema is
+   * available, which enables schema-driven variant detection (including Spark 4.0 files
+   * that lack the annotation).
+   */
public static RowType convertToRowType(MessageType messageType) {
- List<RowType.RowField> dataFields =
- messageType.asGroupType().getFields().stream()
- .map(ParquetSchemaConverter::convertToRowField)
- .collect(Collectors.toList());
+ return convertToRowType(messageType, null);
+ }
+
+  /**
+   * Converts a Parquet schema to a Flink RowType, using the HoodieSchema to drive variant
+   * detection. A Parquet group is treated as a variant only when:
+   * <ul>
+   *   <li>the Parquet group carries the {@code VARIANT} logical type annotation, OR</li>
+   *   <li>the corresponding HoodieSchema field is {@link HoodieSchemaType#VARIANT} (covers
+   *       Spark 4.0 files that lack the annotation)</li>
+   * </ul>
+   *
+   * @param messageType  Parquet schema
+   * @param hoodieSchema HoodieSchema for the table, or null to use annotation-only detection
+   */
+  public static RowType convertToRowType(MessageType messageType, HoodieSchema hoodieSchema) {
+    List<RowType.RowField> dataFields = new ArrayList<>();
+    for (Type field : messageType.getFields()) {
+      HoodieSchema fieldSchema = resolveFieldSchema(hoodieSchema, field.getName());
+      dataFields.add(convertToRowField(field, fieldSchema));
+    }
return new RowType(dataFields);
}
+ /**
+ * Looks up the HoodieSchema for a named field within a record schema.
+ * Returns null if the record schema is null or does not contain the field.
+ */
+  static HoodieSchema resolveFieldSchema(HoodieSchema recordSchema, String fieldName) {
+    if (recordSchema == null || recordSchema.getType() != HoodieSchemaType.RECORD) {
+      return null;
+    }
+    org.apache.hudi.common.util.Option<HoodieSchemaField> field = recordSchema.getField(fieldName);
Review Comment:
🤖 nit: could you import `org.apache.hudi.common.util.Option` instead of
fully-qualifying it inline? It would match the import style used elsewhere in
this file.
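With the import in place, the assignment would read:
```java
import org.apache.hudi.common.util.Option;
// ...
Option<HoodieSchemaField> field = recordSchema.getField(fieldName);
```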
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]