Repository: flink Updated Branches: refs/heads/release-1.5 08e615027 -> 302aaeb02
[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0 Branch: refs/heads/release-1.5 Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30 Parents: 08e6150 Author: Timo Walther <twal...@apache.org> Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther <twal...@apache.org> Committed: Wed Feb 28 13:30:59 2018 +0100 ---------------------------------------------------------------------- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java | 56 +++++++++-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 +++++++++++++++++ ...leSerializerCompatibilityTestGenerator.scala | 94 +++++++++++++++++++ 6 files changed, 231 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990..978d270 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * <p>This can be removed once 1.2 is no longer supported. */ - private static Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final Set<String> scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** + * The serialVersionUID might change between Scala versions and since those classes are + * part of the tuple serializer config snapshots we need to ignore them. + * + * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> + */ + private static final Set<String> scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); + scalaTypes.add("scala.Tuple6"); + scalaTypes.add("scala.Tuple7"); + scalaTypes.add("scala.Tuple8"); + scalaTypes.add("scala.Tuple9"); + scalaTypes.add("scala.Tuple10"); + scalaTypes.add("scala.Tuple11"); + scalaTypes.add("scala.Tuple12"); + scalaTypes.add("scala.Tuple13"); + scalaTypes.add("scala.Tuple14"); + scalaTypes.add("scala.Tuple15"); + scalaTypes.add("scala.Tuple16"); + scalaTypes.add("scala.Tuple17"); + scalaTypes.add("scala.Tuple18"); + scalaTypes.add("scala.Tuple19"); + scalaTypes.add("scala.Tuple20"); + scalaTypes.add("scala.Tuple21"); + scalaTypes.add("scala.Tuple22"); + scalaTypes.add("scala.Tuple1$mcJ$sp"); + scalaTypes.add("scala.Tuple1$mcI$sp"); + scalaTypes.add("scala.Tuple1$mcD$sp"); + scalaTypes.add("scala.Tuple2$mcJJ$sp"); + scalaTypes.add("scala.Tuple2$mcJI$sp"); + scalaTypes.add("scala.Tuple2$mcJD$sp"); + scalaTypes.add("scala.Tuple2$mcIJ$sp"); + scalaTypes.add("scala.Tuple2$mcII$sp"); + scalaTypes.add("scala.Tuple2$mcID$sp"); + scalaTypes.add("scala.Tuple2$mcDJ$sp"); + scalaTypes.add("scala.Tuple2$mcDI$sp"); + scalaTypes.add("scala.Tuple2$mcDD$sp"); + } + + /** * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of * anonymous classes or our Scala serializer classes and also replaces occurences of GenericData.Array * (from Avro) by a dummy class so that the KryoSerializer can still be deserialized without @@ -158,12 +201,13 @@ public final class InstantiationUtil { } } - Class localClass = resolveClass(streamClassDescriptor); - if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass() + final Class localClass = resolveClass(streamClassDescriptor); + final String name = localClass.getName(); + if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name) || localClass.isAnonymousClass() // isAnonymousClass does not work for anonymous Scala classes; additionally check by classname - || localClass.getName().contains("$anon$") || localClass.getName().contains("$anonfun")) { + || name.contains("$anon$") || name.contains("$anonfun")) { - ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); + final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); if (localClassDescriptor != null && localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) { LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.", http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data new file mode 100644 index 0000000..ddd6ac0 Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data differ http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot new file mode 100644 index 0000000..1ebdabb Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala new file mode 100644 index 0000000..7d594fb --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.runtime + +import java.io.InputStream + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._ +import org.apache.flink.api.scala.typeutils.CaseClassSerializer +import org.apache.flink.core.memory.DataInputViewStreamWrapper +import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue} +import org.junit.Test + +/** + * Test for ensuring backwards compatibility of tuples and case classes across Scala versions. + */ +class TupleSerializerCompatibilityTest { + + @Test + def testCompatibilityWithFlink_1_3(): Unit = { + var is: InputStream = null + try { + is = getClass.getClassLoader.getResourceAsStream(SNAPSHOT_RESOURCE) + val snapshotIn = new DataInputViewStreamWrapper(is) + + val deserialized = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience( + snapshotIn, + getClass.getClassLoader) + + assertEquals(1, deserialized.size) + + val oldSerializer = deserialized.get(0).f0 + val oldConfigSnapshot = deserialized.get(0).f1 + + // test serializer and config snapshot + assertNotNull(oldSerializer) + assertNotNull(oldConfigSnapshot) + assertTrue(oldSerializer.isInstanceOf[CaseClassSerializer[_]]) + assertTrue(oldConfigSnapshot.isInstanceOf[TupleSerializerConfigSnapshot[_]]) + + val currentSerializer = createTypeInformation[TestCaseClass] + .createSerializer(new ExecutionConfig()) + assertFalse(currentSerializer.ensureCompatibility(oldConfigSnapshot).isRequiresMigration) + + // test old data serialization + is.close() + is = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE) + var dataIn = new DataInputViewStreamWrapper(is) + + assertEquals(TEST_DATA_1, oldSerializer.deserialize(dataIn)) + assertEquals(TEST_DATA_2, oldSerializer.deserialize(dataIn)) + assertEquals(TEST_DATA_3, oldSerializer.deserialize(dataIn)) + + // test new data serialization + is.close() + is = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE) + dataIn = new DataInputViewStreamWrapper(is) + assertEquals(TEST_DATA_1, currentSerializer.deserialize(dataIn)) + assertEquals(TEST_DATA_2, currentSerializer.deserialize(dataIn)) + assertEquals(TEST_DATA_3, currentSerializer.deserialize(dataIn)) + } finally { + if (is != null) { + is.close() + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala new file mode 100644 index 0000000..6f088b4 --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.runtime + +import java.io.FileOutputStream +import java.util.Collections + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil +import org.apache.flink.core.memory.DataOutputViewStreamWrapper + +/** + * Run this code on a 1.3 (or earlier) branch to generate the test data + * for the [[TupleSerializerCompatibilityTest]]. + * + * This generator is a separate file because a companion object would have side-effects on the + * annotated classes generated by Scala. + */ +object TupleSerializerCompatibilityTestGenerator { + + case class TestCaseClass( + i: Int, + e: Either[String, Unit], + t2: (Boolean, String), + t1: (Double), + t2ii: (Int, Int)) + + val TEST_DATA_1 = TestCaseClass(42, Left("Hello"), (false, "what?"), 12.2, (12, 12)) + val TEST_DATA_2 = TestCaseClass(42, Right(), (false, "what?"), 12.2, (100, 200)) + val TEST_DATA_3 = TestCaseClass(100, Left("Hello"), (true, "what?"), 14.2, (-1, Int.MinValue)) + + val SNAPSHOT_RESOURCE: String = "flink-1.3.2-scala-types-serializer-snapshot" + + val DATA_RESOURCE: String = "flink-1.3.2-scala-types-serializer-data" + + val SNAPSHOT_OUTPUT_PATH: String = "/tmp/snapshot/" + SNAPSHOT_RESOURCE + + val DATA_OUTPUT_PATH: String = "/tmp/snapshot/" + DATA_RESOURCE + + def main(args: Array[String]): Unit = { + + val typeInfo = org.apache.flink.api.scala.createTypeInformation[TestCaseClass] + + val serializer = typeInfo.createSerializer(new ExecutionConfig()) + val configSnapshot = serializer.snapshotConfiguration() + + var fos: FileOutputStream = null + try { + fos = new FileOutputStream(SNAPSHOT_OUTPUT_PATH) + val out = new DataOutputViewStreamWrapper(fos) + + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Collections.singletonList( + new org.apache.flink.api.java.tuple.Tuple2(serializer, configSnapshot) + ) + ) + } finally { + if (fos != null) { + fos.close() + } + } + + fos = null + try { + fos = new FileOutputStream(DATA_OUTPUT_PATH) + val out = new DataOutputViewStreamWrapper(fos) + + serializer.serialize(TEST_DATA_1, out) + serializer.serialize(TEST_DATA_2, out) + serializer.serialize(TEST_DATA_3, out) + } finally { + if (fos != null) { + fos.close() + } + } + } +}