This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d2f24e8  [FLINK-20175] Avro Confluent Registry SQL format does not 
support adding nullable columns
d2f24e8 is described below

commit d2f24e8177f05fd41c8804f2bb962a1a7500b095
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Nov 16 20:28:29 2020 +0100

    [FLINK-20175] Avro Confluent Registry SQL format does not support adding 
nullable columns
---
 .../avro/typeutils/AvroSchemaConverter.java        |  75 ++++++-----
 .../avro/AvroRowDataDeSerializationSchemaTest.java |   6 +-
 .../avro/typeutils/AvroSchemaConverterTest.java    | 139 ++++++++++++++-------
 3 files changed, 146 insertions(+), 74 deletions(-)

diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index c129ea5..76aa5f2 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -181,27 +181,35 @@ public class AvroSchemaConverter {
 
        public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
                int precision;
+               boolean isNullable = logicalType.isNullable();
                switch (logicalType.getTypeRoot()) {
                        case NULL:
                                return SchemaBuilder.builder().nullType();
                        case BOOLEAN:
-                               return 
getNullableBuilder(logicalType).booleanType();
+                               Schema booleanType = 
SchemaBuilder.builder().booleanType();
+                               return isNullable ? nullableSchema(booleanType) 
: booleanType;
                        case TINYINT:
                        case SMALLINT:
                        case INTEGER:
-                               return 
getNullableBuilder(logicalType).intType();
+                               Schema intType = 
SchemaBuilder.builder().intType();
+                               return isNullable ? nullableSchema(intType) : 
intType;
                        case BIGINT:
-                               return 
getNullableBuilder(logicalType).longType();
+                               Schema longType = 
SchemaBuilder.builder().longType();
+                               return isNullable ? nullableSchema(longType) : 
longType;
                        case FLOAT:
-                               return 
getNullableBuilder(logicalType).floatType();
+                               Schema floatType = 
SchemaBuilder.builder().floatType();
+                               return isNullable ? nullableSchema(floatType) : 
floatType;
                        case DOUBLE:
-                               return 
getNullableBuilder(logicalType).doubleType();
+                               Schema doubleType = 
SchemaBuilder.builder().doubleType();
+                               return isNullable ? nullableSchema(doubleType) 
: doubleType;
                        case CHAR:
                        case VARCHAR:
-                               return 
getNullableBuilder(logicalType).stringType();
+                               Schema stringType = 
SchemaBuilder.builder().stringType();
+                               return isNullable ? nullableSchema(stringType) 
: stringType;
                        case BINARY:
                        case VARBINARY:
-                               return 
getNullableBuilder(logicalType).bytesType();
+                               Schema bytesType = 
SchemaBuilder.builder().bytesType();
+                               return isNullable ? nullableSchema(bytesType) : 
bytesType;
                        case TIMESTAMP_WITHOUT_TIME_ZONE:
                                // use long to represent Timestamp
                                final TimestampType timestampType = 
(TimestampType) logicalType;
@@ -213,10 +221,12 @@ public class AvroSchemaConverter {
                                        throw new 
IllegalArgumentException("Avro does not support TIMESTAMP type " +
                                                "with precision: " + precision 
+ ", it only supports precision less than 3.");
                                }
-                               return 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                               Schema timestampType = 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                               return isNullable ? 
nullableSchema(timestampType) : timestampType;
                        case DATE:
                                // use int to represent Date
-                               return 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+                               Schema dateType = 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+                               return isNullable ? nullableSchema(dateType) : 
dateType;
                        case TIME_WITHOUT_TIME_ZONE:
                                precision = ((TimeType) 
logicalType).getPrecision();
                                if (precision > 3) {
@@ -225,13 +235,17 @@ public class AvroSchemaConverter {
                                                ", it only supports precision 
less than 3.");
                                }
                                // use int to represent Time; we only support 
milliseconds during deserialization
-                               return 
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+                               Schema timeType = LogicalTypes
+                                       .timeMillis()
+                                       
.addToSchema(SchemaBuilder.builder().intType());
+                               return isNullable ? nullableSchema(timeType) : 
timeType;
                        case DECIMAL:
-                               DecimalType decimalType = (DecimalType) 
logicalType;
+                               DecimalType decimalLogicalType = (DecimalType) 
logicalType;
                                // store BigDecimal as byte[]
-                               return LogicalTypes
-                                       .decimal(decimalType.getPrecision(), 
decimalType.getScale())
+                               Schema decimalType = LogicalTypes
+                                       
.decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale())
                                        
.addToSchema(SchemaBuilder.builder().bytesType());
+                               return isNullable ? nullableSchema(decimalType) 
: decimalType;
                        case ROW:
                                RowType rowType = (RowType) logicalType;
                                List<String> fieldNames = 
rowType.getFieldNames();
@@ -241,27 +255,33 @@ public class AvroSchemaConverter {
                                        .record(rowName)
                                        .fields();
                                for (int i = 0; i < rowType.getFieldCount(); 
i++) {
-                                       String fieldName = rowName + "_" + 
fieldNames.get(i);
-                                       builder = builder
+                                       String fieldName = fieldNames.get(i);
+                                       LogicalType fieldType = 
rowType.getTypeAt(i);
+                                       SchemaBuilder.GenericDefault<Schema> 
fieldBuilder = builder
                                                .name(fieldName)
-                                               
.type(convertToSchema(rowType.getTypeAt(i), fieldName))
-                                               .noDefault();
+                                               
.type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+                                       if (fieldType.isNullable()) {
+                                               builder = 
fieldBuilder.withDefault(null);
+                                       } else {
+                                               builder = 
fieldBuilder.noDefault();
+                                       }
                                }
                                return builder.endRecord();
                        case MULTISET:
                        case MAP:
-                               return SchemaBuilder
+                               Schema mapType = SchemaBuilder
                                        .builder()
-                                       .nullable()
                                        .map()
                                        
.values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName));
+                               return isNullable ? nullableSchema(mapType) : 
mapType;
                        case ARRAY:
-                               ArrayType arrayType = (ArrayType) logicalType;
-                               return SchemaBuilder
+                               ArrayType arrayLogicalType = (ArrayType) 
logicalType;
+                               Schema arrayType = SchemaBuilder
                                        .builder()
-                                       .nullable()
                                        .array()
-                                       
.items(convertToSchema(arrayType.getElementType(), rowName));
+                                       
.items(convertToSchema(arrayLogicalType.getElementType(), rowName));
+                               return isNullable ? nullableSchema(arrayType) : 
arrayType;
                        case RAW:
                        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                        default:
@@ -289,11 +309,8 @@ public class AvroSchemaConverter {
                return valueType;
        }
 
-       private static SchemaBuilder.BaseTypeBuilder<Schema> 
getNullableBuilder(LogicalType logicalType) {
-               SchemaBuilder.TypeBuilder<Schema> builder = 
SchemaBuilder.builder();
-               if (logicalType.isNullable()) {
-                       return builder.nullable();
-               }
-               return builder;
+       /** Returns schema with nullable true. */
+       private static Schema nullableSchema(Schema schema) {
+               return Schema.createUnion(SchemaBuilder.builder().nullType(), 
schema);
        }
 }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 514d15b4..0b918de 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -181,9 +181,9 @@ public class AvroRowDataDeSerializationSchemaTest {
                byte[] input = byteArrayOutputStream.toByteArray();
 
                DataType dataType = ROW(
-                               FIELD("type_timestamp_millis", TIMESTAMP(3)),
-                               FIELD("type_date", DATE()),
-                               FIELD("type_time_millis", TIME(3)));
+                               FIELD("type_timestamp_millis", 
TIMESTAMP(3).notNull()),
+                               FIELD("type_date", DATE().notNull()),
+                               FIELD("type_time_millis", TIME(3).notNull()));
                final RowType rowType = (RowType) dataType.getLogicalType();
                final TypeInformation<RowData> typeInfo = new 
RowDataTypeInfo(rowType);
                AvroRowDataSerializationSchema serializationSchema = new 
AvroRowDataSerializationSchema(rowType);
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index f669a10..9b47509 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -22,17 +22,27 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DecoderFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -55,6 +65,44 @@ public class AvroSchemaConverterTest {
        }
 
        @Test
+       public void testAddingOptionalField() throws IOException {
+               Schema oldSchema = SchemaBuilder.record("record")
+                       .fields()
+                       .requiredLong("category_id")
+                       .optionalString("name")
+                       .endRecord();
+
+               Schema newSchema = AvroSchemaConverter.convertToSchema(
+                       DataTypes.ROW(
+                               DataTypes.FIELD("category_id", 
DataTypes.BIGINT().notNull()),
+                               DataTypes.FIELD("name", 
DataTypes.STRING().nullable()),
+                               DataTypes.FIELD("description", 
DataTypes.STRING().nullable())
+                       ).getLogicalType()
+               );
+
+               byte[] serializedRecord = AvroTestUtils.writeRecord(
+                       new GenericRecordBuilder(oldSchema)
+                               .set("category_id", 1L)
+                               .set("name", "test")
+                               .build(),
+                       oldSchema
+               );
+               GenericDatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>(
+                       oldSchema,
+                       newSchema);
+               GenericRecord newRecord = datumReader.read(
+                       null,
+                       DecoderFactory.get().binaryDecoder(serializedRecord, 0, 
serializedRecord.length, null));
+               assertThat(
+                       newRecord,
+                       equalTo(new GenericRecordBuilder(newSchema)
+                               .set("category_id", 1L)
+                               .set("name", "test")
+                               .set("description", null)
+                               .build()));
+       }
+
+       @Test
        public void testInvalidRawTypeAvroSchemaConversion() {
                RowType rowType = (RowType) TableSchema.builder()
                        .field("a", DataTypes.STRING())
@@ -97,48 +145,55 @@ public class AvroSchemaConverterTest {
                                DataTypes.FIELD("row3", 
DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
                        .build().toRowDataType().getLogicalType();
                Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-               assertEquals("{\n" +
-                       "  \"type\" : \"record\",\n" +
-                       "  \"name\" : \"record\",\n" +
-                       "  \"fields\" : [ {\n" +
-                       "    \"name\" : \"record_row1\",\n" +
-                       "    \"type\" : {\n" +
-                       "      \"type\" : \"record\",\n" +
-                       "      \"name\" : \"record_row1\",\n" +
-                       "      \"fields\" : [ {\n" +
-                       "        \"name\" : \"record_row1_a\",\n" +
-                       "        \"type\" : [ \"string\", \"null\" ]\n" +
-                       "      } ]\n" +
-                       "    }\n" +
-                       "  }, {\n" +
-                       "    \"name\" : \"record_row2\",\n" +
-                       "    \"type\" : {\n" +
-                       "      \"type\" : \"record\",\n" +
-                       "      \"name\" : \"record_row2\",\n" +
-                       "      \"fields\" : [ {\n" +
-                       "        \"name\" : \"record_row2_b\",\n" +
-                       "        \"type\" : [ \"string\", \"null\" ]\n" +
-                       "      } ]\n" +
-                       "    }\n" +
-                       "  }, {\n" +
-                       "    \"name\" : \"record_row3\",\n" +
-                       "    \"type\" : {\n" +
-                       "      \"type\" : \"record\",\n" +
-                       "      \"name\" : \"record_row3\",\n" +
-                       "      \"fields\" : [ {\n" +
-                       "        \"name\" : \"record_row3_row3\",\n" +
-                       "        \"type\" : {\n" +
-                       "          \"type\" : \"record\",\n" +
-                       "          \"name\" : \"record_row3_row3\",\n" +
-                       "          \"fields\" : [ {\n" +
-                       "            \"name\" : \"record_row3_row3_c\",\n" +
-                       "            \"type\" : [ \"string\", \"null\" ]\n" +
-                       "          } ]\n" +
-                       "        }\n" +
-                       "      } ]\n" +
-                       "    }\n" +
-                       "  } ]\n" +
-                       "}", schema.toString(true));
+               assertEquals("{\n"
+                       + "  \"type\" : \"record\",\n"
+                       + "  \"name\" : \"record\",\n"
+                       + "  \"fields\" : [ {\n"
+                       + "    \"name\" : \"row1\",\n"
+                       + "    \"type\" : {\n"
+                       + "      \"type\" : \"record\",\n"
+                       + "      \"name\" : \"record_row1\",\n"
+                       + "      \"fields\" : [ {\n"
+                       + "        \"name\" : \"a\",\n"
+                       + "        \"type\" : [ \"null\", \"string\" ],\n"
+                       + "        \"default\" : null\n"
+                       + "      } ]\n"
+                       + "    },\n"
+                       + "    \"default\" : null\n"
+                       + "  }, {\n"
+                       + "    \"name\" : \"row2\",\n"
+                       + "    \"type\" : {\n"
+                       + "      \"type\" : \"record\",\n"
+                       + "      \"name\" : \"record_row2\",\n"
+                       + "      \"fields\" : [ {\n"
+                       + "        \"name\" : \"b\",\n"
+                       + "        \"type\" : [ \"null\", \"string\" ],\n"
+                       + "        \"default\" : null\n"
+                       + "      } ]\n"
+                       + "    },\n"
+                       + "    \"default\" : null\n"
+                       + "  }, {\n"
+                       + "    \"name\" : \"row3\",\n"
+                       + "    \"type\" : {\n"
+                       + "      \"type\" : \"record\",\n"
+                       + "      \"name\" : \"record_row3\",\n"
+                       + "      \"fields\" : [ {\n"
+                       + "        \"name\" : \"row3\",\n"
+                       + "        \"type\" : {\n"
+                       + "          \"type\" : \"record\",\n"
+                       + "          \"name\" : \"record_row3_row3\",\n"
+                       + "          \"fields\" : [ {\n"
+                       + "            \"name\" : \"c\",\n"
+                       + "            \"type\" : [ \"null\", \"string\" ],\n"
+                       + "            \"default\" : null\n"
+                       + "          } ]\n"
+                       + "        },\n"
+                       + "        \"default\" : null\n"
+                       + "      } ]\n"
+                       + "    },\n"
+                       + "    \"default\" : null\n"
+                       + "  } ]\n"
+                       + "}", schema.toString(true));
        }
 
        private void validateUserSchema(TypeInformation<?> actual) {

Reply via email to