This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new d2f24e8 [FLINK-20175] Avro Confluent Registry SQL format does not support adding nullable columns d2f24e8 is described below commit d2f24e8177f05fd41c8804f2bb962a1a7500b095 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Nov 16 20:28:29 2020 +0100 [FLINK-20175] Avro Confluent Registry SQL format does not support adding nullable columns --- .../avro/typeutils/AvroSchemaConverter.java | 75 ++++++----- .../avro/AvroRowDataDeSerializationSchemaTest.java | 6 +- .../avro/typeutils/AvroSchemaConverterTest.java | 139 ++++++++++++++------- 3 files changed, 146 insertions(+), 74 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index c129ea5..76aa5f2 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -181,27 +181,35 @@ public class AvroSchemaConverter { public static Schema convertToSchema(LogicalType logicalType, String rowName) { int precision; + boolean isNullable = logicalType.isNullable(); switch (logicalType.getTypeRoot()) { case NULL: return SchemaBuilder.builder().nullType(); case BOOLEAN: - return getNullableBuilder(logicalType).booleanType(); + Schema booleanType = SchemaBuilder.builder().booleanType(); + return isNullable ? nullableSchema(booleanType) : booleanType; case TINYINT: case SMALLINT: case INTEGER: - return getNullableBuilder(logicalType).intType(); + Schema intType = SchemaBuilder.builder().intType(); + return isNullable ? nullableSchema(intType) : intType; case BIGINT: - return getNullableBuilder(logicalType).longType(); + Schema longType = SchemaBuilder.builder().longType(); + return isNullable ? nullableSchema(longType) : longType; case FLOAT: - return getNullableBuilder(logicalType).floatType(); + Schema floatType = SchemaBuilder.builder().floatType(); + return isNullable ? nullableSchema(floatType) : floatType; case DOUBLE: - return getNullableBuilder(logicalType).doubleType(); + Schema doubleType = SchemaBuilder.builder().doubleType(); + return isNullable ? nullableSchema(doubleType) : doubleType; case CHAR: case VARCHAR: - return getNullableBuilder(logicalType).stringType(); + Schema stringType = SchemaBuilder.builder().stringType(); + return isNullable ? nullableSchema(stringType) : stringType; case BINARY: case VARBINARY: - return getNullableBuilder(logicalType).bytesType(); + Schema bytesType = SchemaBuilder.builder().bytesType(); + return isNullable ? nullableSchema(bytesType) : bytesType; case TIMESTAMP_WITHOUT_TIME_ZONE: // use long to represents Timestamp final TimestampType timestampType = (TimestampType) logicalType; @@ -213,10 +221,12 @@ public class AvroSchemaConverter { throw new IllegalArgumentException("Avro does not support TIMESTAMP type " + "with precision: " + precision + ", it only supports precision less than 3."); } - return avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); + Schema timestampeType = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); + return isNullable ? nullableSchema(timestampeType) : timestampeType; case DATE: // use int to represents Date - return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + Schema dateType = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + return isNullable ? nullableSchema(dateType) : dateType; case TIME_WITHOUT_TIME_ZONE: precision = ((TimeType) logicalType).getPrecision(); if (precision > 3) { @@ -225,13 +235,17 @@ public class AvroSchemaConverter { ", it only supports precision less than 3."); } // use int to represents Time, we only support millisecond when deserialization - return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()); + Schema timeType = LogicalTypes + .timeMillis() + .addToSchema(SchemaBuilder.builder().intType()); + return isNullable ? nullableSchema(timeType) : timeType; case DECIMAL: - DecimalType decimalType = (DecimalType) logicalType; + DecimalType decimalLogicalType = (DecimalType) logicalType; // store BigDecimal as byte[] - return LogicalTypes - .decimal(decimalType.getPrecision(), decimalType.getScale()) + Schema decimalType = LogicalTypes + .decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()) .addToSchema(SchemaBuilder.builder().bytesType()); + return isNullable ? nullableSchema(decimalType) : decimalType; case ROW: RowType rowType = (RowType) logicalType; List<String> fieldNames = rowType.getFieldNames(); @@ -241,27 +255,33 @@ public class AvroSchemaConverter { .record(rowName) .fields(); for (int i = 0; i < rowType.getFieldCount(); i++) { - String fieldName = rowName + "_" + fieldNames.get(i); - builder = builder + String fieldName = fieldNames.get(i); + LogicalType fieldType = rowType.getTypeAt(i); + SchemaBuilder.GenericDefault<Schema> fieldBuilder = builder .name(fieldName) - .type(convertToSchema(rowType.getTypeAt(i), fieldName)) - .noDefault(); + .type(convertToSchema(fieldType, rowName + "_" + fieldName)); + + if (fieldType.isNullable()) { + builder = fieldBuilder.withDefault(null); + } else { + builder = fieldBuilder.noDefault(); + } } return builder.endRecord(); case MULTISET: case MAP: - return SchemaBuilder + Schema mapType = SchemaBuilder .builder() - .nullable() .map() .values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName)); + return isNullable ? nullableSchema(mapType) : mapType; case ARRAY: - ArrayType arrayType = (ArrayType) logicalType; - return SchemaBuilder + ArrayType arrayLogicalType = (ArrayType) logicalType; + Schema arrayType = SchemaBuilder .builder() - .nullable() .array() - .items(convertToSchema(arrayType.getElementType(), rowName)); + .items(convertToSchema(arrayLogicalType.getElementType(), rowName)); + return isNullable ? nullableSchema(arrayType) : arrayType; case RAW: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: default: @@ -289,11 +309,8 @@ public class AvroSchemaConverter { return valueType; } - private static SchemaBuilder.BaseTypeBuilder<Schema> getNullableBuilder(LogicalType logicalType) { - SchemaBuilder.TypeBuilder<Schema> builder = SchemaBuilder.builder(); - if (logicalType.isNullable()) { - return builder.nullable(); - } - return builder; + /** Returns schema with nullable true. */ + private static Schema nullableSchema(Schema schema) { + return Schema.createUnion(SchemaBuilder.builder().nullType(), schema); } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java index 514d15b4..0b918de 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java @@ -181,9 +181,9 @@ public class AvroRowDataDeSerializationSchemaTest { byte[] input = byteArrayOutputStream.toByteArray(); DataType dataType = ROW( - FIELD("type_timestamp_millis", TIMESTAMP(3)), - FIELD("type_date", DATE()), - FIELD("type_time_millis", TIME(3))); + FIELD("type_timestamp_millis", TIMESTAMP(3).notNull()), + FIELD("type_date", DATE().notNull()), + FIELD("type_time_millis", TIME(3).notNull())); final RowType rowType = (RowType) dataType.getLogicalType(); final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType); AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java index f669a10..9b47509 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java @@ -22,17 +22,27 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.DecoderFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -55,6 +65,44 @@ public class AvroSchemaConverterTest { } @Test + public void testAddingOptionalField() throws IOException { + Schema oldSchema = SchemaBuilder.record("record") + .fields() + .requiredLong("category_id") + .optionalString("name") + .endRecord(); + + Schema newSchema = AvroSchemaConverter.convertToSchema( + DataTypes.ROW( + DataTypes.FIELD("category_id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("description", DataTypes.STRING().nullable()) + ).getLogicalType() + ); + + byte[] serializedRecord = AvroTestUtils.writeRecord( + new GenericRecordBuilder(oldSchema) + .set("category_id", 1L) + .set("name", "test") + .build(), + oldSchema + ); + GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>( + oldSchema, + newSchema); + GenericRecord newRecord = datumReader.read( + null, + DecoderFactory.get().binaryDecoder(serializedRecord, 0, serializedRecord.length, null)); + assertThat( + newRecord, + equalTo(new GenericRecordBuilder(newSchema) + .set("category_id", 1L) + .set("name", "test") + .set("description", null) + .build())); + } + + @Test public void testInvalidRawTypeAvroSchemaConversion() { RowType rowType = (RowType) TableSchema.builder() .field("a", DataTypes.STRING()) @@ -97,48 +145,55 @@ public class AvroSchemaConverterTest { DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING()))))) .build().toRowDataType().getLogicalType(); Schema schema = AvroSchemaConverter.convertToSchema(rowType); - assertEquals("{\n" + - " \"type\" : \"record\",\n" + - " \"name\" : \"record\",\n" + - " \"fields\" : [ {\n" + - " \"name\" : \"record_row1\",\n" + - " \"type\" : {\n" + - " \"type\" : \"record\",\n" + - " \"name\" : \"record_row1\",\n" + - " \"fields\" : [ {\n" + - " \"name\" : \"record_row1_a\",\n" + - " \"type\" : [ \"string\", \"null\" ]\n" + - " } ]\n" + - " }\n" + - " }, {\n" + - " \"name\" : \"record_row2\",\n" + - " \"type\" : {\n" + - " \"type\" : \"record\",\n" + - " \"name\" : \"record_row2\",\n" + - " \"fields\" : [ {\n" + - " \"name\" : \"record_row2_b\",\n" + - " \"type\" : [ \"string\", \"null\" ]\n" + - " } ]\n" + - " }\n" + - " }, {\n" + - " \"name\" : \"record_row3\",\n" + - " \"type\" : {\n" + - " \"type\" : \"record\",\n" + - " \"name\" : \"record_row3\",\n" + - " \"fields\" : [ {\n" + - " \"name\" : \"record_row3_row3\",\n" + - " \"type\" : {\n" + - " \"type\" : \"record\",\n" + - " \"name\" : \"record_row3_row3\",\n" + - " \"fields\" : [ {\n" + - " \"name\" : \"record_row3_row3_c\",\n" + - " \"type\" : [ \"string\", \"null\" ]\n" + - " } ]\n" + - " }\n" + - " } ]\n" + - " }\n" + - " } ]\n" + - "}", schema.toString(true)); + assertEquals("{\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"row1\",\n" + + " \"type\" : {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record_row1\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"a\",\n" + + " \"type\" : [ \"null\", \"string\" ],\n" + + " \"default\" : null\n" + + " } ]\n" + + " },\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"row2\",\n" + + " \"type\" : {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record_row2\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"b\",\n" + + " \"type\" : [ \"null\", \"string\" ],\n" + + " \"default\" : null\n" + + " } ]\n" + + " },\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"row3\",\n" + + " \"type\" : {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record_row3\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"row3\",\n" + + " \"type\" : {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record_row3_row3\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"c\",\n" + + " \"type\" : [ \"null\", \"string\" ],\n" + + " \"default\" : null\n" + + " } ]\n" + + " },\n" + + " \"default\" : null\n" + + " } ]\n" + + " },\n" + + " \"default\" : null\n" + + " } ]\n" + + "}", schema.toString(true)); } private void validateUserSchema(TypeInformation<?> actual) {