This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c656cf72a71 [FLINK-39023][Formats][Avro] Add avro to variant datatype
converter
c656cf72a71 is described below
commit c656cf72a712c73c2dfc023b0eb1f3b020d0cf89
Author: Swapna Marru <[email protected]>
AuthorDate: Wed Apr 22 01:01:39 2026 -0700
[FLINK-39023][Formats][Avro] Add avro to variant datatype converter
---
.../formats/avro/AvroToVariantDataConverters.java | 265 ++++++++++++++++++
.../avro/AvroToVariantDataConvertersTest.java | 306 +++++++++++++++++++++
2 files changed, 571 insertions(+)
diff --git
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
new file mode 100644
index 00000000000..13813c76a3a
--- /dev/null
+++
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToVariantDataConverters.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.types.variant.VariantBuilder;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Internal
+public class AvroToVariantDataConverters {
+ private static final VariantBuilder SHARED_BUILDER = Variant.newBuilder();
+
+ /**
+  * Creates a converter that turns Avro values described by {@code schema} into {@link Variant}s.
+  *
+  * <p>The returned converter is null-safe: a {@code null} input is converted to a Variant null.
+  *
+  * @param schema Avro schema of the values that will be passed to the converter
+  * @return converter for values of the given schema
+  */
+ public static AvroToVariantDataConverter createVariantConverter(Schema schema) {
+     final AvroToVariantDataConverter nullSafeConverter = createNullableConverter(schema);
+     return nullSafeConverter;
+ }
+
+ /**
+  * Creates a converter that directly maps Avro values of the given schema to Variant values.
+  *
+  * <p>Apart from the {@code NULL} schema case, the returned converter casts or dereferences its
+  * input directly; null handling is left to {@code createNullableConverter}, which is also used
+  * for every nested schema (record fields, array elements, map values, union branch).
+  *
+  * @param schema Avro schema of the values the converter will receive
+  * @return a converter for values described by {@code schema}
+  * @throws UnsupportedOperationException if {@code schema} is a union that does not have exactly
+  *     one non-null branch, or is of a type not handled by the switch below
+  */
+ private static AvroToVariantDataConverter createAvroToVariantConverter(Schema schema) {
+     LogicalType logicalType = schema.getLogicalType();
+
+     switch (schema.getType()) {
+         case RECORD:
+             return createRecordConverter(schema);
+
+         case NULL:
+             return (avroObject) -> SHARED_BUILDER.ofNull();
+
+         case BOOLEAN:
+             return (avroObject) -> SHARED_BUILDER.of((Boolean) avroObject);
+
+         case INT:
+             if (logicalType == LogicalTypes.date()) {
+                 return (avroObject) -> SHARED_BUILDER.of(convertToDate((Integer) avroObject));
+             } else if (logicalType == LogicalTypes.timeMillis()) {
+                 // time-millis is a time of day with no reference to a particular calendar.
+                 // It is stored as a plain long, so the millis/micros unit of the logical type
+                 // is lost; normalize to microseconds.
+                 return (avroObject) -> SHARED_BUILDER.of((Integer) avroObject * 1000L);
+             } else {
+                 return (avroObject) -> SHARED_BUILDER.of((Integer) avroObject);
+             }
+
+         case LONG:
+             {
+                 // Resolve the timestamp unit once, at converter-creation time. The lambda then
+                 // captures only a boolean instead of the LogicalType instance, which keeps the
+                 // (Serializable) converter free of a reference to an Avro object and avoids
+                 // re-evaluating the comparison for every value.
+                 final boolean isMicros = logicalType == LogicalTypes.timestampMicros();
+                 if (isMicros || logicalType == LogicalTypes.timestampMillis()) {
+                     return (avroObject) ->
+                             SHARED_BUILDER.of(convertToTimestamp((Long) avroObject, isMicros));
+                 } else {
+                     return (avroObject) -> SHARED_BUILDER.of((Long) avroObject);
+                 }
+             }
+
+         case FLOAT:
+             return (avroObject) -> SHARED_BUILDER.of((Float) avroObject);
+
+         case DOUBLE:
+             return (avroObject) -> SHARED_BUILDER.of((Double) avroObject);
+
+         case BYTES:
+         case FIXED:
+             if (logicalType instanceof LogicalTypes.Decimal) {
+                 LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+                 return createDecimalConverter(
+                         decimalType.getPrecision(), decimalType.getScale());
+             } else {
+                 return object -> SHARED_BUILDER.of(convertToBytes(object));
+             }
+
+         case STRING:
+         case ENUM:
+             // toString() covers every runtime representation (e.g. CharSequence or enum symbol).
+             return (avroObject) -> SHARED_BUILDER.of(avroObject.toString());
+
+         case ARRAY:
+             return createArrayConverter(createNullableConverter(schema.getElementType()));
+
+         case MAP:
+             return createMapConverter(createNullableConverter(schema.getValueType()));
+
+         case UNION:
+             // Only "nullable type" unions are supported: drop the null branch and require that
+             // exactly one other branch remains.
+             List<Schema> nonNullUnionType =
+                     schema.getTypes().stream()
+                             .filter(t -> t.getType() != Schema.Type.NULL)
+                             .collect(Collectors.toList());
+
+             if (nonNullUnionType.size() == 1) {
+                 return createNullableConverter(nonNullUnionType.get(0));
+             } else {
+                 throw new UnsupportedOperationException(
+                         "Avro Union with NULL type is only supported. Unsupported types: "
+                                 + schema.getTypes());
+             }
+
+         default:
+             throw new UnsupportedOperationException("Unsupported type: " + schema.getType());
+     }
+ }
+
+ /**
+  * Creates a record converter that emits one Variant object entry per Avro field, in schema
+  * field order. Field values are looked up by name and converted with null-safe converters.
+  */
+ private static AvroToVariantDataConverter createRecordConverter(Schema schema) {
+     List<Schema.Field> fields = schema.getFields();
+     String[] fieldNames = new String[fields.size()];
+     AvroToVariantDataConverter[] fieldConverters =
+             new AvroToVariantDataConverter[fields.size()];
+
+     for (int i = 0; i < fields.size(); i++) {
+         Schema.Field field = fields.get(i);
+         fieldNames[i] = field.name();
+         fieldConverters[i] = createNullableConverter(field.schema());
+     }
+
+     return (avroObject) -> {
+         GenericRecord record = (GenericRecord) avroObject;
+         VariantBuilder.VariantObjectBuilder variantObjectBuilder = SHARED_BUILDER.object();
+
+         for (int i = 0; i < fieldNames.length; i++) {
+             String fieldName = fieldNames[i];
+             Object fieldValue = record.get(fieldName);
+             variantObjectBuilder.add(fieldName, fieldConverters[i].convert(fieldValue));
+         }
+
+         return variantObjectBuilder.build();
+     };
+ }
+
+ /**
+  * Builds a converter for Avro arrays: the input is treated as a {@link List} and every element
+  * is passed through {@code elementConverter}, preserving iteration order.
+  */
+ private static AvroToVariantDataConverter createArrayConverter(
+         AvroToVariantDataConverter elementConverter) {
+     return (avroObject) -> {
+         final List<?> elements = (List<?>) avroObject;
+         final VariantBuilder.VariantArrayBuilder arrayBuilder = SHARED_BUILDER.array();
+
+         elements.forEach(element -> arrayBuilder.add(elementConverter.convert(element)));
+
+         return arrayBuilder.build();
+     };
+ }
+
+ /**
+  * Builds a converter for Avro maps: the input is treated as a {@link Map} and turned into a
+  * Variant object whose entries are the map keys with their converted values.
+  */
+ private static AvroToVariantDataConverter createMapConverter(
+         AvroToVariantDataConverter valueConverter) {
+     return (avroObject) -> {
+         final Map<?, ?> entries = (Map<?, ?>) avroObject;
+         final VariantBuilder.VariantObjectBuilder objectBuilder = SHARED_BUILDER.object();
+
+         // Avro maps always have string keys, so toString() yields the entry name.
+         entries.forEach(
+                 (key, value) -> objectBuilder.add(key.toString(), valueConverter.convert(value)));
+
+         return objectBuilder.build();
+     };
+ }
+
+ /**
+  * Builds a converter for Avro decimals with the given precision and scale.
+  *
+  * <p>A value that already is a {@link BigDecimal} is used as is; any other value is read as
+  * unscaled bytes (see {@code convertToBytes}) and decoded with the given precision and scale.
+  */
+ private static AvroToVariantDataConverter createDecimalConverter(int precision, int scale) {
+     return avroObject -> {
+         final BigDecimal decimal =
+                 avroObject instanceof BigDecimal
+                         ? (BigDecimal) avroObject
+                         : DecimalData.fromUnscaledBytes(
+                                         convertToBytes(avroObject), precision, scale)
+                                 .toBigDecimal();
+         return SHARED_BUILDER.of(decimal);
+     };
+ }
+
+ /**
+  * Converts an Avro timestamp to a {@link LocalDateTime} in UTC.
+  *
+  * <p>The timestamp-millis / timestamp-micros logical types annotate an Avro long that stores
+  * the number of milliseconds / microseconds from the unix epoch, 1 January 1970 00:00:00.000
+  * UTC. Values before the epoch are negative.
+  *
+  * @param object epoch offset in milliseconds, or in microseconds if {@code isMicros} is set
+  * @param isMicros {@code true} if {@code object} is expressed in microseconds
+  * @return the corresponding UTC date-time
+  */
+ private static LocalDateTime convertToTimestamp(Long object, boolean isMicros) {
+     final long unitsPerSecond = isMicros ? 1_000_000L : 1_000L;
+     final int nanosPerUnit = isMicros ? 1_000 : 1_000_000;
+
+     // Floor division keeps the sub-second remainder in [0, unitsPerSecond) for pre-epoch
+     // (negative) values as well. Truncating division would produce a negative nano-of-second,
+     // which LocalDateTime.ofEpochSecond rejects with a DateTimeException.
+     final long secs = Math.floorDiv(object, unitsPerSecond);
+     final int nanos = (int) Math.floorMod(object, unitsPerSecond) * nanosPerUnit;
+
+     return LocalDateTime.ofEpochSecond(secs, nanos, ZoneOffset.UTC);
+ }
+
+ /**
+  * Converts an Avro date to a {@link LocalDate}.
+  *
+  * <p>The date logical type represents a date within the calendar, with no reference to a
+  * particular time zone or time of day. It annotates an Avro int that stores the number of days
+  * from the unix epoch, 1 January 1970 (ISO calendar).
+  */
+ private static LocalDate convertToDate(Integer numOfDays) {
+     final long epochDay = numOfDays.longValue();
+     return LocalDate.ofEpochDay(epochDay);
+ }
+
+ /**
+  * Extracts the raw bytes from the runtime representations used for Avro {@code bytes} and
+  * {@code fixed} values.
+  *
+  * @param object a {@link GenericFixed}, a {@link ByteBuffer} or a {@code byte[]}
+  * @return the bytes of a {@link GenericFixed} or the given {@code byte[]} as is, or a copy of
+  *     the remaining content of a {@link ByteBuffer}
+  * @throws ClassCastException if {@code object} is none of the supported representations
+  */
+ private static byte[] convertToBytes(Object object) {
+     if (object instanceof GenericFixed) {
+         return ((GenericFixed) object).bytes();
+     }
+     if (object instanceof ByteBuffer) {
+         // Read through a duplicate so the caller's buffer keeps its position. Reading the
+         // original would drain it, and a second conversion of the same Avro datum would then
+         // see an empty buffer.
+         ByteBuffer view = ((ByteBuffer) object).duplicate();
+         byte[] bytes = new byte[view.remaining()];
+         view.get(bytes);
+         return bytes;
+     }
+     return (byte[]) object;
+ }
+
+ /**
+  * Creates the converter for {@code schema} and wraps it so that a {@code null} input yields a
+  * Variant null instead of being passed to the type-specific converter.
+  */
+ private static AvroToVariantDataConverter createNullableConverter(Schema schema) {
+     final AvroToVariantDataConverter delegate = createAvroToVariantConverter(schema);
+     return avroObject ->
+             avroObject != null ? delegate.convert(avroObject) : SHARED_BUILDER.ofNull();
+ }
+
+ /**
+  * Runtime converter that turns a single Avro value into a {@link Variant}.
+  *
+  * <p>The interface is {@link Serializable}, so implementations should only hold serializable
+  * state.
+  */
+ @FunctionalInterface
+ public interface AvroToVariantDataConverter extends Serializable {
+     /** Converts the given Avro value into its Variant representation. */
+     Variant convert(Object object);
+ }
+}
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
new file mode 100644
index 00000000000..f2dfe624d45
--- /dev/null
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroToVariantDataConvertersTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.types.variant.Variant;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AvroToVariantDataConvertersTest {
+
+ private static final Schema TEST_SCHEMA_1 =
+ new Schema.Parser()
+ .parse(
+ ""
+ + "{\"namespace\":
\"org.apache.flink.formats.avro.generated\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Data\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"a\", \"type\":
\"string\"},\n"
+ + " {\"name\": \"b\", \"type\":
\"int\"}\n"
+ + " ]\n"
+ + "}");
+
+ private static final Schema TEST_SCHEMA_2 =
+ new Schema.Parser()
+ .parse(
+ ""
+ + "{\"namespace\":
\"org.apache.flink.formats.avro.generated\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Timestamps\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"tsMillis\", \"type\":
{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},\n"
+ + " {\"name\": \"tsMicros\", \"type\":
{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}},\n"
+ + " {\"name\": \"tsMicros_union\",\n"
+ + " \"type\" : [ \"null\", {\n"
+ + " \"type\" : \"long\",\n"
+ + " \"logicalType\" :
\"timestamp-micros\"\n"
+ + " } ],\n"
+ + " \"default\": null},\n"
+ + " {\"name\": \"timeMillis\",
\"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}},\n"
+ + " {\"name\": \"date\", \"type\":
{\"type\": \"int\", \"logicalType\": \"date\"}}\n"
+ + " ]\n"
+ + "}");
+
+ private static final Schema INNER_SCHEMA =
+ new Schema.Parser()
+ .parse(
+ ""
+ + "{\"namespace\":
\"org.apache.flink.formats.avro.generated\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"InnerType\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\":
\"int\"},\n"
+ + " {\"name\": \"key\", \"type\":
\"string\"}\n"
+ + " ]\n"
+ + "}");
+
+ private static final Schema COMPLEX_UNION_SCHEMA =
+ new Schema.Parser()
+ .parse(
+ ""
+ + "{\"namespace\":
\"org.apache.flink.formats.avro.generated\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"ComplexWithUnionType\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\":
\"int\"},\n"
+ + " {\"name\": \"value\", \"type\":
\"string\"},\n"
+ + " { \"name\" : \"nullableCol\",
\"type\" : [ \"null\", \"string\" ], \"default\" : null },"
+ + " {\"name\": \"inner\", \"type\": [
\"null\", "
+ + INNER_SCHEMA
+ + " ], \"default\" : null }"
+ + " ]\n"
+ + "}");
+
+ private static final Schema ARRAY_SCHEMA =
+ new Schema.Parser()
+ .parse(
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"ArrayRecord\",\n"
+ + " \"namespace\":
\"org.apache.flink\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"values\",\n"
+ + " \"type\": {\n"
+ + " \"type\": \"array\",\n"
+ + " \"items\": \"string\"\n"
+ + " }\n"
+ + " }\n"
+ + " ]\n"
+ + "}");
+
+ private static final Schema SIMPLE_TYPES_SCHEMA =
+ new Schema.Parser()
+ .parse(
+ ""
+ + "{\"namespace\":
\"org.apache.flink.formats.avro.generated\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"SimpleTypes\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"boolField\",
\"type\": \"boolean\"},\n"
+ + " {\"name\": \"intField\", \"type\":
\"int\"},\n"
+ + " {\"name\": \"longField\",
\"type\": \"long\"},\n"
+ + " {\"name\": \"floatField\",
\"type\": \"float\"},\n"
+ + " {\"name\": \"doubleField\",
\"type\": \"double\"},\n"
+ + " {\"name\": \"stringField\",
\"type\": \"string\"}\n"
+ + " ]\n"
+ + "}");
+
+ /** A flat record with a string and an int field becomes a Variant object with both fields. */
+ @Test
+ public void testBasicAvroToVariantConversion() {
+     GenericRecord input =
+             new GenericRecordBuilder(TEST_SCHEMA_1).set("a", "hello").set("b", 42).build();
+
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_1).convert(input);
+
+     assertTrue(result.isObject());
+     assertEquals("hello", result.getField("a").getString());
+     assertEquals(42, result.getField("b").getInt());
+ }
+
+ /** Every Avro primitive in SIMPLE_TYPES_SCHEMA maps to the matching Variant primitive. */
+ @Test
+ public void testSimpleTypesConversion() {
+     GenericRecordBuilder builder = new GenericRecordBuilder(SIMPLE_TYPES_SCHEMA);
+     builder.set("boolField", true);
+     builder.set("intField", 123);
+     builder.set("longField", 456L);
+     builder.set("floatField", 78.9f);
+     builder.set("doubleField", 12.34);
+     builder.set("stringField", "test");
+     GenericRecord input = builder.build();
+
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(SIMPLE_TYPES_SCHEMA)
+                     .convert(input);
+
+     assertTrue(result.isObject());
+     assertTrue(result.getField("boolField").getBoolean());
+     assertEquals(123, result.getField("intField").getInt());
+     assertEquals(456L, result.getField("longField").getLong());
+     assertEquals(78.9f, result.getField("floatField").getFloat(), 0.001f);
+     assertEquals(12.34, result.getField("doubleField").getDouble(), 0.001);
+     assertEquals("test", result.getField("stringField").getString());
+ }
+
+ /**
+  * Checks the logical-type mappings: timestamp-millis / timestamp-micros (also inside a nullable
+  * union) become TIMESTAMP, time-millis becomes a long in microseconds, and date becomes DATE.
+  */
+ @Test
+ public void testTimestampAndDateLogicalTypes() {
+     var converter = AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_2);
+
+     long currentTimeMillis = System.currentTimeMillis();
+     long currentTimeMicros = currentTimeMillis * 1000 + 123;
+     int currentDateDays = (int) (currentTimeMillis / (24 * 60 * 60 * 1000L));
+     int timeMillis8am = (int) TimeUnit.HOURS.toMillis(8);
+
+     long currentTimeSeconds = currentTimeMillis / 1000;
+     int nanosTsMicros = (int) (currentTimeMicros - currentTimeSeconds * 1000_000) * 1000;
+     int nanosTsMillis = (int) (currentTimeMillis - currentTimeSeconds * 1000) * 1000_000;
+
+     GenericRecord record =
+             new GenericRecordBuilder(TEST_SCHEMA_2)
+                     .set("tsMillis", currentTimeMillis)
+                     .set("tsMicros", currentTimeMicros)
+                     .set("tsMicros_union", currentTimeMicros)
+                     .set("timeMillis", timeMillis8am)
+                     .set("date", currentDateDays)
+                     .build();
+
+     Variant variant = converter.convert(record);
+
+     assertTrue(variant.isObject());
+
+     // Timestamps should be converted to LocalDateTime.
+     // assertEquals takes (expected, actual); keeping that order makes failure messages correct.
+     LocalDateTime expectedTsMillis =
+             LocalDateTime.ofEpochSecond(currentTimeSeconds, nanosTsMillis, ZoneOffset.UTC);
+     LocalDateTime expectedTsMicros =
+             LocalDateTime.ofEpochSecond(currentTimeSeconds, nanosTsMicros, ZoneOffset.UTC);
+
+     assertEquals(Variant.Type.TIMESTAMP, variant.getField("tsMillis").getType());
+     assertEquals(Variant.Type.TIMESTAMP, variant.getField("tsMicros").getType());
+     assertEquals(Variant.Type.TIMESTAMP, variant.getField("tsMicros_union").getType());
+     assertEquals(expectedTsMillis, variant.getField("tsMillis").getDateTime());
+     assertEquals(expectedTsMicros, variant.getField("tsMicros").getDateTime());
+     assertEquals(expectedTsMicros, variant.getField("tsMicros_union").getDateTime());
+
+     // Time-millis should be converted to long (micros)
+     assertEquals(timeMillis8am * 1000L, variant.getField("timeMillis").getLong());
+
+     // Date should be converted to LocalDate
+     assertEquals(Variant.Type.DATE, variant.getField("date").getType());
+     LocalDate expectedDate = LocalDate.ofEpochDay(currentDateDays);
+     assertEquals(expectedDate, variant.getField("date").getDate());
+ }
+
+ /** Nullable union fields that carry a value are converted like their non-null branch. */
+ @Test
+ public void testComplexRecordWithUnion() {
+     GenericData.Record nested =
+             new GenericRecordBuilder(INNER_SCHEMA).set("id", 10).set("key", "test_key").build();
+
+     GenericRecordBuilder outerBuilder = new GenericRecordBuilder(COMPLEX_UNION_SCHEMA);
+     outerBuilder.set("id", 1);
+     outerBuilder.set("value", "main_value");
+     outerBuilder.set("nullableCol", "not_null");
+     outerBuilder.set("inner", nested);
+     GenericData.Record outer = outerBuilder.build();
+
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(COMPLEX_UNION_SCHEMA)
+                     .convert(outer);
+
+     assertTrue(result.isObject());
+     assertEquals(1, result.getField("id").getInt());
+     assertEquals("main_value", result.getField("value").getString());
+     assertEquals("not_null", result.getField("nullableCol").getString());
+
+     Variant nestedResult = result.getField("inner");
+     assertTrue(nestedResult.isObject());
+     assertEquals(10, nestedResult.getField("id").getInt());
+     assertEquals("test_key", nestedResult.getField("key").getString());
+ }
+
+ /** Nullable union fields that are null are converted to Variant nulls. */
+ @Test
+ public void testComplexRecordWithNullValues() {
+     GenericRecordBuilder builder = new GenericRecordBuilder(COMPLEX_UNION_SCHEMA);
+     builder.set("id", 1);
+     builder.set("value", "main_value");
+     builder.set("nullableCol", null);
+     builder.set("inner", null);
+     GenericData.Record input = builder.build();
+
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(COMPLEX_UNION_SCHEMA)
+                     .convert(input);
+
+     assertTrue(result.isObject());
+     assertEquals(1, result.getField("id").getInt());
+     assertEquals("main_value", result.getField("value").getString());
+     assertTrue(result.getField("nullableCol").isNull());
+     assertTrue(result.getField("inner").isNull());
+ }
+
+ /** An Avro array of strings becomes a Variant array with the same elements in order. */
+ @Test
+ public void testArrayConversion() {
+     List<String> expectedItems = new ArrayList<>(List.of("item1", "item2", "item3"));
+
+     GenericData.Record input =
+             new GenericRecordBuilder(ARRAY_SCHEMA).set("values", expectedItems).build();
+
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(ARRAY_SCHEMA).convert(input);
+
+     assertTrue(result.isObject());
+     Variant values = result.getField("values");
+     assertTrue(values.isArray());
+
+     for (int i = 0; i < expectedItems.size(); i++) {
+         assertEquals(expectedItems.get(i), values.getElement(i).getString());
+     }
+ }
+
+ /** A null top-level input is converted to a Variant null. */
+ @Test
+ public void testNullRecord() {
+     Variant result =
+             AvroToVariantDataConverters.createVariantConverter(TEST_SCHEMA_1).convert(null);
+
+     assertTrue(result.isNull());
+ }
+}