hudi-agent commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3198805705


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java:
##########
@@ -273,6 +277,56 @@ public Object convert(HoodieSchema schema, Object object) {
     };
   }
 
+  /**
+   * Creates a converter for Flink 2.1+ VARIANT LogicalType. The converter 
receives a Flink
+   * {@code Variant} object at runtime and extracts the raw metadata/value 
byte arrays via
+   * reflection, then packs them into an Avro GenericRecord with the Variant 
schema.
+   *
+   * <p>Reflection is required because the {@code Variant} interface and 
{@code BinaryVariant}
+   * class only exist in Flink 2.1+, while this module compiles against Flink 
1.20.
+   *
+   * <p>No shredded-variant check is needed here: {@code 
HoodieSchemaConverter.convertVariant()}
+   * already rejects shredded variants before a Flink type or converter is 
ever constructed,
+   * and Flink 2.1 itself only supports unshredded variants (FLIP-521).
+   */
+  private static RowDataToAvroConverter createVariantConverter() {
+    final java.lang.reflect.Method metadataMethod;
+    final java.lang.reflect.Method valueMethod;
+    try {
+      Class<?> binaryVariantClass =
+          Class.forName("org.apache.flink.types.variant.BinaryVariant");
+      metadataMethod = binaryVariantClass.getMethod("getMetadata");
+      valueMethod = binaryVariantClass.getMethod("getValue");
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "VARIANT LogicalType requires Flink 2.1+ (BinaryVariant class not 
found).", e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("BinaryVariant getMetadata/getValue methods 
not found.", e);
+    }
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(HoodieSchema schema, Object object) {
+        try {
+          byte[] metadata = (byte[]) metadataMethod.invoke(object);
+          byte[] value = (byte[]) valueMethod.invoke(object);
+
+          final GenericRecord record = new 
GenericData.Record(schema.toAvroSchema());
+          record.put("metadata", ByteBuffer.wrap(metadata));

Review Comment:
   🤖 nit: could you use `HoodieSchema.Variant.VARIANT_METADATA_FIELD` and 
`VARIANT_VALUE_FIELD` here (and on the next line) instead of the raw 
"metadata"/"value" strings? 
`ParquetSchemaConverter.convertVariantToParquetType` already references those 
constants, so using them here keeps the field names consistent across the 
codebase.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to