kbuci commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3180024029


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:
##########
@@ -58,15 +63,58 @@ public class ParquetSchemaConverter {
   static final String LIST_REPEATED_NAME = "list";
   static final String LIST_ELEMENT_NAME = "element";
 
+  /**
+   * Converts a Parquet schema to a Flink RowType without a HoodieSchema reference.
+   * Variant detection relies solely on the Parquet {@code VARIANT} annotation — no structural
+   * fallback is applied, so a plain ROW with fields named metadata/value will NOT be
+   * mis-identified as a variant.
+   *
+   * <p>Prefer {@link #convertToRowType(MessageType, HoodieSchema)} when a HoodieSchema is
+   * available, which enables schema-driven variant detection (including Spark 4.0 files
+   * that lack the annotation).
+   */
   public static RowType convertToRowType(MessageType messageType) {
-    List<RowType.RowField> dataFields =
-        messageType.asGroupType().getFields().stream()
-            .map(ParquetSchemaConverter::convertToRowField)
-            .collect(Collectors.toList());
+    return convertToRowType(messageType, null);
+  }
+
+  /**
+   * Converts a Parquet schema to a Flink RowType, using the HoodieSchema to drive variant
+   * detection. A Parquet group is treated as a variant only when:
+   * <ul>
+   *   <li>the Parquet group carries the {@code VARIANT} logical type annotation, OR</li>
+   *   <li>the corresponding HoodieSchema field is {@link HoodieSchemaType#VARIANT} (covers
+   *       Spark 4.0 files that lack the annotation)</li>
+   * </ul>
+   *
+   * @param messageType Parquet schema
+   * @param hoodieSchema HoodieSchema for the table, or null to use annotation-only detection
+   */
+  public static RowType convertToRowType(MessageType messageType, HoodieSchema hoodieSchema) {
+    List<RowType.RowField> dataFields = new ArrayList<>();
+    for (Type field : messageType.getFields()) {
+      HoodieSchema fieldSchema = resolveFieldSchema(hoodieSchema, field.getName());
+      dataFields.add(convertToRowField(field, fieldSchema));
+    }
     return new RowType(dataFields);
   }
 
+  /**
+   * Looks up the HoodieSchema for a named field within a record schema.
+   * Returns null if the record schema is null or does not contain the field.
+   */
+  static HoodieSchema resolveFieldSchema(HoodieSchema recordSchema, String fieldName) {

Review Comment:
   Ideally I didn't want to make such an expansive change in this PR, but I wanted to make sure that Flink would correctly infer a Parquet variant group written by Spark 4.0 (when Parquet variant annotations aren't available), without always assuming that a Parquet group with `metadata,value` fields is a variant.
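
   The detection rule described above can be sketched as follows. This is an illustrative stand-alone example, not Hudi's real API: `RecordSchema`, `resolveFieldSchema`, and `isVariant` here are simplified stand-ins for `HoodieSchema` and the methods in the diff.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical stand-in for Hudi's HoodieSchema: a record type whose fields
   // may themselves be marked as VARIANT. Names are illustrative only.
   class RecordSchema {
     final boolean variant;
     final Map<String, RecordSchema> fields = new LinkedHashMap<>();

     RecordSchema(boolean variant) {
       this.variant = variant;
     }

     RecordSchema field(String name, RecordSchema child) {
       fields.put(name, child);
       return this;
     }
   }

   public class VariantDetectionSketch {
     // Mirrors the null-safe lookup in the diff: returns null when the record
     // schema is null or does not contain the field, instead of throwing.
     static RecordSchema resolveFieldSchema(RecordSchema recordSchema, String fieldName) {
       return recordSchema == null ? null : recordSchema.fields.get(fieldName);
     }

     // A group is a variant only if the Parquet VARIANT annotation is present OR
     // the table schema marks the field as a variant. A plain metadata/value
     // struct with neither signal stays an ordinary ROW.
     static boolean isVariant(boolean hasParquetVariantAnnotation, RecordSchema fieldSchema) {
       return hasParquetVariantAnnotation || (fieldSchema != null && fieldSchema.variant);
     }

     public static void main(String[] args) {
       RecordSchema table = new RecordSchema(false)
           .field("v", new RecordSchema(true))       // Spark 4.0 variant, no annotation
           .field("plain", new RecordSchema(false)); // ordinary struct, not a variant

       System.out.println(isVariant(false, resolveFieldSchema(table, "v")));     // true
       System.out.println(isVariant(false, resolveFieldSchema(table, "plain"))); // false
       System.out.println(isVariant(true, resolveFieldSchema(null, "v")));       // true
     }
   }
   ```

   The last case shows why the annotation path still works with a null schema, which is what the single-argument `convertToRowType` overload delegates to.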


