AVRO-1931: Reader is now compatible if able to read all branches of union Closes #199
Signed-off-by: Sriharsha Chintalapani <[email protected]> Signed-off-by: Nandor Kollar <[email protected]> Signed-off-by: Sean Busbey <[email protected]> (cherry picked from commit 15651fc95e058d1b3cc165a70c367d1dc2bad8b7) (cherry picked from commit 83cdd2bd70a4f5f16a52177fc46aa6dec412548c) Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/522b59b0 Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/522b59b0 Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/522b59b0 Branch: refs/heads/branch-1.7 Commit: 522b59b04f3cf1300908bb91b30b89af1089621e Parents: 492ab98 Author: Anders Sundelin <[email protected]> Authored: Thu Mar 2 11:08:23 2017 +0100 Committer: Sean Busbey <[email protected]> Committed: Fri Aug 18 09:56:53 2017 -0500 ---------------------------------------------------------------------- .../org/apache/avro/SchemaCompatibility.java | 17 +- .../TestReadingWritingDataInEvolvedSchemas.java | 284 +++++++++++++++++++ .../apache/avro/TestSchemaCompatibility.java | 34 +++ 3 files changed, 329 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java index 9ac6dc8..c713c32 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java @@ -350,10 +350,15 @@ public class SchemaCompatibility { } else { // Reader and writer have different schema types: - // Handle the corner case where writer is a union of a singleton branch: { X } === X - if ((writer.getType() == Schema.Type.UNION) - && writer.getTypes().size() == 1) { - return getCompatibility(reader, writer.getTypes().get(0)); + // Reader compatible with all branches of a writer union is compatible + if (writer.getType() == Schema.Type.UNION) { + for (Schema s : writer.getTypes()) { + SchemaCompatibilityType compatibility = getCompatibility(reader, s); + if (compatibility == SchemaCompatibilityType.INCOMPATIBLE) { + return SchemaCompatibilityType.INCOMPATIBLE; + } + } + return SchemaCompatibilityType.COMPATIBLE; } switch (reader.getType()) { @@ -380,12 +385,12 @@ public class SchemaCompatibility { : SchemaCompatibilityType.INCOMPATIBLE; } case BYTES: { - return (writer.getType() == Type.STRING) + return (writer.getType() == Type.STRING) ? SchemaCompatibilityType.COMPATIBLE : SchemaCompatibilityType.INCOMPATIBLE; } case STRING: { - return (writer.getType() == Type.BYTES) + return (writer.getType() == Type.BYTES) ? SchemaCompatibilityType.COMPATIBLE : SchemaCompatibilityType.INCOMPATIBLE; } http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java new file mode 100644 index 0000000..0012876 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/TestReadingWritingDataInEvolvedSchemas.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.EnumSymbol; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class TestReadingWritingDataInEvolvedSchemas { + + private static final String RECORD_A = "RecordA"; + private static final String FIELD_A = "fieldA"; + private static final char LATIN_SMALL_LETTER_O_WITH_DIARESIS = '\u00F6'; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private static final Schema DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().doubleType().noDefault() // + .endRecord(); + private static final Schema FLOAT_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().floatType().noDefault() // + .endRecord(); + private static final Schema LONG_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().longType().noDefault() // + .endRecord(); + private static final Schema INT_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().intType().noDefault() // + .endRecord(); + private static final Schema UNION_INT_LONG_FLOAT_DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().unionOf().doubleType().and().floatType().and().longType().and().intType().endUnion() + .noDefault() // + .endRecord(); + private static final Schema STRING_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().stringType().noDefault() // + .endRecord(); + private static final Schema BYTES_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().bytesType().noDefault() // + .endRecord(); + private static final Schema UNION_STRING_BYTES_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().unionOf().stringType().and().bytesType().endUnion() + .noDefault() // + .endRecord(); + private static final Schema ENUM_AB_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B").noDefault() // + .endRecord(); + private static final Schema ENUM_ABC_RECORD = SchemaBuilder.record(RECORD_A) // + .fields() // + .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B", "C").noDefault() // + .endRecord(); + + @Test + public void doubleWrittenWithUnionSchemaIsConvertedToDoubleSchema() throws Exception { + Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0); + byte[] encoded = encodeGenericBlob(record); + Record decoded = decodeGenericBlob(DOUBLE_RECORD, writer, encoded); + assertEquals(42.0, decoded.get(FIELD_A)); + } + + @Test + public void doubleWrittenWithUnionSchemaIsNotConvertedToFloatSchema() throws Exception { + expectedException.expect(AvroTypeException.class); + expectedException.expectMessage("Found double, expecting float"); + Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0); + byte[] encoded = encodeGenericBlob(record); + decodeGenericBlob(FLOAT_RECORD, writer, encoded); + } + + @Test + public void floatWrittenWithUnionSchemaIsNotConvertedToLongSchema() throws Exception { + expectedException.expect(AvroTypeException.class); + expectedException.expectMessage("Found float, expecting long"); + Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0f); + byte[] encoded = encodeGenericBlob(record); + decodeGenericBlob(LONG_RECORD, writer, encoded); + } + + @Test + public void longWrittenWithUnionSchemaIsNotConvertedToIntSchema() throws Exception { + expectedException.expect(AvroTypeException.class); + expectedException.expectMessage("Found long, expecting int"); + Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, 42L); + byte[] encoded = encodeGenericBlob(record); + decodeGenericBlob(INT_RECORD, writer, encoded); + } + + @Test + public void intWrittenWithUnionSchemaIsConvertedToAllNumberSchemas() throws Exception { + Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, 42); + byte[] encoded = encodeGenericBlob(record); + assertEquals(42.0, decodeGenericBlob(DOUBLE_RECORD, writer, encoded).get(FIELD_A)); + assertEquals(42.0f, decodeGenericBlob(FLOAT_RECORD, writer, encoded).get(FIELD_A)); + assertEquals(42L, decodeGenericBlob(LONG_RECORD, writer, encoded).get(FIELD_A)); + assertEquals(42, decodeGenericBlob(INT_RECORD, writer, encoded).get(FIELD_A)); + } + + @Test + public void asciiStringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception { + Schema writer = UNION_STRING_BYTES_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, "42"); + byte[] encoded = encodeGenericBlob(record); + ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A); + assertArrayEquals("42".getBytes("UTF-8"), actual.array()); + } + + @Test + public void utf8StringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception { + String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS); + Schema writer = UNION_STRING_BYTES_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, goeran); + byte[] encoded = encodeGenericBlob(record); + ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A); + assertArrayEquals(goeran.getBytes("UTF-8"), actual.array()); + } + + @Test + public void asciiBytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception { + Schema writer = UNION_STRING_BYTES_RECORD; + ByteBuffer buf = ByteBuffer.wrap("42".getBytes("UTF-8")); + Record record = defaultRecordWithSchema(writer, FIELD_A, buf); + byte[] encoded = encodeGenericBlob(record); + CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A); + assertEquals("42", read.toString()); + } + + @Test + public void utf8BytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception { + String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS); + Schema writer = UNION_STRING_BYTES_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, goeran); + byte[] encoded = encodeGenericBlob(record); + CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A); + assertEquals(goeran, read.toString()); + } + + @Test + public void enumRecordCanBeReadWithExtendedEnumSchema() throws Exception { + Schema writer = ENUM_AB_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A")); + byte[] encoded = encodeGenericBlob(record); + Record decoded = decodeGenericBlob(ENUM_ABC_RECORD, writer, encoded); + assertEquals("A", decoded.get(FIELD_A).toString()); + } + + @Test + public void enumRecordWithExtendedSchemaCanBeReadWithOriginalEnumSchemaIfOnlyOldValues() throws Exception { + Schema writer = ENUM_ABC_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A")); + byte[] encoded = encodeGenericBlob(record); + Record decoded = decodeGenericBlob(ENUM_AB_RECORD, writer, encoded); + assertEquals("A", decoded.get(FIELD_A).toString()); + } + + @Test + public void enumRecordWithExtendedSchemaCanNotBeReadIfNewValuesAreUsed() throws Exception { + expectedException.expect(AvroTypeException.class); + expectedException.expectMessage("No match for C"); + Schema writer = ENUM_ABC_RECORD; + Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "C")); + byte[] encoded = encodeGenericBlob(record); + decodeGenericBlob(ENUM_AB_RECORD, writer, encoded); + } + + @Test + public void recordWrittenWithExtendedSchemaCanBeReadWithOriginalSchemaButLossOfData() throws Exception { + Schema writer = SchemaBuilder.record(RECORD_A) // + .fields() // + .name("newTopField").type().stringType().noDefault() // + .name(FIELD_A).type().intType().noDefault() // + .endRecord(); + Record record = defaultRecordWithSchema(writer, FIELD_A, 42); + record.put("newTopField", "not decoded"); + byte[] encoded = encodeGenericBlob(record); + Record decoded = decodeGenericBlob(INT_RECORD, writer, encoded); + assertEquals(42, decoded.get(FIELD_A)); + assertNull(decoded.get("newTopField")); + } + + @Test + public void readerWithoutDefaultValueThrowsException() throws Exception { + expectedException.expect(AvroTypeException.class); + expectedException.expectMessage("missing required field newField"); + Schema reader = SchemaBuilder.record(RECORD_A) // + .fields() // + .name("newField").type().intType().noDefault() // + .name(FIELD_A).type().intType().noDefault() // + .endRecord(); + Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42); + byte[] encoded = encodeGenericBlob(record); + decodeGenericBlob(reader, INT_RECORD, encoded); + } + + @Test + public void readerWithDefaultValueIsApplied() throws Exception { + Schema reader = SchemaBuilder.record(RECORD_A) // + .fields() // + .name("newFieldWithDefault").type().intType().intDefault(314) // + .name(FIELD_A).type().intType().noDefault() // + .endRecord(); + Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42); + byte[] encoded = encodeGenericBlob(record); + Record decoded = decodeGenericBlob(reader, INT_RECORD, encoded); + assertEquals(42, decoded.get(FIELD_A)); + assertEquals(314, decoded.get("newFieldWithDefault")); + } + + private <T> Record defaultRecordWithSchema(Schema schema, String key, T value) { + Record data = new GenericData.Record(schema); + data.put(key, value); + return data; + } + + private static byte[] encodeGenericBlob(GenericRecord data) + throws IOException { + DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(data.getSchema()); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(outStream, null); + writer.write(data, encoder); + encoder.flush(); + outStream.close(); + return outStream.toByteArray(); + } + + private static Record decodeGenericBlob(Schema expectedSchema, Schema schemaOfBlob, byte[] blob) throws IOException { + if (blob == null) { + return null; + } + GenericDatumReader<Record> reader = new GenericDatumReader<Record>(); + reader.setExpected(expectedSchema); + reader.setSchema(schemaOfBlob); + Decoder decoder = DecoderFactory.get().binaryDecoder(blob, null); + Record data = null; + data = reader.read(null, decoder); + return data; + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/522b59b0/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java index 25a74e1..ebe3a61 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java @@ -82,12 +82,24 @@ public class TestSchemaCompatibility { Schema.createUnion(list(INT_SCHEMA)); private static final Schema LONG_UNION_SCHEMA = Schema.createUnion(list(LONG_SCHEMA)); + private static final Schema FLOAT_UNION_SCHEMA = + Schema.createUnion(list(FLOAT_SCHEMA)); + private static final Schema DOUBLE_UNION_SCHEMA = + Schema.createUnion(list(DOUBLE_SCHEMA)); private static final Schema STRING_UNION_SCHEMA = Schema.createUnion(list(STRING_SCHEMA)); + private static final Schema BYTES_UNION_SCHEMA = + Schema.createUnion(list(BYTES_SCHEMA)); private static final Schema INT_STRING_UNION_SCHEMA = Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA)); private static final Schema STRING_INT_UNION_SCHEMA = Schema.createUnion(list(STRING_SCHEMA, INT_SCHEMA)); + private static final Schema INT_FLOAT_UNION_SCHEMA = + Schema.createUnion(list(INT_SCHEMA, FLOAT_SCHEMA)); + private static final Schema INT_LONG_UNION_SCHEMA = + Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA)); + private static final Schema INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA = + Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA, FLOAT_SCHEMA, DOUBLE_SCHEMA)); // Non recursive records: private static final Schema EMPTY_RECORD1 = @@ -363,8 +375,27 @@ public class TestSchemaCompatibility { new ReaderWriter(INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA), new ReaderWriter(INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA), new ReaderWriter(LONG_UNION_SCHEMA, INT_UNION_SCHEMA), + new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA), + new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA), + new ReaderWriter(LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA), + new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA), + new ReaderWriter(DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA), + new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA), + new ReaderWriter(DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA), + new ReaderWriter(STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA), + new ReaderWriter(STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA), + new ReaderWriter(BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA), + new ReaderWriter(BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA), + new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA), + + // Readers capable of reading all branches of a union are compatible + new ReaderWriter(FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA), + new ReaderWriter(LONG_SCHEMA, INT_LONG_UNION_SCHEMA), + new ReaderWriter(DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA), + new ReaderWriter(DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA), // Special case of singleton unions: + new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA), new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA), new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA), @@ -435,6 +466,9 @@ public class TestSchemaCompatibility { // Tests involving unions: new ReaderWriter(INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA), new ReaderWriter(STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA), + new ReaderWriter(FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA), + new ReaderWriter(LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA), + new ReaderWriter(INT_SCHEMA, INT_FLOAT_UNION_SCHEMA), new ReaderWriter(EMPTY_RECORD2, EMPTY_RECORD1), new ReaderWriter(A_INT_RECORD1, EMPTY_RECORD1),
