This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71aa121035cf fix(flink): enforce Parquet VARIANT annotation in Flink 
schema conversion for unshredded variant (#18539)
71aa121035cf is described below

commit 71aa121035cf912770f6ad4bf157a945ae3a8409
Author: Krishen <[email protected]>
AuthorDate: Mon May 18 21:26:41 2026 -0700

    fix(flink): enforce Parquet VARIANT annotation in Flink schema conversion 
for unshredded variant (#18539)
    
    * feat(flink): write/read unshredded variant to Flink parquet file 
writers/readers using Flink's Variant type
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
    Co-authored-by: Cursor <[email protected]>
---
 .../row/parquet/ParquetSchemaConverter.java        |  31 ++++-
 .../apache/hudi/util/HoodieSchemaConverter.java    |   4 +
 .../row/parquet/TestParquetSchemaConverter.java    | 148 +++++++++++++++++++++
 .../hudi/util/TestHoodieSchemaConverter.java       |  34 ++++-
 .../ITTestVariantCrossEngineCompatibility.java     |   3 +-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  20 ++-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  20 ++-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  20 ++-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  20 ++-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  20 ++-
 .../org/apache/hudi/adapter/DataTypeAdapter.java   |  35 +++++
 11 files changed, 323 insertions(+), 32 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 668098314d48..dd19100f8ccd 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -49,6 +49,17 @@ import static 
org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 /**
  * Schema converter converts Parquet schema to and from Flink internal types.
  *
+ * <p>On reads, this converter performs best-effort physical type mapping. It 
detects the
+ * Parquet {@code VARIANT} annotation and will reject shredded variants. Blob 
and Vector types
+ * cannot be distinguished from ordinary binary columns via Parquet schema 
alone.
+ *
+ * <p>On writes, this converter maps Flink {@code VariantType} to the 
canonical unshredded Parquet
+ * layout (group with binary metadata + value fields). The VARIANT logical 
type annotation is
+ * resolved by {@link DataTypeAdapter#variantParquetAnnotation()} — on Flink 
2.1+ with
+ * parquet-java 1.16.0+ the annotation is attached automatically; on pre-2.1 
Flink or with
+ * parquet < 1.16.0 the write throws {@link UnsupportedOperationException} 
because writing
+ * variant data without the annotation would produce files that no reader can 
identify as variant.
+ *
  * <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter 
to support timestamp of INT64 8 bytes.
  */
 @Slf4j
@@ -158,6 +169,9 @@ public class ParquetSchemaConverter {
                 convertToRowField(keyValueType.getLeft()).getType().copy(true),
                 convertToRowField(keyValueType.getRight()).getType()));
       } else if (hasVariantAnnotation(logicalType)) {
+        // Fires for files written with parquet-java that carry the VARIANT 
annotation.
+        // The reader infers the Flink RowType from the Parquet footer via 
convertToRowType(),
+        // so this annotation detection is the primary mechanism for 
recognizing Variant columns.
         if (isShreddedVariant(groupType)) {
           throw new UnsupportedOperationException(
               "Shredded Variant is not supported in Flink. "
@@ -223,10 +237,25 @@ public class ParquetSchemaConverter {
   /**
    * Converts a Variant column to the canonical unshredded Parquet layout:
    * a group with required binary {@code metadata} and required binary {@code 
value}.
+   *
+   * <p>No shredded-variant guard is needed here: Flink 2.1's {@code 
VariantType} is a single
+   * atomic {@code LogicalTypeRoot.VARIANT} with no shredding representation 
(FLIP-521 scopes
+   * shredding out), so a shredded variant can never arrive as a Flink 
LogicalType.
+   *
+   * <p>Delegates to {@link DataTypeAdapter#variantParquetAnnotation()} for 
the VARIANT logical
+   * type annotation. On Flink < 2.1 this throws (variant writes are 
unsupported). On Flink 2.1+
+   * with parquet-java < 1.16.0 this also throws, because writing variant data 
without the
+   * annotation would produce files that no reader can identify as variant.
    */
   private static Type convertVariantToParquetType(String name, Type.Repetition 
repetition) {
-    // TODO: add .as(LogicalTypeAnnotation.variantType()) once parquet-java is 
bumped to 1.16.0
+    LogicalTypeAnnotation annotation = 
DataTypeAdapter.variantParquetAnnotation()
+        .orElseThrow(() -> new UnsupportedOperationException(
+            "Cannot write Variant columns: parquet-java 1.16.0+ is required to 
emit the VARIANT "
+                + "logical type annotation. Without the annotation, readers 
cannot identify the "
+                + "column as Variant. Current parquet-java version does not 
support "
+                + "LogicalTypeAnnotation.variantType()."));
     return Types.buildGroup(repetition)
+        .as(annotation)
         .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
             .named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
         .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c49142800b45..c413101dde46 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -70,6 +70,10 @@ public class HoodieSchemaConverter {
    * <p>The "{rowName}." is used as the nested row type name prefix in order 
to generate
    * the right schema. Nested record types that only differ by type name are 
still compatible.
    *
+   * <p>On Flink 2.1+, {@code LogicalTypeRoot.VARIANT} is detected via string 
comparison
+   * (to avoid compile-time dependency) and mapped to {@link 
HoodieSchema#createVariant()}.
+   * Pre-2.1 Flink does not support Variant.
+   *
    * @param logicalType Flink logical type
    * @param rowName     the record name
    * @return HoodieSchema matching this logical type
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 318f2034d175..403b84068496 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.io.storage.row.parquet;
 
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -27,13 +31,19 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -41,6 +51,9 @@ import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link ParquetSchemaConverter}.
@@ -216,4 +229,139 @@ public class TestParquetSchemaConverter {
         + "}\n";
     assertThat(messageType.toString(), is(expected));
   }
+
+  /**
+   * A Parquet group with metadata + value binary fields but NO VARIANT 
annotation must be
+   * treated as a plain ROW. Only the Parquet {@code VARIANT} annotation 
triggers variant
+   * detection in this converter; unannotated groups are never guessed as 
variant.
+   */
+  @Test
+  void testVariantPhysicalLayoutTreatedAsRow() {
+    MessageType variantParquet = new MessageType(
+        "test",
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+            Type.Repetition.REQUIRED).named("id"),
+        Types.buildGroup(Type.Repetition.REQUIRED)
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("metadata"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("value"))
+            .named("data"));
+
+    RowType rowType = ParquetSchemaConverter.convertToRowType(variantParquet);
+    assertEquals(2, rowType.getFieldCount());
+    assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+  }
+
+  /**
+   * Unannotated group with metadata + value + typed_value (3 fields) is 
treated as a generic
+   * ROW when no annotation or schema hint is present.
+   */
+  @Test
+  void testUnannotatedShreddedGroupTreatedAsRow() {
+    MessageType shreddedNoAnnotation = new MessageType(
+        "test",
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+            Type.Repetition.REQUIRED).named("id"),
+        Types.buildGroup(Type.Repetition.REQUIRED)
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("metadata"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
+                Type.Repetition.REQUIRED).named("value"))
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
+                Type.Repetition.OPTIONAL).named("typed_value"))
+            .named("data"));
+
+    RowType rowType = 
ParquetSchemaConverter.convertToRowType(shreddedNoAnnotation);
+    assertEquals(2, rowType.getFieldCount());
+    assertEquals("ROW", rowType.getTypeAt(1).getTypeRoot().name());
+  }
+
+  /**
+   * On Flink 2.1+ with parquet 1.16.0+, converting a RowType containing a 
Variant column to a
+   * Parquet MessageType should produce a group with the VARIANT annotation 
and required binary
+   * {@code metadata} and {@code value} fields.
+   * On pre-2.1 Flink this test is skipped since VariantType does not exist.
+   * On parquet < 1.16.0 the write is expected to fail (annotation 
unavailable).
+   */
+  @Test
+  void testVariantWritePathProducesCorrectLayout() {
+    LogicalType variantType;
+    try {
+      variantType = DataTypeAdapter.createVariantType().getLogicalType();
+    } catch (UnsupportedOperationException e) {
+      // Pre-2.1 Flink: VariantType doesn't exist, skip
+      return;
+    }
+
+    RowType rowType = RowType.of(
+        new LogicalType[]{new IntType(), variantType},
+        new String[]{"id", "data"});
+
+    if (!DataTypeAdapter.variantParquetAnnotation().isPresent()) {
+      // parquet < 1.16.0: write must fail because annotation is unavailable
+      UnsupportedOperationException ex = 
org.junit.jupiter.api.Assertions.assertThrows(
+          UnsupportedOperationException.class,
+          () -> ParquetSchemaConverter.convertToParquetMessageType("test", 
rowType));
+      assertTrue(ex.getMessage().contains("parquet-java 1.16.0+"),
+          "Error message should mention parquet version requirement");
+      return;
+    }
+
+    // parquet 1.16.0+: write succeeds with annotation
+    MessageType messageType = 
ParquetSchemaConverter.convertToParquetMessageType("test", rowType);
+    assertEquals(2, messageType.getFieldCount());
+
+    Type variantField = messageType.getType("data");
+    assertTrue(variantField instanceof GroupType, "Variant column should be a 
Parquet group");
+    GroupType variantGroup = (GroupType) variantField;
+    assertEquals(2, variantGroup.getFieldCount());
+    assertEquals(HoodieSchema.Variant.VARIANT_METADATA_FIELD, 
variantGroup.getType(0).getName());
+    assertEquals(HoodieSchema.Variant.VARIANT_VALUE_FIELD, 
variantGroup.getType(1).getName());
+    assertTrue(variantGroup.getType(0).isPrimitive());
+    assertTrue(variantGroup.getType(1).isPrimitive());
+    assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+        variantGroup.getType(0).asPrimitiveType().getPrimitiveTypeName());
+    assertEquals(PrimitiveType.PrimitiveTypeName.BINARY,
+        variantGroup.getType(1).asPrimitiveType().getPrimitiveTypeName());
+    assertNotNull(variantGroup.getLogicalTypeAnnotation(),
+        "Variant group must carry the VARIANT annotation");
+  }
+
+  /**
+   * Verifies that writing a Variant column fails with a clear error when 
parquet-java on the
+   * classpath does not support the VARIANT annotation (< 1.16.0). On pre-2.1 
Flink the adapter
+   * throws directly; on Flink 2.1+ with parquet < 1.16.0 the write path 
throws.
+   */
+  @Test
+  void testVariantWriteFailsWithoutAnnotation() {
+    Option<LogicalTypeAnnotation> annotationOpt;
+    try {
+      annotationOpt = DataTypeAdapter.variantParquetAnnotation();
+    } catch (UnsupportedOperationException e) {
+      // Pre-2.1 Flink: expected to throw from the adapter
+      assertTrue(e.getMessage().contains("VARIANT type is only supported in 
Flink 2.1+"));
+      return;
+    }
+
+    if (annotationOpt.isPresent()) {
+      // parquet 1.16.0+: annotation is available, write succeeds — nothing to 
test here
+      return;
+    }
+
+    // Flink 2.1 + parquet < 1.16.0: annotation is empty, write must fail
+    LogicalType variantType = 
DataTypeAdapter.createVariantType().getLogicalType();
+    RowType rowType = RowType.of(
+        new LogicalType[]{new IntType(), variantType},
+        new String[]{"id", "data"});
+
+    UnsupportedOperationException ex = 
org.junit.jupiter.api.Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> ParquetSchemaConverter.convertToParquetMessageType("test", 
rowType));
+    assertTrue(ex.getMessage().contains("parquet-java 1.16.0+"),
+        "Error message should mention the parquet version requirement");
+    assertTrue(ex.getMessage().contains("VARIANT"),
+        "Error message should mention VARIANT");
+  }
+
 }
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 9a4f920b859e..9f6ad8f53bf7 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -695,19 +695,16 @@ public class TestHoodieSchemaConverter {
 
   @Test
   public void testVariantTypeConversion() {
-    // Test direct Variant conversion
     HoodieSchema variantSchema = HoodieSchema.createVariant();
     DataType dataType = HoodieSchemaConverter.convertToDataType(variantSchema);
     assertNotNull(dataType);
 
-    // Verify it's a Variant
     assertThat("the return type should be variant",
         dataType.getLogicalType().asSummaryString(), is("VARIANT NOT NULL"));
   }
 
   @Test
   public void testVariantInRecordConversion() {
-    // Test Variant field within a record
     HoodieSchema recordWithVariant = HoodieSchema.createRecord(
         "test_record",
         null,
@@ -722,11 +719,40 @@ public class TestHoodieSchemaConverter {
     assertEquals(2, result.getFieldCount());
     assertEquals("data", result.getFieldNames().get(1));
 
-    // Verify variant field
     assertThat("the return type should be variant",
         result.getTypeAt(1).asSummaryString(), is("VARIANT NOT NULL"));
   }
 
+  @Test
+  public void testVariantInArrayConversion() {
+    HoodieSchema arrayOfVariant = 
HoodieSchema.createArray(HoodieSchema.createVariant());
+    DataType dataType = 
HoodieSchemaConverter.convertToDataType(arrayOfVariant);
+    assertNotNull(dataType);
+    assertInstanceOf(ArrayType.class, dataType.getLogicalType());
+    LogicalType elementType = ((ArrayType) 
dataType.getLogicalType()).getElementType();
+    assertEquals("VARIANT", elementType.getTypeRoot().name());
+  }
+
+  @Test
+  public void testVariantInMapConversion() {
+    HoodieSchema mapOfVariant = 
HoodieSchema.createMap(HoodieSchema.createVariant());
+    DataType dataType = HoodieSchemaConverter.convertToDataType(mapOfVariant);
+    assertNotNull(dataType);
+    assertInstanceOf(MapType.class, dataType.getLogicalType());
+    LogicalType valueType = ((MapType) 
dataType.getLogicalType()).getValueType();
+    assertEquals("VARIANT", valueType.getTypeRoot().name());
+  }
+
+  @Test
+  public void testShreddedVariantConversionThrows() {
+    HoodieSchema.Variant shredded = HoodieSchema.createVariantShredded(
+        HoodieSchema.create(HoodieSchemaType.STRING));
+    UnsupportedOperationException ex = assertThrows(
+        UnsupportedOperationException.class,
+        () -> HoodieSchemaConverter.convertToDataType(shredded));
+    assertTrue(ex.getMessage().contains("Shredded Variant is not yet supported 
in Flink"));
+  }
+
   @Test
   public void testBlobStructureValidation() {
     // Positive case: Create ROW matching BLOB structure
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
index de6e3835d120..e7f9ed3766f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java
@@ -41,6 +41,7 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+
 /**
  * Integration test for cross-engine compatibility - verifying that Flink can 
read Variant tables written by Spark 4.0.
  */
@@ -56,7 +57,6 @@ public class ITTestVariantCrossEngineCompatibility {
   private void verifyFlinkCanReadSparkVariantTable(String tablePath, String 
tableType, String testDescription) throws Exception {
     TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
 
-    // Create a Hudi table pointing to the Spark-written data
     String createTableDdl = String.format(
         "CREATE TABLE variant_table ("
             + "  id INT,"
@@ -73,7 +73,6 @@ public class ITTestVariantCrossEngineCompatibility {
 
     tableEnv.executeSql(createTableDdl);
 
-    // Query the table to verify Flink can read the data
     TableResult result = tableEnv.executeSql("SELECT id, name, v, ts FROM 
variant_table ORDER BY id");
     List<Row> rows = CollectionUtil.iteratorToList(result.collect());
 
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
--- 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+  private static final String VARIANT_UNSUPPORTED_MSG =
+      "VARIANT type is only supported in Flink 2.1+. "
+          + "Please upgrade your Flink version to use Variant columns.";
+
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static Object createVariant(byte[] value, byte[] metadata) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
   }
 
   public static DataType createVariantType() {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantMetadata(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantValue(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+  private static final String VARIANT_UNSUPPORTED_MSG =
+      "VARIANT type is only supported in Flink 2.1+. "
+          + "Please upgrade your Flink version to use Variant columns.";
+
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static Object createVariant(byte[] value, byte[] metadata) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
   }
 
   public static DataType createVariantType() {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantMetadata(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantValue(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
--- 
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+  private static final String VARIANT_UNSUPPORTED_MSG =
+      "VARIANT type is only supported in Flink 2.1+. "
+          + "Please upgrade your Flink version to use Variant columns.";
+
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static Object createVariant(byte[] value, byte[] metadata) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
   }
 
   public static DataType createVariantType() {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantMetadata(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantValue(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 79b9e254dd61..e8e31b341a18 100644
--- 
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+  private static final String VARIANT_UNSUPPORTED_MSG =
+      "VARIANT type is only supported in Flink 2.1+. "
+          + "Please upgrade your Flink version to use Variant columns.";
+
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static Object createVariant(byte[] value, byte[] metadata) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
   }
 
   public static DataType createVariantType() {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantMetadata(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantValue(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 566ca723648a..18db48f0fed1 100644
--- 
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -22,17 +22,27 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+  private static final String VARIANT_UNSUPPORTED_MSG =
+      "VARIANT type is only supported in Flink 2.1+. "
+          + "Please upgrade your Flink version to use Variant columns.";
+
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static Object createVariant(byte[] value, byte[] metadata) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static boolean isVariantType(LogicalType logicalType) {
@@ -40,14 +50,14 @@ public class DataTypeAdapter {
   }
 
   public static DataType createVariantType() {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantMetadata(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 
   public static byte[] getVariantValue(Object obj) {
-    throw new UnsupportedOperationException("Variant is not supported yet.");
+    throw new UnsupportedOperationException(VARIANT_UNSUPPORTED_MSG);
   }
 }
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
 
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
index 6ae4c4a6baba..77d6f708ece3 100644
--- 
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
+++ 
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/DataTypeAdapter.java
@@ -25,11 +25,46 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.variant.BinaryVariant;
 import org.apache.flink.types.variant.Variant;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+import java.lang.reflect.Method;
 
 /**
  * Adapter utils to provide {@code DataType} utilities.
  */
 public class DataTypeAdapter {
+
+  /**
+   * The Parquet Variant binary format specification version passed to
+   * {@code LogicalTypeAnnotation.variantType(byte)}. Version 1 is the initial 
spec
+   * defined by the Parquet Variant proposal (parquet-format 2.11.0 / 
parquet-java 1.16.0).
+   */
+  private static final byte VARIANT_SPEC_VERSION = 1;
+
+  /**
+   * Cached VARIANT annotation resolved via reflection. Empty if parquet-java
+   * on the classpath predates {@code LogicalTypeAnnotation.variantType()} (< 
1.16.0).
+   */
+  private static final Option<LogicalTypeAnnotation> VARIANT_ANNOTATION = 
resolveVariantAnnotation();
+
+  private static Option<LogicalTypeAnnotation> resolveVariantAnnotation() {
+    try {
+      Method factory = LogicalTypeAnnotation.class.getMethod("variantType", 
byte.class);
+      return Option.of((LogicalTypeAnnotation) factory.invoke(null, 
VARIANT_SPEC_VERSION));
+    } catch (Exception e) {
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Returns the Parquet VARIANT {@link LogicalTypeAnnotation} if parquet-java 
1.16.0+ is on the
+   * classpath, or empty if the {@code variantType(byte)} factory method is unavailable.
+   */
+  public static Option<LogicalTypeAnnotation> variantParquetAnnotation() {
+    return VARIANT_ANNOTATION;
+  }
+
   public static Variant getVariant(RowData rowData, int pos) {
     return rowData.getVariant(pos);
   }


Reply via email to