Repository: flink
Updated Branches:
  refs/heads/master fcd264a70 -> 5f08e5359


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a known issue with Kryo's JavaSerializer that may
use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions when using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2143172
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2143172
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2143172

Branch: refs/heads/master
Commit: f2143172feca2925832c8b26c3c9fbbb92ecd48f
Parents: fcd264a
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 13 23:31:33 2017 +0800

----------------------------------------------------------------------
 docs/dev/custom_serializers.md                  | 12 +++
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 ++++++++++++++++++++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java    |  4 +-
 4 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/docs/dev/custom_serializers.md
----------------------------------------------------------------------
diff --git a/docs/dev/custom_serializers.md b/docs/dev/custom_serializers.md
index 2b72ca0..ddfc2ee 100644
--- a/docs/dev/custom_serializers.md
+++ b/docs/dev/custom_serializers.md
@@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven 
dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
 
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may incorrectly use the wrong classloader.
+
+In this case, you should use 
`org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 0000000..a51647c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically
+ * uses Kryo's registered classloader.
+ *
+ * <p>Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer},
+ * in which the wrong classloader may be used for deserialization, leading to
+ * {@link ClassNotFoundException}s.
+ *
+ * <p>Both {@code write} and {@code read} look up their object stream in Kryo's graph context
+ * ({@code kryo.getGraphContext()}), keyed by this serializer instance, and create and store a new
+ * stream only if none is present there yet. Any exception raised while writing or reading is
+ * rethrown wrapped in a {@link KryoException}.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-6025">FLINK-6025</a>
+ * @see <a href="https://github.com/EsotericSoftware/kryo/pull/483">Known issue with Kryo's JavaSerializer</a>
+ *
+ * @param <T> The type to be serialized.
+ */
+public class JavaSerializer<T> extends Serializer<T> {
+
+       public JavaSerializer() {}
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void write(Kryo kryo, Output output, T o) {
+               try {
+                       ObjectMap graphContext = kryo.getGraphContext();
+                       ObjectOutputStream objectStream = 
(ObjectOutputStream)graphContext.get(this);
+                       if (objectStream == null) { // no stream stored in the graph context for this serializer yet
+                               objectStream = new ObjectOutputStream(output);
+                               graphContext.put(this, objectStream);
+                       }
+                       objectStream.writeObject(o);
+                       objectStream.flush(); // push data buffered by the object stream into the Kryo output
+               } catch (Exception ex) {
+                       throw new KryoException("Error during Java 
serialization.", ex);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public T read(Kryo kryo, Input input, Class aClass) {
+               try {
+                       ObjectMap graphContext = kryo.getGraphContext();
+                       ObjectInputStream objectStream = 
(ObjectInputStream)graphContext.get(this);
+                       if (objectStream == null) { // no stream stored in the graph context for this serializer yet
+                               // make sure classes are loaded with Kryo's classloader (kryo.getClassLoader());
+                               // this is the difference to Kryo's own JavaSerializer
+                               objectStream = new 
InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader());
+                               graphContext.put(this, objectStream);
+                       }
+                       return (T) objectStream.readObject();
+               } catch (Exception ex) {
+                       throw new KryoException("Error during Java 
deserialization.", ex);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index e74e251..44c952a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,7 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import org.apache.avro.generic.GenericData;
 
@@ -130,7 +129,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        @Override
        public KryoSerializer<T> duplicate() {
-               return new KryoSerializer<T>(this);
+               return new KryoSerializer<>(this);
        }
 
        @Override
@@ -331,6 +330,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        kryo.setReferences(true);
                        
                        // Throwable and all subclasses should be serialized 
via java serialization
+                       // Note: the registered JavaSerializer is Flink's own 
implementation, and not Kryo's.
+                       //       This is due to a known issue with Kryo's 
JavaSerializer. See FLINK-6025 for details.
                        kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
 
                        // Add default serializers first, so that they type 
registrations without a serializer

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index d4a031c..6441c86 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -45,9 +45,7 @@ import java.util.HashMap;
 public final class InstantiationUtil {
        
        /**
-        * A custom ObjectInputStream that can also load user-code using a
-        * user-code ClassLoader.
-        *
+        * A custom ObjectInputStream that can load classes using a specific 
ClassLoader.
         */
        public static class ClassLoaderObjectInputStream extends 
ObjectInputStream {
 

Reply via email to