This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 83d68cd3549 [FLINK-39053][format/avro] Support VARCHAR to ENUM 
conversion in avro-confluent format
83d68cd3549 is described below

commit 83d68cd3549c1988598df397a45356d9812ab795
Author: ddebowczyk92 <[email protected]>
AuthorDate: Sun Mar 22 19:18:06 2026 +0100

    [FLINK-39053][format/avro] Support VARCHAR to ENUM conversion in 
avro-confluent format
---
 .../formats/avro/RowDataToAvroConverters.java      |   3 +
 .../AvroRowDataSchemaProvidedSerDeSchemaTest.java  | 281 +++++++++++++++++++++
 2 files changed, 284 insertions(+)

diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index af7a936b270..202cd601bc0 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -139,6 +139,9 @@ public class RowDataToAvroConverters {
 
                             @Override
                             public Object convert(Schema schema, Object 
object) {
+                                if (schema.getType() == Schema.Type.ENUM) {
+                                    return new GenericData.EnumSymbol(schema, 
object.toString());
+                                }
                                 return new Utf8(object.toString());
                             }
                         };
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataSchemaProvidedSerDeSchemaTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataSchemaProvidedSerDeSchemaTest.java
new file mode 100644
index 00000000000..c07f153170a
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataSchemaProvidedSerDeSchemaTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Encoder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.createEncoder;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link AvroRowDataSerializationSchema} and {@link 
AvroRowDataDeserializationSchema}
+ * when the Avro schema is provided upfront rather than derived from the Flink 
RowType.
+ *
+ * <p>This is the counterpart of {@link AvroRowDataDeSerializationSchemaTest} 
which tests the flow
+ * where the Avro schema is derived from the RowType. Having a predefined Avro 
schema is the typical
+ * case for schema-registry based formats (e.g. avro-confluent) where the Avro 
schema may contain
+ * types (such as ENUM) that have no direct equivalent in Flink's type system.
+ */
+class AvroRowDataSchemaProvidedSerDeSchemaTest {
+
+    private static final Schema COLORS_SCHEMA = Colors.getClassSchema();
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testSerializeDeserializeWithPredefinedEnumSchema(AvroEncoding 
encoding) throws Exception {
+        final Schema avroSchema =
+                SchemaBuilder.record("TestRecord")
+                        .namespace("org.apache.flink.formats.avro.generated")
+                        .fields()
+                        .requiredString("name")
+                        .name("type_enum")
+                        .type(COLORS_SCHEMA)
+                        .noDefault()
+                        .requiredInt("id")
+                        .endRecord();
+
+        final RowType rowType =
+                (RowType)
+                        ROW(
+                                        FIELD("name", STRING()),
+                                        FIELD("type_enum", STRING()),
+                                        FIELD("id", INT()))
+                                .notNull()
+                                .getLogicalType();
+
+        AvroRowDataSerializationSchema serializationSchema =
+                createSerializationSchema(rowType, avroSchema, encoding);
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchema(rowType, avroSchema, encoding);
+
+        GenericRowData rowData = new GenericRowData(3);
+        rowData.setField(0, StringData.fromString("Alice"));
+        rowData.setField(1, StringData.fromString("RED"));
+        rowData.setField(2, 42);
+
+        byte[] serialized = serializationSchema.serialize(rowData);
+        RowData deserialized = deserializationSchema.deserialize(serialized);
+
+        assertThat(deserialized.getString(0).toString()).isEqualTo("Alice");
+        assertThat(deserialized.getString(1).toString()).isEqualTo("RED");
+        assertThat(deserialized.getInt(2)).isEqualTo(42);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testSerializeDeserializeWithNullableEnumSchema(AvroEncoding encoding) 
throws Exception {
+        final Schema nullableColorsSchema =
+                Schema.createUnion(SchemaBuilder.builder().nullType(), 
COLORS_SCHEMA);
+        final Schema avroSchema =
+                SchemaBuilder.record("TestNullableEnum")
+                        .namespace("org.apache.flink.formats.avro.generated")
+                        .fields()
+                        .requiredString("name")
+                        .name("type_enum")
+                        .type(nullableColorsSchema)
+                        .withDefault(null)
+                        .endRecord();
+
+        final RowType rowType =
+                (RowType)
+                        ROW(FIELD("name", STRING()), FIELD("type_enum", 
STRING()))
+                                .notNull()
+                                .getLogicalType();
+
+        AvroRowDataSerializationSchema serializationSchema =
+                createSerializationSchema(rowType, avroSchema, encoding);
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchema(rowType, avroSchema, encoding);
+
+        // Test with a non-null enum value
+        GenericRowData rowData = new GenericRowData(2);
+        rowData.setField(0, StringData.fromString("Bob"));
+        rowData.setField(1, StringData.fromString("GREEN"));
+
+        byte[] serialized = serializationSchema.serialize(rowData);
+        RowData deserialized = deserializationSchema.deserialize(serialized);
+
+        assertThat(deserialized.getString(0).toString()).isEqualTo("Bob");
+        assertThat(deserialized.getString(1).toString()).isEqualTo("GREEN");
+
+        // Test with a null enum value
+        GenericRowData rowDataWithNull = new GenericRowData(2);
+        rowDataWithNull.setField(0, StringData.fromString("Charlie"));
+        rowDataWithNull.setField(1, null);
+
+        byte[] serializedNull = serializationSchema.serialize(rowDataWithNull);
+        RowData deserializedNull = 
deserializationSchema.deserialize(serializedNull);
+
+        
assertThat(deserializedNull.getString(0).toString()).isEqualTo("Charlie");
+        assertThat(deserializedNull.isNullAt(1)).isTrue();
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testSerializeWithInvalidEnumSymbol(AvroEncoding encoding) throws 
Exception {
+        final Schema avroSchema =
+                SchemaBuilder.record("TestRecord")
+                        .namespace("org.apache.flink.formats.avro.generated")
+                        .fields()
+                        .name("type_enum")
+                        .type(COLORS_SCHEMA)
+                        .noDefault()
+                        .endRecord();
+
+        final RowType rowType =
+                (RowType) ROW(FIELD("type_enum", 
STRING())).notNull().getLogicalType();
+
+        AvroRowDataSerializationSchema serializationSchema =
+                createSerializationSchema(rowType, avroSchema, encoding);
+
+        GenericRowData rowData = new GenericRowData(1);
+        rowData.setField(0, StringData.fromString("YELLOW"));
+
+        assertThatThrownBy(() -> serializationSchema.serialize(rowData))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Failed to serialize row.");
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testDeserializeEnumFromPredefinedSchema(AvroEncoding encoding) throws 
Exception {
+        final Schema avroSchema =
+                SchemaBuilder.record("TestRecord")
+                        .namespace("org.apache.flink.formats.avro.generated")
+                        .fields()
+                        .name("type_enum")
+                        .type(COLORS_SCHEMA)
+                        .noDefault()
+                        .requiredString("label")
+                        .endRecord();
+
+        final GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("type_enum", new GenericData.EnumSymbol(COLORS_SCHEMA, 
"BLUE"));
+        record.put("label", "urgent");
+
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        GenericDatumWriter<IndexedRecord> datumWriter = new 
GenericDatumWriter<>(avroSchema);
+        Encoder encoder = createEncoder(encoding, avroSchema, 
byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        final RowType rowType =
+                (RowType)
+                        ROW(FIELD("type_enum", STRING()), FIELD("label", 
STRING()))
+                                .notNull()
+                                .getLogicalType();
+
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchema(rowType, avroSchema, encoding);
+
+        RowData deserialized = deserializationSchema.deserialize(input);
+
+        assertThat(deserialized.getString(0).toString()).isEqualTo("BLUE");
+        assertThat(deserialized.getString(1).toString()).isEqualTo("urgent");
+    }
+
+    @ParameterizedTest
+    @EnumSource(AvroEncoding.class)
+    void testSerializeDeserializeRoundTripWithEnumSchema(AvroEncoding 
encoding) throws Exception {
+        final Schema avroSchema =
+                SchemaBuilder.record("TestRecord")
+                        .namespace("org.apache.flink.formats.avro.generated")
+                        .fields()
+                        .requiredInt("id")
+                        .name("type_enum")
+                        .type(COLORS_SCHEMA)
+                        .noDefault()
+                        .endRecord();
+
+        final RowType rowType =
+                (RowType)
+                        ROW(FIELD("id", INT()), FIELD("type_enum", STRING()))
+                                .notNull()
+                                .getLogicalType();
+
+        AvroRowDataSerializationSchema serializationSchema =
+                createSerializationSchema(rowType, avroSchema, encoding);
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchema(rowType, avroSchema, encoding);
+
+        // Serialize from Avro GenericRecord to get reference bytes
+        final GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("id", 7);
+        record.put("type_enum", new GenericData.EnumSymbol(COLORS_SCHEMA, 
"GREEN"));
+
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        GenericDatumWriter<IndexedRecord> datumWriter = new 
GenericDatumWriter<>(avroSchema);
+        Encoder encoder = createEncoder(encoding, avroSchema, 
byteArrayOutputStream);
+        datumWriter.write(record, encoder);
+        encoder.flush();
+        byte[] referenceBytes = byteArrayOutputStream.toByteArray();
+
+        // Deserialize reference bytes and re-serialize
+        RowData rowData = deserializationSchema.deserialize(referenceBytes);
+        byte[] reserializedBytes = serializationSchema.serialize(rowData);
+
+        assertThat(reserializedBytes).isEqualTo(referenceBytes);
+    }
+
+    private static AvroRowDataSerializationSchema createSerializationSchema(
+            RowType rowType, Schema avroSchema, AvroEncoding encoding) throws 
Exception {
+        AvroRowDataSerializationSchema serializationSchema =
+                new AvroRowDataSerializationSchema(
+                        rowType,
+                        AvroSerializationSchema.forGeneric(avroSchema, 
encoding),
+                        RowDataToAvroConverters.createConverter(rowType));
+        serializationSchema.open(null);
+        return serializationSchema;
+    }
+
+    private static AvroRowDataDeserializationSchema 
createDeserializationSchema(
+            RowType rowType, Schema avroSchema, AvroEncoding encoding) throws 
Exception {
+        AvroRowDataDeserializationSchema deserializationSchema =
+                new AvroRowDataDeserializationSchema(
+                        AvroDeserializationSchema.forGeneric(avroSchema, 
encoding),
+                        AvroToRowDataConverters.createRowConverter(rowType),
+                        InternalTypeInfo.of(rowType));
+        deserializationSchema.open(null);
+        return deserializationSchema;
+    }
+}

Reply via email to