This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c656cf72a71 [FLINK-39023][Formats][Avro] Add avro to variant datatype 
converter
c656cf72a71 is described below

commit c656cf72a712c73c2dfc023b0eb1f3b020d0cf89
Author: Swapna Marru <[email protected]>
AuthorDate: Wed Apr 22 01:01:39 2026 -0700

    [FLINK-39023][Formats][Avro] Add avro to variant datatype converter
---
 .../formats/avro/AvroToVariantDataConverters.java  | 265 ++++++++++++++++++
 .../avro/AvroToVariantDataConvertersTest.java      | 306 +++++++++++++++++++++
 2 files changed, 571 insertions(+)

diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
new file mode 100644
index 00000000000..13813c76a3a
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.types.variant.VariantBuilder;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Internal
+public class AvroToVariantDataConverters {
+    private static final VariantBuilder SHARED_BUILDER = Variant.newBuilder();
+
+    public static AvroToVariantDataConverter createVariantConverter(Schema 
schema) {
+        return createNullableConverter(schema);
+    }
+
+    /** Creates a converter that directly maps Avro types to Variant types. */
+    private static AvroToVariantDataConverter 
createAvroToVariantConverter(Schema schema) {
+        LogicalType logicalType = schema.getLogicalType();
+
+        switch (schema.getType()) {
+            case RECORD:
+                List<Schema.Field> fields = schema.getFields();
+                String[] fieldNames = new String[fields.size()];
+                AvroToVariantDataConverter[] fieldConverters =
+                        new AvroToVariantDataConverter[fields.size()];
+
+                for (int i = 0; i < fields.size(); i++) {
+                    Schema.Field field = fields.get(i);
+                    fieldNames[i] = field.name();
+                    fieldConverters[i] = 
createNullableConverter(field.schema());
+                }
+
+                return (avroObject) -> {
+                    GenericRecord record = (GenericRecord) avroObject;
+                    VariantBuilder.VariantObjectBuilder variantObjectBuilder =
+                            SHARED_BUILDER.object();
+
+                    for (int i = 0; i < fieldNames.length; i++) {
+                        String fieldName = fieldNames[i];
+                        AvroToVariantDataConverter converter = 
fieldConverters[i];
+                        Object fieldValue = record.get(fieldName);
+
+                        variantObjectBuilder.add(fieldName, 
converter.convert(fieldValue));
+                    }
+
+                    return variantObjectBuilder.build();
+                };
+
+            case NULL:
+                return (avroObject) -> SHARED_BUILDER.ofNull();
+
+            case BOOLEAN:
+                return (avroObject) -> SHARED_BUILDER.of((Boolean) avroObject);
+
+            case INT:
+                if (logicalType == LogicalTypes.date()) {
+                    return (avroObject) -> 
SHARED_BUILDER.of(convertToDate((Integer) avroObject));
+                } // Time-millis (logical type represents a time of day, with 
no reference to a
+                // particular calendar).
+                // Store it as LONG by converting millis to micros as the 
logical type millis or
+                // micros is lost.
+                else if (logicalType == LogicalTypes.timeMillis()) {
+                    return (avroObject) -> SHARED_BUILDER.of((Integer) 
avroObject * 1000L);
+                } else {
+                    return (avroObject) -> SHARED_BUILDER.of((Integer) 
avroObject);
+                }
+
+            case LONG:
+                if (logicalType == LogicalTypes.timestampMillis()
+                        || logicalType == LogicalTypes.timestampMicros()) {
+                    return (avroObject) ->
+                            SHARED_BUILDER.of(
+                                    convertToTimestamp(
+                                            (Long) avroObject,
+                                            LogicalTypes.timestampMicros() == 
logicalType));
+                } else {
+                    return (avroObject) -> SHARED_BUILDER.of((Long) 
avroObject);
+                }
+
+            case FLOAT:
+                return (avroObject) -> SHARED_BUILDER.of((Float) avroObject);
+
+            case DOUBLE:
+                return (avroObject) -> SHARED_BUILDER.of((Double) avroObject);
+
+            case BYTES:
+            case FIXED:
+                if (logicalType instanceof LogicalTypes.Decimal) {
+                    LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
logicalType;
+                    return createDecimalConverter(
+                            decimalType.getPrecision(), 
decimalType.getScale());
+                } else {
+                    return object -> SHARED_BUILDER.of(convertToBytes(object));
+                }
+
+            case STRING:
+            case ENUM:
+                return (avroObject) -> 
SHARED_BUILDER.of(avroObject.toString());
+
+            case ARRAY:
+                Schema elementSchema = schema.getElementType();
+                AvroToVariantDataConverter elementConverter =
+                        createNullableConverter(elementSchema);
+                return createArrayConverter(elementConverter);
+
+            case MAP:
+                Schema valueSchema = schema.getValueType();
+                AvroToVariantDataConverter valueConverter = 
createNullableConverter(valueSchema);
+                return createMapConverter(valueConverter);
+
+            case UNION:
+                // Handle nullable types (union with null)
+                List<Schema> nonNullUnionType =
+                        schema.getTypes().stream()
+                                .filter(t -> t.getType() != Schema.Type.NULL)
+                                .collect(Collectors.toList());
+
+                if (nonNullUnionType.size() == 1) {
+                    return createNullableConverter(nonNullUnionType.get(0));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Avro Union with NULL type is only supported. 
Unsupported types: "
+                                    + schema.getTypes());
+                }
+
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
schema.getType());
+        }
+    }
+
+    /** Creates an array converter that works directly with Avro elements. */
+    private static AvroToVariantDataConverter createArrayConverter(
+            AvroToVariantDataConverter elementConverter) {
+        return (avroObject) -> {
+            List<?> list = (List<?>) avroObject;
+            VariantBuilder.VariantArrayBuilder variantArrayBuilder = 
SHARED_BUILDER.array();
+
+            for (Object item : list) {
+                Variant convertedItem = elementConverter.convert(item);
+                variantArrayBuilder.add(convertedItem);
+            }
+
+            return variantArrayBuilder.build();
+        };
+    }
+
+    /** Creates a map converter that works directly with Avro values. */
+    private static AvroToVariantDataConverter createMapConverter(
+            AvroToVariantDataConverter valueConverter) {
+        return (avroObject) -> {
+            Map<?, ?> map = (Map<?, ?>) avroObject;
+            VariantBuilder.VariantObjectBuilder variantObjectBuilder = 
SHARED_BUILDER.object();
+
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                String key = entry.getKey().toString(); // Avro maps always 
have string keys
+                Variant convertedValue = 
valueConverter.convert(entry.getValue());
+                variantObjectBuilder.add(key, convertedValue);
+            }
+
+            return variantObjectBuilder.build();
+        };
+    }
+
+    /** Creates a decimal converter for the specified precision and scale. */
+    private static AvroToVariantDataConverter createDecimalConverter(int 
precision, int scale) {
+        return avroObject -> {
+            if (avroObject instanceof BigDecimal) {
+                return SHARED_BUILDER.of((BigDecimal) avroObject);
+            }
+
+            return SHARED_BUILDER.of(
+                    DecimalData.fromUnscaledBytes(convertToBytes(avroObject), 
precision, scale)
+                            .toBigDecimal());
+        };
+    }
+
+    /*
+    timestamp-millis logical type annotates an Avro long, where the long stores
+    the number of milliseconds from the unix epoch, 1 January 1970 
00:00:00.000 UTC.
+     */
+    private static LocalDateTime convertToTimestamp(Long object, boolean 
isMicros) {
+        int nanos;
+        long secs;
+
+        if (isMicros) {
+            secs = object / 1000_000;
+            nanos = (int) (object - secs * 1000_000) * 1000;
+        } else {
+            secs = object / 1000L;
+            nanos = (int) (object - secs * 1000L) * 1000_000;
+        }
+
+        return LocalDateTime.ofEpochSecond(secs, nanos, ZoneOffset.UTC);
+    }
+
+    /*
+    The date logical type represents a date within the calendar, with no 
reference to a particular time zone or time of day.
+    A date logical type annotates an Avro int, where the int stores the number 
of days from the unix epoch, 1 January 1970 (ISO calendar).
+     */
+    private static LocalDate convertToDate(Integer numOfDays) {
+        return LocalDate.ofEpochDay(numOfDays);
+    }
+
+    private static byte[] convertToBytes(Object object) {
+        if (object instanceof GenericFixed) {
+            return ((GenericFixed) object).bytes();
+        } else if (object instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) object;
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            return bytes;
+        } else {
+            return (byte[]) object;
+        }
+    }
+
+    private static AvroToVariantDataConverter createNullableConverter(Schema 
schema) {
+        final AvroToVariantDataConverter converter = 
createAvroToVariantConverter(schema);
+        return avroObject -> {
+            if (avroObject == null) {
+                return SHARED_BUILDER.ofNull();
+            }
+            return converter.convert(avroObject);
+        };
+    }
+
+    @FunctionalInterface
+    public interface AvroToVariantDataConverter extends Serializable {
+        Variant convert(Object object);
+    }
+}
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
new file mode 100644
index 00000000000..f2dfe624d45
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.types.variant.Variant;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AvroToVariantDataConvertersTest {
+
+    private static final Schema TEST_SCHEMA_1 =
+            new Schema.Parser()
+                    .parse(
+                            ""
+                                    + "{\"namespace\": 
\"org.apache.flink.formats.avro.generated\",\n"
+                                    + " \"type\": \"record\",\n"
+                                    + " \"name\": \"Data\",\n"
+                                    + " \"fields\": [\n"
+                                    + "     {\"name\": \"a\", \"type\": 
\"string\"},\n"
+                                    + "     {\"name\": \"b\", \"type\": 
\"int\"}\n"
+                                    + "  ]\n"
+                                    + "}");
+
+    private static final Schema TEST_SCHEMA_2 =
+            new Schema.Parser()
+                    .parse(
+                            ""
+                                    + "{\"namespace\": 
\"org.apache.flink.formats.avro.generated\",\n"
+                                    + " \"type\": \"record\",\n"
+                                    + " \"name\": \"Timestamps\",\n"
+                                    + " \"fields\": [\n"
+                                    + "     {\"name\": \"tsMillis\", \"type\": 
{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},\n"
+                                    + "     {\"name\": \"tsMicros\", \"type\": 
{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}},\n"
+                                    + "     {\"name\": \"tsMicros_union\",\n"
+                                    + "      \"type\" : [ \"null\", {\n"
+                                    + "        \"type\" : \"long\",\n"
+                                    + "        \"logicalType\" : 
\"timestamp-micros\"\n"
+                                    + "      } ],\n"
+                                    + "      \"default\": null},\n"
+                                    + "     {\"name\": \"timeMillis\", 
\"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}},\n"
+                                    + "     {\"name\": \"date\", \"type\": 
{\"type\": \"int\", \"logicalType\": \"date\"}}\n"
+                                    + "  ]\n"
+                                    + "}");
+
+    private static final Schema INNER_SCHEMA =
+            new Schema.Parser()
+                    .parse(
+                            ""
+                                    + "{\"namespace\": 
\"org.apache.flink.formats.avro.generated\",\n"
+                                    + " \"type\": \"record\",\n"
+                                    + " \"name\": \"InnerType\",\n"
+                                    + " \"fields\": [\n"
+                                    + "     {\"name\": \"id\", \"type\": 
\"int\"},\n"
+                                    + "     {\"name\": \"key\", \"type\": 
\"string\"}\n"
+                                    + "  ]\n"
+                                    + "}");
+
+    private static final Schema COMPLEX_UNION_SCHEMA =
+            new Schema.Parser()
+                    .parse(
+                            ""
+                                    + "{\"namespace\": 
\"org.apache.flink.formats.avro.generated\",\n"
+                                    + " \"type\": \"record\",\n"
+                                    + " \"name\": \"ComplexWithUnionType\",\n"
+                                    + " \"fields\": [\n"
+                                    + "     {\"name\": \"id\", \"type\": 
\"int\"},\n"
+                                    + "     {\"name\": \"value\", \"type\": 
\"string\"},\n"
+                                    + "     { \"name\" : \"nullableCol\", 
\"type\" : [ \"null\", \"string\" ], \"default\" : null },"
+                                    + "     {\"name\": \"inner\", \"type\": [ 
\"null\", "
+                                    + INNER_SCHEMA
+                                    + " ], \"default\" : null }"
+                                    + "  ]\n"
+                                    + "}");
+
+    private static final Schema ARRAY_SCHEMA =
+            new Schema.Parser()
+                    .parse(
+                            "{\n"
+                                    + "  \"type\": \"record\",\n"
+                                    + "  \"name\": \"ArrayRecord\",\n"
+                                    + "  \"namespace\": 
\"org.apache.flink\",\n"
+                                    + "  \"fields\": [\n"
+                                    + "    {\n"
+                                    + "      \"name\": \"values\",\n"
+                                    + "      \"type\": {\n"
+                                    + "        \"type\": \"array\",\n"
+                                    + "        \"items\": \"string\"\n"
+                                    + "      }\n"
+                                    + "    }\n"
+                                    + "  ]\n"
+                                    + "}");
+
+    private static final Schema SIMPLE_TYPES_SCHEMA =
+            new Schema.Parser()
+                    .parse(
+                            ""
+                                    + "{\"namespace\": 
\"org.apache.flink.formats.avro.generated\",\n"
+                                    + " \"type\": \"record\",\n"
+                                    + " \"name\": \"SimpleTypes\",\n"
+                                    + " \"fields\": [\n"
+                                    + "     {\"name\": \"boolField\", 
\"type\": \"boolean\"},\n"
+                                    + "     {\"name\": \"intField\", \"type\": 
\"int\"},\n"
+                                    + "     {\"name\": \"longField\", 
\"type\": \"long\"},\n"
+                                    + "     {\"name\": \"floatField\", 
\"type\": \"float\"},\n"
+                                    + "     {\"name\": \"doubleField\", 
\"type\": \"double\"},\n"
+                                    + "     {\"name\": \"stringField\", 
\"type\": \"string\"}\n"
+                                    + "  ]\n"
+                                    + "}");
+
+    @Test
+    public void testBasicAvroToVariantConversion() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_1);
+
+        GenericRecord record =
+                new GenericRecordBuilder(TEST_SCHEMA_1).set("a", 
"hello").set("b", 42).build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+        assertEquals("hello", variant.getField("a").getString());
+        assertEquals(42, variant.getField("b").getInt());
+    }
+
+    @Test
+    public void testSimpleTypesConversion() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(SIMPLE_TYPES_SCHEMA);
+
+        GenericRecord record =
+                new GenericRecordBuilder(SIMPLE_TYPES_SCHEMA)
+                        .set("boolField", true)
+                        .set("intField", 123)
+                        .set("longField", 456L)
+                        .set("floatField", 78.9f)
+                        .set("doubleField", 12.34)
+                        .set("stringField", "test")
+                        .build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+        assertTrue(variant.getField("boolField").getBoolean());
+        assertEquals(123, variant.getField("intField").getInt());
+        assertEquals(456L, variant.getField("longField").getLong());
+        assertEquals(78.9f, variant.getField("floatField").getFloat(), 0.001f);
+        assertEquals(12.34, variant.getField("doubleField").getDouble(), 
0.001);
+        assertEquals("test", variant.getField("stringField").getString());
+    }
+
+    @Test
+    public void testTimestampAndDateLogicalTypes() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_2);
+
+        long currentTimeMillis = System.currentTimeMillis();
+        long currentTimeMicros = currentTimeMillis * 1000 + 123;
+        int currentDateDays = (int) (currentTimeMillis / (24 * 60 * 60 * 
1000L));
+        int timeMillis8am = (int) TimeUnit.HOURS.toMillis(8);
+
+        long currentTimeSeconds = currentTimeMillis / 1000;
+        int nanosTsMicros = (int) (currentTimeMicros - currentTimeSeconds * 
1000_000) * 1000;
+        int nanosTsMillis = (int) (currentTimeMillis - currentTimeSeconds * 
1000) * 1000_000;
+
+        GenericRecord record =
+                new GenericRecordBuilder(TEST_SCHEMA_2)
+                        .set("tsMillis", currentTimeMillis)
+                        .set("tsMicros", currentTimeMicros)
+                        .set("tsMicros_union", currentTimeMicros)
+                        .set("timeMillis", timeMillis8am)
+                        .set("date", currentDateDays)
+                        .build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+
+        // Timestamps should be converted to LocalDateTime
+        assertEquals(variant.getField("tsMillis").getType(), 
Variant.Type.TIMESTAMP);
+        assertEquals(variant.getField("tsMicros").getType(), 
Variant.Type.TIMESTAMP);
+        assertEquals(variant.getField("tsMicros_union").getType(), 
Variant.Type.TIMESTAMP);
+        assertEquals(
+                variant.getField("tsMillis").getDateTime(),
+                LocalDateTime.ofEpochSecond(currentTimeSeconds, nanosTsMillis, 
ZoneOffset.UTC));
+        assertEquals(
+                variant.getField("tsMicros").getDateTime(),
+                LocalDateTime.ofEpochSecond(currentTimeSeconds, nanosTsMicros, 
ZoneOffset.UTC));
+        assertEquals(
+                variant.getField("tsMicros_union").getDateTime(),
+                LocalDateTime.ofEpochSecond(currentTimeSeconds, nanosTsMicros, 
ZoneOffset.UTC));
+
+        // Time-millis should be converted to long (micros)
+        assertEquals(timeMillis8am * 1000L, 
variant.getField("timeMillis").getLong());
+
+        // Date should be converted to LocalDate
+        assertEquals(variant.getField("date").getType(), Variant.Type.DATE);
+        LocalDate expectedDate = LocalDate.ofEpochDay(currentDateDays);
+        assertEquals(expectedDate, variant.getField("date").getDate());
+    }
+
+    @Test
+    public void testComplexRecordWithUnion() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(COMPLEX_UNION_SCHEMA);
+
+        GenericData.Record inner =
+                new GenericRecordBuilder(INNER_SCHEMA).set("id", 
10).set("key", "test_key").build();
+        GenericData.Record record =
+                new GenericRecordBuilder(COMPLEX_UNION_SCHEMA)
+                        .set("id", 1)
+                        .set("value", "main_value")
+                        .set("nullableCol", "not_null")
+                        .set("inner", inner)
+                        .build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+        assertEquals(1, variant.getField("id").getInt());
+        assertEquals("main_value", variant.getField("value").getString());
+        assertEquals("not_null", variant.getField("nullableCol").getString());
+
+        var innerVariant = variant.getField("inner");
+        assertTrue(innerVariant.isObject());
+        assertEquals(10, innerVariant.getField("id").getInt());
+        assertEquals("test_key", innerVariant.getField("key").getString());
+    }
+
+    @Test
+    public void testComplexRecordWithNullValues() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(COMPLEX_UNION_SCHEMA);
+
+        GenericData.Record record =
+                new GenericRecordBuilder(COMPLEX_UNION_SCHEMA)
+                        .set("id", 1)
+                        .set("value", "main_value")
+                        .set("nullableCol", null)
+                        .set("inner", null)
+                        .build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+        assertEquals(1, variant.getField("id").getInt());
+        assertEquals("main_value", variant.getField("value").getString());
+        assertTrue(variant.getField("nullableCol").isNull());
+        assertTrue(variant.getField("inner").isNull());
+    }
+
+    @Test
+    public void testArrayConversion() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(ARRAY_SCHEMA);
+
+        List<String> stringList = new ArrayList<>();
+        stringList.add("item1");
+        stringList.add("item2");
+        stringList.add("item3");
+
+        GenericData.Record record =
+                new GenericRecordBuilder(ARRAY_SCHEMA).set("values", 
stringList).build();
+
+        Variant variant = converter.convert(record);
+
+        assertTrue(variant.isObject());
+        var arrayVariant = variant.getField("values");
+        assertTrue(arrayVariant.isArray());
+
+        assertEquals("item1", arrayVariant.getElement(0).getString());
+        assertEquals("item2", arrayVariant.getElement(1).getString());
+        assertEquals("item3", arrayVariant.getElement(2).getString());
+    }
+
+    @Test
+    public void testNullRecord() {
+        var converter = 
AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_1);
+
+        Variant variant = converter.convert(null);
+        assertTrue(variant.isNull());
+    }
+}

Reply via email to