flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/release-1.4 51231c471 -> 93f823fff [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93f823ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93f823ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93f823ff Branch: refs/heads/release-1.4 Commit: 93f823fff7b1517cc3f2484fa0091ebdc9e546d0 Parents: 51231c4 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:32:34 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, 
getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 7ffad55..a95bdf7 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final Set<String> scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set<String> scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); +
flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/release-1.5 08e615027 -> 302aaeb02 [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0 Branch: refs/heads/release-1.5 Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30 Parents: 08e6150 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:30:59 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, 
getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990..978d270 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final Set<String> scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set<String> scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); +
flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/master e8d168509 -> 6c837d738 [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c837d73 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c837d73 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c837d73 Branch: refs/heads/master Commit: 6c837d738f90ccf140ad2288cd24f3706babd63c Parents: e8d1685 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:28:12 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); + 
tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990..978d270 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final Set<String> scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set<String> scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); +