flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.4 51231c471 -> 93f823fff


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more 
failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93f823ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93f823ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93f823ff

Branch: refs/heads/release-1.4
Commit: 93f823fff7b1517cc3f2484fa0091ebdc9e546d0
Parents: 51231c4
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:32:34 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends 
CompositeTypeSeriali
super.read(in);
 
try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
-   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), 
true);
} catch (ClassNotFoundException e) {
throw new IOException("Could not find requested tuple 
class in classpath.", e);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 7ffad55..a95bdf7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
 *
 * This can be removed once 1.2 is no longer supported.
 */
-   private static Set scalaSerializerClassnames = new HashSet<>();
+   private static final Set scalaSerializerClassnames = new 
HashSet<>();
static {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-   
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
}
 
/**
+* The serialVersionUID might change between Scala versions and since 
those classes are
+* part of the tuple serializer config snapshots we need to ignore them.
+*
+* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+*/
+   private static final Set scalaTypes = new HashSet<>();
+   static {
+   scalaTypes.add("scala.Tuple1");
+   scalaTypes.add("scala.Tuple2");
+   scalaTypes.add("scala.Tuple3");
+   scalaTypes.add("scala.Tuple4");
+   scalaTypes.add("scala.Tuple5");
+   

flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.5 08e615027 -> 302aaeb02


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more 
failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0

Branch: refs/heads/release-1.5
Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Parents: 08e6150
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:30:59 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends 
CompositeTypeSeriali
super.read(in);
 
try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
-   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), 
true);
} catch (ClassNotFoundException e) {
throw new IOException("Could not find requested tuple 
class in classpath.", e);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 11e3990..978d270 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
 *
 * This can be removed once 1.2 is no longer supported.
 */
-   private static Set scalaSerializerClassnames = new HashSet<>();
+   private static final Set scalaSerializerClassnames = new 
HashSet<>();
static {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-   
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
}
 
/**
+* The serialVersionUID might change between Scala versions and since 
those classes are
+* part of the tuple serializer config snapshots we need to ignore them.
+*
+* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+*/
+   private static final Set scalaTypes = new HashSet<>();
+   static {
+   scalaTypes.add("scala.Tuple1");
+   scalaTypes.add("scala.Tuple2");
+   scalaTypes.add("scala.Tuple3");
+   scalaTypes.add("scala.Tuple4");
+   scalaTypes.add("scala.Tuple5");
+   

flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master e8d168509 -> 6c837d738


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more 
failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c837d73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c837d73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c837d73

Branch: refs/heads/master
Commit: 6c837d738f90ccf140ad2288cd24f3706babd63c
Parents: e8d1685
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:28:12 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends 
CompositeTypeSeriali
super.read(in);
 
try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
-   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+   tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), 
true);
} catch (ClassNotFoundException e) {
throw new IOException("Could not find requested tuple 
class in classpath.", e);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 11e3990..978d270 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
 *
 * This can be removed once 1.2 is no longer supported.
 */
-   private static Set scalaSerializerClassnames = new HashSet<>();
+   private static final Set scalaSerializerClassnames = new 
HashSet<>();
static {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-   
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");

scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
}
 
/**
+* The serialVersionUID might change between Scala versions and since 
those classes are
+* part of the tuple serializer config snapshots we need to ignore them.
+*
+* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+*/
+   private static final Set scalaTypes = new HashSet<>();
+   static {
+   scalaTypes.add("scala.Tuple1");
+   scalaTypes.add("scala.Tuple2");
+   scalaTypes.add("scala.Tuple3");
+   scalaTypes.add("scala.Tuple4");
+   scalaTypes.add("scala.Tuple5");
+