[FLINK-7420] [avro] Replace GenericData.Array by dummy when reading TypeSerializers
This also adds a new test that verifies that we correctly register Avro Serializers when they are present and modifies an existing test to verify that we correctly register dummy classes. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29249b2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29249b2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29249b2e Branch: refs/heads/master Commit: 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5 Parents: db7c70f Author: Aljoscha Krettek <[email protected]> Authored: Wed Oct 25 17:38:24 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Nov 3 16:40:34 2017 +0100 ---------------------------------------------------------------------- .../flink-connector-kafka-0.10/pom.xml | 8 ++ .../flink-connector-kafka-0.11/pom.xml | 8 ++ .../flink-connector-kafka-0.8/pom.xml | 8 ++ .../flink-connector-kafka-0.9/pom.xml | 8 ++ .../TypeSerializerSerializationUtil.java | 23 +++- ...ryoRegistrationSerializerConfigSnapshot.java | 2 +- .../kryo/KryoSerializerCompatibilityTest.java | 125 +++++++++++++++++++ .../type-with-avro-serialized-using-kryo | 1 + .../type-without-avro-serialized-using-kryo | Bin 0 -> 31 bytes .../AvroKryoSerializerRegistrationsTest.java | 117 +++++++++++++++++ .../test/resources/flink_11-kryo_registrations | 86 +++++++++++++ flink-libraries/flink-cep/pom.xml | 8 -- ...ckendStateMetaInfoSnapshotReaderWriters.java | 4 +- .../misc/KryoSerializerRegistrationsTest.java | 11 ++ pom.xml | 21 ++-- 15 files changed, 404 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml index 2b6660d..3357591 100644 --- a/flink-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml @@ -95,6 +95,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.11/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml index 162d5d0..4f6be1d 100644 --- a/flink-connectors/flink-connector-kafka-0.11/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml @@ -104,6 +104,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml index c990188..b96274a 100644 --- a/flink-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml @@ -83,6 +83,14 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml index 819d590..c711c5f 100644 --- a/flink-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml @@ -91,6 +91,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java index 058ef46..d03498a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; import org.apache.flink.core.io.VersionedIOReadableWritable; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; @@ -74,7 +75,9 @@ public class TypeSerializerSerializationUtil { /** * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of - * anonymous classes or our Scala serializer classes. + * anonymous classes or our Scala serializer classes and also replaces occurences of GenericData.Array + * (from Avro) by a dummy class so that the KryoSerializer can still be deserialized without + * Avro being on the classpath. * * <p>The {@link TypeSerializerSerializationProxy} uses this specific object input stream to read serializers, * so that mismatching serialVersionUIDs of anonymous classes / Scala serializers are ignored. @@ -83,9 +86,9 @@ public class TypeSerializerSerializationUtil { * * @see <a href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a> */ - public static class SerialUIDMismatchTolerantInputStream extends InstantiationUtil.ClassLoaderObjectInputStream { + public static class FailureTolerantObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream { - public SerialUIDMismatchTolerantInputStream(InputStream in, ClassLoader cl) throws IOException { + public FailureTolerantObjectInputStream(InputStream in, ClassLoader cl) throws IOException { super(in, cl); } @@ -93,6 +96,16 @@ public class TypeSerializerSerializationUtil { protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException { ObjectStreamClass streamClassDescriptor = super.readClassDescriptor(); + try { + Class.forName(streamClassDescriptor.getName(), false, classLoader); + } catch (ClassNotFoundException e) { + if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) { + ObjectStreamClass result = ObjectStreamClass.lookup( + KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class); + return result; + } + } + Class localClass = resolveClass(streamClassDescriptor); if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass() // isAnonymousClass does not work for anonymous Scala classes; additionally check by classname @@ -433,8 +446,8 @@ public class TypeSerializerSerializationUtil { ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader(); try ( - SerialUIDMismatchTolerantInputStream ois = - new SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), userClassLoader)) { + FailureTolerantObjectInputStream ois = + new FailureTolerantObjectInputStream(new ByteArrayInputStream(buffer), userClassLoader)) { Thread.currentThread().setContextClassLoader(userClassLoader); typeSerializer = (TypeSerializer<T>) ois.readObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java index 14287ca..cdf6b23 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java @@ -217,7 +217,7 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi /** * Placeholder dummy for a previously registered class that can no longer be found in classpath on restore. */ - public static class DummyRegisteredClass {} + public static class DummyRegisteredClass implements Serializable {} /** * Placeholder dummmy for a previously registered Kryo serializer that is no longer valid or in classpath on restore. http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java index 1cacc9e..11c95f1 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java @@ -29,14 +29,20 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; import java.io.InputStream; +import java.util.List; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -44,6 +50,9 @@ import static org.junit.Assert.assertTrue; */ public class KryoSerializerCompatibilityTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testMigrationStrategyForRemovedAvroDependency() throws Exception { KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig()); @@ -85,6 +94,122 @@ public class KryoSerializerCompatibilityTest { assertTrue(compatResult.isRequiresMigration()); } + @Test + public void testMigrationOfTypeWithAvroType() throws Exception { + + /* + When Avro sees the schema "{"type" : "array", "items" : "boolean"}" it will create a field + of type List<Integer> but the actual type will be GenericData.Array<Integer>. The + KryoSerializer registers a special Serializer for this type that simply deserializes + as ArrayList because Kryo cannot handle GenericData.Array well. Before Flink 1.4 Avro + was always in the classpath but after 1.4 it's only present if the flink-avro jar is + included. This test verifies that we can still deserialize data written pre-1.4. + */ + class FakeAvroClass { + public List<Integer> array; + + FakeAvroClass(List<Integer> array) { + this.array = array; + } + } + + /* + // This has to be executed on a pre-1.4 branch to generate the binary blob + { + ExecutionConfig executionConfig = new ExecutionConfig(); + KryoSerializer<FakeAvroClass> kryoSerializer = + new KryoSerializer<>(FakeAvroClass.class, executionConfig); + + try ( + FileOutputStream f = new FileOutputStream( + "src/test/resources/type-with-avro-serialized-using-kryo"); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) { + + + GenericData.Array<Integer> array = + new GenericData.Array<>(10, Schema.createArray(Schema.create(Schema.Type.INT))); + + array.add(10); + array.add(20); + array.add(30); + + FakeAvroClass myTestClass = new FakeAvroClass(array); + + kryoSerializer.serialize(myTestClass, outputView); + } + } + */ + + { + ExecutionConfig executionConfig = new ExecutionConfig(); + KryoSerializer<FakeAvroClass> kryoSerializer = + new KryoSerializer<>(FakeAvroClass.class, executionConfig); + + try ( + FileInputStream f = new FileInputStream("src/test/resources/type-with-avro-serialized-using-kryo"); + DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) { + + thrown.expectMessage("Could not find required Avro dependency"); + FakeAvroClass myTestClass = kryoSerializer.deserialize(inputView); + } + } + } + + @Test + public void testMigrationWithTypeDevoidOfAvroTypes() throws Exception { + + class FakeClass { + public List<Integer> array; + + FakeClass(List<Integer> array) { + this.array = array; + } + } + + /* + // This has to be executed on a pre-1.4 branch to generate the binary blob + { + ExecutionConfig executionConfig = new ExecutionConfig(); + KryoSerializer<FakeClass> kryoSerializer = + new KryoSerializer<>(FakeClass.class, executionConfig); + + try ( + FileOutputStream f = new FileOutputStream( + "src/test/resources/type-without-avro-serialized-using-kryo"); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) { + + + List<Integer> array = new ArrayList<>(10); + + array.add(10); + array.add(20); + array.add(30); + + FakeClass myTestClass = new FakeClass(array); + + kryoSerializer.serialize(myTestClass, outputView); + } + } + */ + + { + ExecutionConfig executionConfig = new ExecutionConfig(); + KryoSerializer<FakeClass> kryoSerializer = + new KryoSerializer<>(FakeClass.class, executionConfig); + + try ( + FileInputStream f = new FileInputStream("src/test/resources/type-without-avro-serialized-using-kryo"); + DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) { + + FakeClass myTestClass = kryoSerializer.deserialize(inputView); + + assertThat(myTestClass.array.get(0), is(10)); + assertThat(myTestClass.array.get(1), is(20)); + assertThat(myTestClass.array.get(2), is(30)); + } + } + } + /** * Tests that after reconfiguration, registration ids are reconfigured to * remain the same as the preceding KryoSerializer. http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-with-avro-serialized-using-kryo ---------------------------------------------------------------------- diff --git a/flink-core/src/test/resources/type-with-avro-serialized-using-kryo b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo new file mode 100644 index 0000000..3901024 --- /dev/null +++ b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo @@ -0,0 +1 @@ + (< \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-without-avro-serialized-using-kryo ---------------------------------------------------------------------- diff --git a/flink-core/src/test/resources/type-without-avro-serialized-using-kryo b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo new file mode 100644 index 0000000..d95094c Binary files /dev/null and b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo differ http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java new file mode 100644 index 0000000..060cfdd --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Registration; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests that the set of Kryo registrations is the same across compatible + * Flink versions. + * + * <p>Special version of {@code KryoSerializerRegistrationsTest} that sits in the Avro module + * and verifies that we correctly register Avro types at the {@link KryoSerializer} when + * Avro is present. + */ +public class AvroKryoSerializerRegistrationsTest { + + /** + * Tests that the registered classes in Kryo did not change. + * + * <p>Once we have proper serializer versioning this test will become obsolete. + * But currently a change in the serializers can break savepoint backwards + * compatibility between Flink versions. + */ + @Test + public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception { + final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader( + getClass().getClassLoader().getResourceAsStream("flink_11-kryo_registrations")))) { + + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.split(","); + final int tag = Integer.parseInt(split[0]); + final String registeredClass = split[1]; + + Registration registration = kryo.getRegistration(tag); + + if (registration == null) { + fail(String.format("Registration for %d = %s got lost", tag, registeredClass)); + } + else if (!registeredClass.equals(registration.getType().getName())) { + fail(String.format("Registration for %d = %s changed to %s", + tag, registeredClass, registration.getType().getName())); + } + } + } + } + + /** + * Creates a Kryo serializer and writes the default registrations out to a + * comma separated file with one entry per line: + * + * <pre> + * id,class + * </pre> + * + * <p>The produced file is used to check that the registered IDs don't change + * in future Flink versions. + * + * <p>This method is not used in the tests, but documents how the test file + * has been created and can be used to re-create it if needed. + * + * @param filePath File path to write registrations to + */ + private void writeDefaultKryoRegistrations(String filePath) throws IOException { + final File file = new File(filePath); + if (file.exists()) { + assertTrue(file.delete()); + } + + final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo(); + final int nextId = kryo.getNextRegistrationId(); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + for (int i = 0; i < nextId; i++) { + Registration registration = kryo.getRegistration(i); + String str = registration.getId() + "," + registration.getType().getName(); + writer.write(str, 0, str.length()); + writer.newLine(); + } + + System.out.println("Created file with registrations at " + file.getAbsolutePath()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations new file mode 100644 index 0000000..7000e62 --- /dev/null +++ b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations @@ -0,0 +1,86 @@ +0,int +1,java.lang.String +2,float +3,boolean +4,byte +5,char +6,short +7,long +8,double +9,void +10,scala.collection.convert.Wrappers$SeqWrapper +11,scala.collection.convert.Wrappers$IteratorWrapper +12,scala.collection.convert.Wrappers$MapWrapper +13,scala.collection.convert.Wrappers$JListWrapper +14,scala.collection.convert.Wrappers$JMapWrapper +15,scala.Some +16,scala.util.Left +17,scala.util.Right +18,scala.collection.immutable.Vector +19,scala.collection.immutable.Set$Set1 +20,scala.collection.immutable.Set$Set2 +21,scala.collection.immutable.Set$Set3 +22,scala.collection.immutable.Set$Set4 +23,scala.collection.immutable.HashSet$HashTrieSet +24,scala.collection.immutable.Map$Map1 +25,scala.collection.immutable.Map$Map2 +26,scala.collection.immutable.Map$Map3 +27,scala.collection.immutable.Map$Map4 +28,scala.collection.immutable.HashMap$HashTrieMap +29,scala.collection.immutable.Range$Inclusive +30,scala.collection.immutable.NumericRange$Inclusive +31,scala.collection.immutable.NumericRange$Exclusive +32,scala.collection.mutable.BitSet +33,scala.collection.mutable.HashMap +34,scala.collection.mutable.HashSet +35,scala.collection.convert.Wrappers$IterableWrapper +36,scala.Tuple1 +37,scala.Tuple2 +38,scala.Tuple3 +39,scala.Tuple4 +40,scala.Tuple5 +41,scala.Tuple6 +42,scala.Tuple7 +43,scala.Tuple8 +44,scala.Tuple9 +45,scala.Tuple10 +46,scala.Tuple11 +47,scala.Tuple12 +48,scala.Tuple13 +49,scala.Tuple14 +50,scala.Tuple15 +51,scala.Tuple16 +52,scala.Tuple17 +53,scala.Tuple18 +54,scala.Tuple19 +55,scala.Tuple20 +56,scala.Tuple21 +57,scala.Tuple22 +58,scala.Tuple1$mcJ$sp +59,scala.Tuple1$mcI$sp +60,scala.Tuple1$mcD$sp +61,scala.Tuple2$mcJJ$sp +62,scala.Tuple2$mcJI$sp +63,scala.Tuple2$mcJD$sp +64,scala.Tuple2$mcIJ$sp +65,scala.Tuple2$mcII$sp +66,scala.Tuple2$mcID$sp +67,scala.Tuple2$mcDJ$sp +68,scala.Tuple2$mcDI$sp +69,scala.Tuple2$mcDD$sp +70,scala.Symbol +71,scala.reflect.ClassTag +72,scala.runtime.BoxedUnit +73,java.util.Arrays$ArrayList +74,java.util.BitSet +75,java.util.PriorityQueue +76,java.util.regex.Pattern +77,java.sql.Date +78,java.sql.Time +79,java.sql.Timestamp +80,java.net.URI +81,java.net.InetSocketAddress +82,java.util.UUID +83,java.util.Locale +84,java.text.SimpleDateFormat +85,org.apache.avro.generic.GenericData$Array http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-libraries/flink-cep/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index bd57d17..a561cca 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -88,14 +88,6 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> - - <!-- we include Avro to make the CEPMigrationTest work, it uses a Kryo-serialized savepoint (see FLINK-7420) --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java index dc322c3..c333397 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java @@ -168,8 +168,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { DataInputViewStream dis = new DataInputViewStream(in); ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader(); try ( - TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois = - new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) { + TypeSerializerSerializationUtil.FailureTolerantObjectInputStream ois = + new TypeSerializerSerializationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) { Thread.currentThread().setContextClassLoader(userCodeClassLoader); TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java index 77d2a1a..cbe9394 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java @@ -33,6 +33,8 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -72,6 +74,15 @@ public class KryoSerializerRegistrationsTest { if (registration == null) { fail(String.format("Registration for %d = %s got lost", tag, registeredClass)); } + else if (registeredClass.equals("org.apache.avro.generic.GenericData$Array")) { + // starting with Flink 1.4 Avro is no longer a dependency of core. Avro is + // only available if flink-avro is present. There is a special version of + // this test in AvroKryoSerializerRegistrationsTest that verifies correct + // registration of Avro types if present + assertThat( + registration.getType().getName(), + is("org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass")); + } else if (!registeredClass.equals(registration.getType().getName())) { fail(String.format("Registration for %d = %s changed to %s", tag, registeredClass, registration.getType().getName())); http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 773dc34..b93251b 100644 --- a/pom.xml +++ b/pom.xml @@ -215,11 +215,11 @@ under the License. </dependencies> <!-- this section defines the module versions that are used if nothing else is specified. --> - + <dependencyManagement> - <!-- WARN: - DO NOT put guava, - protobuf, + <!-- WARN: + DO NOT put guava, + protobuf, asm, netty here. It will overwrite Hadoop's guava dependency (even though we handle it @@ -367,7 +367,7 @@ under the License. <artifactId>joda-convert</artifactId> <version>1.7</version> </dependency> - + <!-- kryo used in different versions by Flink an chill --> <dependency> <groupId>com.esotericsoftware.kryo</groupId> @@ -579,7 +579,7 @@ under the License. <outputDir>${project.build.directory}/spotbugs</outputDir> <!-- A list of available stylesheets can be found here: https://github.com/findbugsproject/findbugs/tree/master/findbugs/src/xsl --> <stylesheet>plain.xsl</stylesheet> - + <fileMappers> <fileMapper implementation="org.codehaus.plexus.components.io.filemappers.FileExtensionMapper"> @@ -772,7 +772,7 @@ under the License. </plugins> </build> </profile> - + <profile> <!--japicmp 0.7 does not support deactivation from the command line, so we have to use a workaround with profiles instead. @@ -842,7 +842,7 @@ under the License. </dependency> </dependencies> </profile> - + <profile> <id>release</id> <properties> @@ -1027,6 +1027,7 @@ under the License. <!-- Test Data. --> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> + <exclude>flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations</exclude> <exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude> <exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude> <exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude> @@ -1287,7 +1288,7 @@ under the License. </plugin> </plugins> - <!-- Plugin configurations for plugins activated in sub-projects --> + <!-- Plugin configurations for plugins activated in sub-projects --> <pluginManagement> <plugins> @@ -1310,7 +1311,7 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> </plugin> - + <!-- Disable certain plugins in Eclipse --> <plugin> <groupId>org.eclipse.m2e</groupId>
