hudi-agent commented on code in PR #18539:
URL: https://github.com/apache/hudi/pull/18539#discussion_r3198805705
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java:
##########
@@ -273,6 +277,56 @@ public Object convert(HoodieSchema schema, Object object) {
};
}
+ /**
+ * Creates a converter for Flink 2.1+ VARIANT LogicalType. The converter receives a Flink
+ * {@code Variant} object at runtime and extracts the raw metadata/value byte arrays via
+ * reflection, then packs them into an Avro GenericRecord with the Variant schema.
+ *
+ * <p>Reflection is required because the {@code Variant} interface and {@code BinaryVariant}
+ * class only exist in Flink 2.1+, while this module compiles against Flink 1.20.
+ *
+ * <p>No shredded-variant check is needed here: {@code HoodieSchemaConverter.convertVariant()}
+ * already rejects shredded variants before a Flink type or converter is ever constructed,
+ * and Flink 2.1 itself only supports unshredded variants (FLIP-521).
+ */
+ private static RowDataToAvroConverter createVariantConverter() {
+ final java.lang.reflect.Method metadataMethod;
+ final java.lang.reflect.Method valueMethod;
+ try {
+ Class<?> binaryVariantClass =
+ Class.forName("org.apache.flink.types.variant.BinaryVariant");
+ metadataMethod = binaryVariantClass.getMethod("getMetadata");
+ valueMethod = binaryVariantClass.getMethod("getValue");
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "VARIANT LogicalType requires Flink 2.1+ (BinaryVariant class not found).", e);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("BinaryVariant getMetadata/getValue methods not found.", e);
+ }
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(HoodieSchema schema, Object object) {
+ try {
+ byte[] metadata = (byte[]) metadataMethod.invoke(object);
+ byte[] value = (byte[]) valueMethod.invoke(object);
+
+ final GenericRecord record = new GenericData.Record(schema.toAvroSchema());
+ record.put("metadata", ByteBuffer.wrap(metadata));
Review Comment:
🤖 nit: could you use `HoodieSchema.Variant.VARIANT_METADATA_FIELD` and
`HoodieSchema.Variant.VARIANT_VALUE_FIELD` here and on the next line instead
of the raw `"metadata"`/`"value"` strings?
`ParquetSchemaConverter.convertVariantToParquetType` already references those
constants, so using them here keeps the field names consistent across the
codebase.
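
A minimal, self-contained sketch of the suggested pattern, not the actual Hudi code. `VariantFields` is a hypothetical stand-in for `HoodieSchema.Variant`, its constant values are assumed to equal the `"metadata"`/`"value"` literals in the diff, and a plain `Map` stands in for Avro's `GenericRecord` so the example runs without Hudi or Avro on the classpath:

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

final class VariantFieldNamesSketch {

  // Hypothetical stand-in for HoodieSchema.Variant. The values are assumed
  // to match the string literals currently used in the diff.
  static final class VariantFields {
    static final String VARIANT_METADATA_FIELD = "metadata";
    static final String VARIANT_VALUE_FIELD = "value";

    private VariantFields() {
    }
  }

  // A Map stands in for Avro's GenericRecord here. The point is only that
  // both field names come from shared constants rather than literals.
  static Map<String, ByteBuffer> toRecord(byte[] metadata, byte[] value) {
    Map<String, ByteBuffer> record = new LinkedHashMap<>();
    record.put(VariantFields.VARIANT_METADATA_FIELD, ByteBuffer.wrap(metadata));
    record.put(VariantFields.VARIANT_VALUE_FIELD, ByteBuffer.wrap(value));
    return record;
  }

  public static void main(String[] args) {
    Map<String, ByteBuffer> record = toRecord(new byte[] {1}, new byte[] {2, 3});
    System.out.println(record.keySet());
  }
}
```

In the real converter the equivalent change would presumably be limited to the two `record.put(...)` calls, with the writer and the Parquet schema converter then reading the names from the same place.
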
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>