This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a709146769b47727b8887807b882db50ffd8c0f Author: Aljoscha Krettek <[email protected]> AuthorDate: Tue Jan 14 13:25:22 2020 +0100 [FLINK-13632] Port AvroSerializer upgrade test to TypeSerializerUpgradeTestBase --- .../typeutils/AvroSerializerMigrationTest.java | 259 --------------------- .../avro/typeutils/AvroSerializerUpgradeTest.java | 185 +++++++++++++++ ...6-avro-generic-type-serializer-address-snapshot | Bin 901 -> 0 bytes .../resources/flink-1.6-avro-type-serialized-data | Bin 23563 -> 0 bytes .../flink-1.6-avro-type-serializer-address-data | Bin 240 -> 0 bytes ...flink-1.6-avro-type-serializer-address-snapshot | Bin 710 -> 0 bytes .../flink-1.6-avro-type-serializer-snapshot | Bin 36411 -> 0 bytes ...7-avro-generic-type-serializer-address-snapshot | Bin 370 -> 0 bytes .../flink-1.7-avro-type-serializer-address-data | Bin 240 -> 0 bytes ...flink-1.7-avro-type-serializer-address-snapshot | Bin 380 -> 0 bytes .../src/test/resources/flink_11-kryo_registrations | 86 ------- 11 files changed, 185 insertions(+), 345 deletions(-) diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java deleted file mode 100644 index a162aa3..0000000 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.formats.avro.typeutils; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.formats.avro.generated.Address; -import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; -import org.apache.flink.testutils.migration.MigrationVersion; - -import org.apache.avro.generic.GenericRecord; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.Base64; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -import static java.util.Arrays.asList; -import static junit.framework.TestCase.assertSame; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Tests migrations for {@link AvroSerializerSnapshot}. - */ -@RunWith(Parameterized.class) -public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Address> { - - private static final String DATA_FILE_FORMAT = "flink-%s-avro-type-serializer-address-data"; - private static final String SPECIFIC_SNAPSHOT_FILE_FORMAT = "flink-%s-avro-type-serializer-address-snapshot"; - private static final String GENERIC_SNAPSHOT_FILE_FORMAT = "flink-%s-avro-generic-type-serializer-address-snapshot"; - - public AvroSerializerMigrationTest(TestSpecification<Address> testSpec) { - super(testSpec); - } - - @SuppressWarnings("unchecked") - @Parameterized.Parameters(name = "Test Specification = {0}") - public static Collection<TestSpecification<?>> testSpecifications() { - - final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); - - testSpecifications.add( - "generic-avro-serializer", - AvroSerializer.class, - AvroSerializerSnapshot.class, - () -> new AvroSerializer(GenericRecord.class, Address.getClassSchema()), - testVersion -> String.format(GENERIC_SNAPSHOT_FILE_FORMAT, testVersion), - testVersion -> String.format(DATA_FILE_FORMAT, testVersion), - 10); - testSpecifications.add( - "specific-avro-serializer", - AvroSerializer.class, - AvroSerializerSnapshot.class, - () -> new AvroSerializer<>(Address.class), - testVersion -> String.format(SPECIFIC_SNAPSHOT_FILE_FORMAT, testVersion), - testVersion -> String.format(DATA_FILE_FORMAT, testVersion), - 10); - - return testSpecifications.get(); - } - - // --------------------------------------------------------------------------------------------------------------- - // The following batch of tests are making sure that AvroSerializer class is able to be Java-Deserialized. - // see [FLINK-11436] for more information. - - // Once we drop support for versions that carried snapshots with Java-Deserialized serializers we can drop this - // batch of tests. - // --------------------------------------------------------------------------------------------------------------- - - @Test - public void javaDeserializeFromFlink_1_5_ReflectiveRecord() throws IOException { - final String avroSerializerBase64 = "AAAAAQAAAQis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n" + - "aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n" + - "Tm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJvU2VyaWFsaXplck1pZ3Jh\n" + - "dGlvblRlc3QkU2ltcGxlUG9qbwAAAAAAAAAAAAAAeHA="; - - TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64); - assertThat(serializer, instanceOf(AvroSerializer.class)); - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64); - assertSame(avroSerializer.getType(), SimplePojo.class); - assertThat(avroSerializer.getAvroSchema(), notNullValue()); - } - - @Test - public void javaDeserializeFromFlink_1_5_SpecificRecord() throws IOException { - final String avroSerializerBase64 = "AAAAAQAAASOs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n" + - "aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n" + - "L29yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLmdlbmVyYXRlZC5BZGRyZXNz7Paj+KjgQ2oMAAB4\n" + - "cgArb3JnLmFwYWNoZS5hdnJvLnNwZWNpZmljLlNwZWNpZmljUmVjb3JkQmFzZQKi+azGtzQdDAAAeHA="; - - TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64); - assertThat(serializer, instanceOf(AvroSerializer.class)); - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64); - assertSame(avroSerializer.getType(), Address.class); - assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$)); - } - - @Test - public void javaDeserializeFromFlink_1_6() throws IOException { - final String avroSerializer = "AAAAAQAAAUis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n" + - "dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n" + - "dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwcHZyAC9vcmcuYXBhY2hlLmZsaW5rLmZvcm1h\n" + - "dHMuYXZyby5nZW5lcmF0ZWQuQWRkcmVzc+z2o/io4ENqDAAAeHIAK29yZy5hcGFjaGUuYXZyby5zcGVj\n" + - "aWZpYy5TcGVjaWZpY1JlY29yZEJhc2UCovmsxrc0HQwAAHhw"; - - TypeSerializer<?> avro = javaDeserialize(avroSerializer); - - assertThat(avro, instanceOf(AvroSerializer.class)); - } - - @Test - public void javaDeserializeFromFlink_1_6_GenericRecord() throws IOException { - String avroSerializerBase64 = "AAAAAQAAAges7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n" + - "dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n" + - "dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdAEBeyJ0eXBlIjoicmVjb3JkIiwibmFtZSI6\n" + - "IkFkZHJlc3MiLCJuYW1lc3BhY2UiOiJvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby5nZW5lcmF0\n" + - "ZWQiLCJmaWVsZHMiOlt7Im5hbWUiOiJudW0iLCJ0eXBlIjoiaW50In0seyJuYW1lIjoic3RyZWV0Iiwi\n" + - "dHlwZSI6InN0cmluZyJ9LHsibmFtZSI6ImNpdHkiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoic3Rh\n" + - "dGUiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoiemlwIiwidHlwZSI6InN0cmluZyJ9XX12cgAlb3Jn\n" + - "LmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY1JlY29yZAAAAAAAAAAAAAAAeHA="; - - TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64); - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) serializer; - assertSame(avroSerializer.getType(), GenericRecord.class); - assertThat(avroSerializer.getAvroSchema(), notNullValue()); - } - - @Test - public void javaDeserializeFromFlink_1_7() throws IOException { - String avroSerializerBase64 = "AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n" + - "b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n" + - "AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n" + - "cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n" + - "YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n" + - "AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n" + - "9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n" + - "rMa3NB0MAAB4cA=="; - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64); - assertSame(avroSerializer.getType(), Address.class); - assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$)); - } - - @Test - public void javaDeserializeFromFlink_1_7_afterInitialization() throws IOException { - String avroSerializerBase64 = "AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" + - "U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n" + - "b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n" + - "AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n" + - "cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n" + - "YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n" + - "AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n" + - "9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n" + - "rMa3NB0MAAB4cA=="; - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64); - assertSame(avroSerializer.getType(), Address.class); - assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$)); - } - - @Test - public void compositeSerializerFromFlink_1_6_WithNestedAvroSerializer() throws IOException { - String streamElementSerializerBase64 = "AAAAAQAAAq2s7QAFc3IAR29yZy5hcGFjaGUuZmxpbmsuc3RyZWFtaW5nLnJ1bnRpbWUuc3RyZWFtcmVj\n" + - "b3JkLlN0cmVhbUVsZW1lbnRTZXJpYWxpemVyAAAAAAAAAAECAAFMAA50eXBlU2VyaWFsaXplcnQANkxv\n" + - "cmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hyADRvcmcu\n" + - "YXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4\n" + - "cHNyADZvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby50eXBldXRpbHMuQXZyb1NlcmlhbGl6ZXIA\n" + - "AAAAAAAAAQIAAkwADHNjaGVtYVN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO0wABHR5cGV0ABFMamF2\n" + - "YS9sYW5nL0NsYXNzO3hxAH4AAnQBAXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJBZGRyZXNzIiwibmFt\n" + - "ZXNwYWNlIjoib3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkIiwiZmllbGRzIjpb\n" + - "eyJuYW1lIjoibnVtIiwidHlwZSI6ImludCJ9LHsibmFtZSI6InN0cmVldCIsInR5cGUiOiJzdHJpbmci\n" + - "fSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyJ9LHsibmFtZSI6InN0YXRlIiwidHlwZSI6InN0\n" + - "cmluZyJ9LHsibmFtZSI6InppcCIsInR5cGUiOiJzdHJpbmcifV19dnIAJW9yZy5hcGFjaGUuYXZyby5n\n" + - "ZW5lcmljLkdlbmVyaWNSZWNvcmQAAAAAAAAAAAAAAHhw"; - - StreamElementSerializer<?> ser = (StreamElementSerializer<?>) javaDeserialize(streamElementSerializerBase64); - TypeSerializer<?> containedTypeSerializer = ser.getContainedTypeSerializer(); - - assertThat(containedTypeSerializer, instanceOf(AvroSerializer.class)); - - AvroSerializer<?> avroSerializer = (AvroSerializer<?>) containedTypeSerializer; - assertSame(avroSerializer.getType(), GenericRecord.class); - assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$)); - } - - @Test - public void makeSureThatFieldsWereNotChanged() { - // This test should be removed once we completely migrate all the composite serializers. - - List<String> serializedFieldNames = Arrays.stream(AvroSerializer.class.getDeclaredFields()) - .filter(field -> !Modifier.isTransient(field.getModifiers())) - .filter(field -> !Modifier.isStatic(field.getModifiers())) - .map(Field::getName) - .sorted() - .collect(Collectors.toList()); - - assertThat(serializedFieldNames, is(asList("previousSchema", "schema", "type"))); - } - - @SuppressWarnings("deprecation") - private static TypeSerializer<?> javaDeserialize(String base64) throws IOException { - byte[] bytes = Base64.getMimeDecoder().decode(base64); - DataInputDeserializer in = new DataInputDeserializer(bytes); - return TypeSerializerSerializationUtil.tryReadSerializer(in, Thread.currentThread().getContextClassLoader()); - } - - /** - * A simple pojo used in these tests. - */ - public static class SimplePojo { - private String foo; - - @SuppressWarnings("unused") - public String getFoo() { - return foo; - } - - @SuppressWarnings("unused") - public void setFoo(String foo) { - this.foo = foo; - } - } -} diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java new file mode 100644 index 0000000..f647c92 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.hamcrest.Matcher; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.Matchers.is; + +/** + * Tests based on {@link TypeSerializerUpgradeTestBase} for the {@link AvroSerializer}. + */ +@RunWith(Parameterized.class) +public class AvroSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Object, Object> { + + public AvroSerializerUpgradeTest(TestSpecification<Object, Object> testSpecification) { + super(testSpecification); + } + + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception { + ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); + for (MigrationVersion migrationVersion : migrationVersions) { + testSpecifications.add( + new TestSpecification<>( + "generic-avro-serializer", + migrationVersion, + GenericAvroSerializerSetup.class, + GenericAvroSerializerVerifier.class)); + + testSpecifications.add( + new TestSpecification<>( + "specific-avro-serializer", + migrationVersion, + SpecificAvroSerializerSetup.class, + SpecificAvroSerializerVerifier.class)); + } + + return testSpecifications; + } + + // ---------------------------------------------------------------------------------------------- + // Specification for "generic-avro-serializer" + // ---------------------------------------------------------------------------------------------- + + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class GenericAvroSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<GenericRecord> { + + @Override + public TypeSerializer<GenericRecord> createPriorSerializer() { + return new AvroSerializer<>( + GenericRecord.class, + Address.getClassSchema()); + } + + @Override + public GenericRecord createTestData() { + GenericData.Record record = new GenericData.Record(Address.getClassSchema()); + record.put("num", 239); + record.put("street", "Baker Street"); + record.put("city", "London"); + record.put("state", "London"); + record.put("zip", "NW1 6XE"); + return record; + } + } + + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class GenericAvroSerializerVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<GenericRecord> { + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public TypeSerializer<GenericRecord> createUpgradedSerializer() { + return new AvroSerializer( + GenericRecord.class, + Address.getClassSchema()); + } + + @Override + public Matcher<GenericRecord> testDataMatcher() { + GenericData.Record record = new GenericData.Record(Address.getClassSchema()); + record.put("num", 239); + record.put("street", "Baker Street"); + record.put("city", "London"); + record.put("state", "London"); + record.put("zip", "NW1 6XE"); + return is(record); + } + + @Override + public Matcher<TypeSerializerSchemaCompatibility<GenericRecord>> schemaCompatibilityMatcher(MigrationVersion version) { + return TypeSerializerMatchers.isCompatibleAsIs(); + } + } + + // ---------------------------------------------------------------------------------------------- + // Specification for "specific-avro-serializer" + // ---------------------------------------------------------------------------------------------- + + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class SpecificAvroSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Address> { + + @Override + public TypeSerializer<Address> createPriorSerializer() { + @SuppressWarnings({"unchecked", "rawtypes"}) + AvroSerializer<Address> avroSerializer = new AvroSerializer(Address.class); + return avroSerializer; + } + + @Override + public Address createTestData() { + Address addr = new Address(); + addr.setNum(239); + addr.setStreet("Baker Street"); + addr.setCity("London"); + addr.setState("London"); + addr.setZip("NW1 6XE"); + return addr; + } + } + + /** + * This class is only public to work with {@link org.apache.flink.api.common.typeutils.ClassRelocator}. + */ + public static final class SpecificAvroSerializerVerifier implements TypeSerializerUpgradeTestBase.UpgradeVerifier<Address> { + + @Override + public TypeSerializer<Address> createUpgradedSerializer() { + @SuppressWarnings({"unchecked", "rawtypes"}) + AvroSerializer<Address> avroSerializer = new AvroSerializer(Address.class); + return avroSerializer; + } + + @Override + public Matcher<Address> testDataMatcher() { + Address addr = new Address(); + addr.setNum(239); + addr.setStreet("Baker Street"); + addr.setCity("London"); + addr.setState("London"); + addr.setZip("NW1 6XE"); + return is(addr); + } + + @Override + public Matcher<TypeSerializerSchemaCompatibility<Address>> schemaCompatibilityMatcher(MigrationVersion version) { + return TypeSerializerMatchers.isCompatibleAsIs(); + } + } +} diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot deleted file mode 100644 index 57cbc42..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data deleted file mode 100644 index 23853cf..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data deleted file mode 100644 index 74acf72..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot deleted file mode 100644 index d68be81..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot deleted file mode 100644 index 1474300..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot deleted file mode 100644 index f27d2dc..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data deleted file mode 100644 index 74acf72..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot deleted file mode 100644 index 7c8f6c2..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot and /dev/null differ diff --git a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations deleted file mode 100644 index 7000e62..0000000 --- a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations +++ /dev/null @@ -1,86 +0,0 @@ -0,int -1,java.lang.String -2,float -3,boolean -4,byte -5,char -6,short -7,long -8,double -9,void -10,scala.collection.convert.Wrappers$SeqWrapper -11,scala.collection.convert.Wrappers$IteratorWrapper -12,scala.collection.convert.Wrappers$MapWrapper -13,scala.collection.convert.Wrappers$JListWrapper -14,scala.collection.convert.Wrappers$JMapWrapper -15,scala.Some -16,scala.util.Left -17,scala.util.Right -18,scala.collection.immutable.Vector -19,scala.collection.immutable.Set$Set1 -20,scala.collection.immutable.Set$Set2 -21,scala.collection.immutable.Set$Set3 -22,scala.collection.immutable.Set$Set4 -23,scala.collection.immutable.HashSet$HashTrieSet -24,scala.collection.immutable.Map$Map1 -25,scala.collection.immutable.Map$Map2 -26,scala.collection.immutable.Map$Map3 -27,scala.collection.immutable.Map$Map4 -28,scala.collection.immutable.HashMap$HashTrieMap -29,scala.collection.immutable.Range$Inclusive -30,scala.collection.immutable.NumericRange$Inclusive -31,scala.collection.immutable.NumericRange$Exclusive -32,scala.collection.mutable.BitSet -33,scala.collection.mutable.HashMap -34,scala.collection.mutable.HashSet -35,scala.collection.convert.Wrappers$IterableWrapper -36,scala.Tuple1 -37,scala.Tuple2 -38,scala.Tuple3 -39,scala.Tuple4 -40,scala.Tuple5 -41,scala.Tuple6 -42,scala.Tuple7 -43,scala.Tuple8 -44,scala.Tuple9 -45,scala.Tuple10 -46,scala.Tuple11 -47,scala.Tuple12 -48,scala.Tuple13 -49,scala.Tuple14 -50,scala.Tuple15 -51,scala.Tuple16 -52,scala.Tuple17 -53,scala.Tuple18 -54,scala.Tuple19 -55,scala.Tuple20 -56,scala.Tuple21 -57,scala.Tuple22 -58,scala.Tuple1$mcJ$sp -59,scala.Tuple1$mcI$sp -60,scala.Tuple1$mcD$sp -61,scala.Tuple2$mcJJ$sp -62,scala.Tuple2$mcJI$sp -63,scala.Tuple2$mcJD$sp -64,scala.Tuple2$mcIJ$sp -65,scala.Tuple2$mcII$sp -66,scala.Tuple2$mcID$sp -67,scala.Tuple2$mcDJ$sp -68,scala.Tuple2$mcDI$sp -69,scala.Tuple2$mcDD$sp -70,scala.Symbol -71,scala.reflect.ClassTag -72,scala.runtime.BoxedUnit -73,java.util.Arrays$ArrayList -74,java.util.BitSet -75,java.util.PriorityQueue -76,java.util.regex.Pattern -77,java.sql.Date -78,java.sql.Time -79,java.sql.Timestamp -80,java.net.URI -81,java.net.InetSocketAddress -82,java.util.UUID -83,java.util.Locale -84,java.text.SimpleDateFormat -85,org.apache.avro.generic.GenericData$Array
