Repository: flink
Updated Branches:
  refs/heads/release-1.5 08e615027 -> 302aaeb02


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more 
failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0

Branch: refs/heads/release-1.5
Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Parents: 08e6150
Author: Timo Walther <twal...@apache.org>
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther <twal...@apache.org>
Committed: Wed Feb 28 13:30:59 2018 +0100

----------------------------------------------------------------------
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java    |  56 +++++++++--
 .../flink-1.3.2-scala-types-serializer-data     | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala      |  86 +++++++++++++++++
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++++++++++++++++++
 6 files changed, 231 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot<T> extends 
CompositeTypeSeriali
                super.read(in);
 
                try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
-                       tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+                       tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), 
true);
                } catch (ClassNotFoundException e) {
                        throw new IOException("Could not find requested tuple 
class in classpath.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 11e3990..978d270 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
         *
         * <p>This can be removed once 1.2 is no longer supported.
         */
-       private static Set<String> scalaSerializerClassnames = new HashSet<>();
+       private static final Set<String> scalaSerializerClassnames = new 
HashSet<>();
        static {
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-               
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
                
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
        }
 
        /**
+        * The serialVersionUID of the Scala tuple classes might change between Scala versions. Since those
+        * classes are part of the tuple serializer config snapshots, such mismatches need to be ignored.
+        *
+        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+        */
+       private static final Set<String> scalaTypes = new HashSet<>();
+       static {
+               scalaTypes.add("scala.Tuple1");
+               scalaTypes.add("scala.Tuple2");
+               scalaTypes.add("scala.Tuple3");
+               scalaTypes.add("scala.Tuple4");
+               scalaTypes.add("scala.Tuple5");
+               scalaTypes.add("scala.Tuple6");
+               scalaTypes.add("scala.Tuple7");
+               scalaTypes.add("scala.Tuple8");
+               scalaTypes.add("scala.Tuple9");
+               scalaTypes.add("scala.Tuple10");
+               scalaTypes.add("scala.Tuple11");
+               scalaTypes.add("scala.Tuple12");
+               scalaTypes.add("scala.Tuple13");
+               scalaTypes.add("scala.Tuple14");
+               scalaTypes.add("scala.Tuple15");
+               scalaTypes.add("scala.Tuple16");
+               scalaTypes.add("scala.Tuple17");
+               scalaTypes.add("scala.Tuple18");
+               scalaTypes.add("scala.Tuple19");
+               scalaTypes.add("scala.Tuple20");
+               scalaTypes.add("scala.Tuple21");
+               scalaTypes.add("scala.Tuple22");
+               scalaTypes.add("scala.Tuple1$mcJ$sp");
+               scalaTypes.add("scala.Tuple1$mcI$sp");
+               scalaTypes.add("scala.Tuple1$mcD$sp");
+               scalaTypes.add("scala.Tuple2$mcJJ$sp");
+               scalaTypes.add("scala.Tuple2$mcJI$sp");
+               scalaTypes.add("scala.Tuple2$mcJD$sp");
+               scalaTypes.add("scala.Tuple2$mcIJ$sp");
+               scalaTypes.add("scala.Tuple2$mcII$sp");
+               scalaTypes.add("scala.Tuple2$mcID$sp");
+               scalaTypes.add("scala.Tuple2$mcDJ$sp");
+               scalaTypes.add("scala.Tuple2$mcDI$sp");
+               scalaTypes.add("scala.Tuple2$mcDD$sp");
+       }
+
+       /**
         * An {@link ObjectInputStream} that ignores serialVersionUID 
mismatches when deserializing objects of
         * anonymous classes or our Scala serializer classes and also replaces 
occurences of GenericData.Array
         * (from Avro) by a dummy class so that the KryoSerializer can still be 
deserialized without
@@ -158,12 +201,13 @@ public final class InstantiationUtil {
                                }
                        }
 
-                       Class localClass = resolveClass(streamClassDescriptor);
-                       if 
(scalaSerializerClassnames.contains(localClass.getName()) || 
localClass.isAnonymousClass()
+                       final Class localClass = 
resolveClass(streamClassDescriptor);
+                       final String name = localClass.getName();
+                       if (scalaSerializerClassnames.contains(name) || 
scalaTypes.contains(name) || localClass.isAnonymousClass()
                                // isAnonymousClass does not work for anonymous 
Scala classes; additionally check by classname
-                               || localClass.getName().contains("$anon$") || 
localClass.getName().contains("$anonfun")) {
+                               || name.contains("$anon$") || 
name.contains("$anonfun")) {
 
-                               ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
+                               final ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
                                if (localClassDescriptor != null
                                        && 
localClassDescriptor.getSerialVersionUID() != 
streamClassDescriptor.getSerialVersionUID()) {
                                        LOG.warn("Ignoring serialVersionUID 
mismatch for anonymous class {}; was {}, now {}.",

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data 
b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data
new file mode 100644
index 0000000..ddd6ac0
Binary files /dev/null and 
b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data differ

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot 
b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot
new file mode 100644
index 0000000..1ebdabb
Binary files /dev/null and 
b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot 
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
new file mode 100644
index 0000000..7d594fb
--- /dev/null
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime
+
+import java.io.InputStream
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
+import 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
+import org.apache.flink.api.scala.createTypeInformation
+import 
org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.core.memory.DataInputViewStreamWrapper
+import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.junit.Test
+
+/**
+  * Test for ensuring backwards compatibility of tuples and case classes 
across Scala versions.
+  */
+class TupleSerializerCompatibilityTest {
+
+  @Test
+  def testCompatibilityWithFlink_1_3(): Unit = {
+    var is: InputStream = null
+    try {
+      is = getClass.getClassLoader.getResourceAsStream(SNAPSHOT_RESOURCE)
+      val snapshotIn = new DataInputViewStreamWrapper(is)
+
+      val deserialized = 
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+        snapshotIn,
+        getClass.getClassLoader)
+
+      assertEquals(1, deserialized.size)
+
+      val oldSerializer = deserialized.get(0).f0
+      val oldConfigSnapshot = deserialized.get(0).f1
+
+      // test serializer and config snapshot
+      assertNotNull(oldSerializer)
+      assertNotNull(oldConfigSnapshot)
+      assertTrue(oldSerializer.isInstanceOf[CaseClassSerializer[_]])
+      
assertTrue(oldConfigSnapshot.isInstanceOf[TupleSerializerConfigSnapshot[_]])
+
+      val currentSerializer = createTypeInformation[TestCaseClass]
+        .createSerializer(new ExecutionConfig())
+      
assertFalse(currentSerializer.ensureCompatibility(oldConfigSnapshot).isRequiresMigration)
+
+      // test old data serialization
+      is.close()
+      is = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE)
+      var dataIn = new DataInputViewStreamWrapper(is)
+
+      assertEquals(TEST_DATA_1, oldSerializer.deserialize(dataIn))
+      assertEquals(TEST_DATA_2, oldSerializer.deserialize(dataIn))
+      assertEquals(TEST_DATA_3, oldSerializer.deserialize(dataIn))
+
+      // test new data serialization
+      is.close()
+      is = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE)
+      dataIn = new DataInputViewStreamWrapper(is)
+      assertEquals(TEST_DATA_1, currentSerializer.deserialize(dataIn))
+      assertEquals(TEST_DATA_2, currentSerializer.deserialize(dataIn))
+      assertEquals(TEST_DATA_3, currentSerializer.deserialize(dataIn))
+    } finally {
+      if (is != null) {
+        is.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala
new file mode 100644
index 0000000..6f088b4
--- /dev/null
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime
+
+import java.io.FileOutputStream
+import java.util.Collections
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper
+
+/**
+  * Run this code on a 1.3 (or earlier) branch to generate the test data
+  * for the [[TupleSerializerCompatibilityTest]].
+  *
+  * This generator is a separate file because a companion object would have 
side-effects on the
+  * annotated classes generated by Scala.
+  */
+object TupleSerializerCompatibilityTestGenerator {
+
+  case class TestCaseClass(
+    i: Int,
+    e: Either[String, Unit],
+    t2: (Boolean, String),
+    t1: (Double),
+    t2ii: (Int, Int))
+
+  val TEST_DATA_1 = TestCaseClass(42, Left("Hello"), (false, "what?"), 12.2, 
(12, 12))
+  val TEST_DATA_2 = TestCaseClass(42, Right(), (false, "what?"), 12.2, (100, 
200))
+  val TEST_DATA_3 = TestCaseClass(100, Left("Hello"), (true, "what?"), 14.2, 
(-1, Int.MinValue))
+
+  val SNAPSHOT_RESOURCE: String = "flink-1.3.2-scala-types-serializer-snapshot"
+
+  val DATA_RESOURCE: String = "flink-1.3.2-scala-types-serializer-data"
+
+  val SNAPSHOT_OUTPUT_PATH: String = "/tmp/snapshot/" + SNAPSHOT_RESOURCE
+
+  val DATA_OUTPUT_PATH: String = "/tmp/snapshot/" + DATA_RESOURCE
+
+  def main(args: Array[String]): Unit = {
+
+    val typeInfo = 
org.apache.flink.api.scala.createTypeInformation[TestCaseClass]
+
+    val serializer = typeInfo.createSerializer(new ExecutionConfig())
+    val configSnapshot = serializer.snapshotConfiguration()
+
+    var fos: FileOutputStream = null
+    try {
+      fos = new FileOutputStream(SNAPSHOT_OUTPUT_PATH)
+      val out = new DataOutputViewStreamWrapper(fos)
+
+      TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+        out,
+        Collections.singletonList(
+          new org.apache.flink.api.java.tuple.Tuple2(serializer, 
configSnapshot)
+        )
+      )
+    } finally {
+      if (fos != null) {
+        fos.close()
+      }
+    }
+
+    fos = null
+    try {
+      fos = new FileOutputStream(DATA_OUTPUT_PATH)
+      val out = new DataOutputViewStreamWrapper(fos)
+
+      serializer.serialize(TEST_DATA_1, out)
+      serializer.serialize(TEST_DATA_2, out)
+      serializer.serialize(TEST_DATA_3, out)
+    } finally {
+      if (fos != null) {
+        fos.close()
+      }
+    }
+  }
+}

Reply via email to