Repository: flink
Updated Branches:
  refs/heads/master fe4e96a72 -> 63c04a516


[hotfix] [core] Catch InvalidClassException in TypeSerializerSerializationProxy

Previously, the TypeSerializerSerializationProxy only uses the dummy
ClassNotFoundDummyTypeSerializer as a placeholder in the case where the
user uses a completely new serializer and deletes the old one.

There is also the case where the user changes the original serializer's
implementation and results in an InvalidClassException when trying to
deserialize the serializer. We should also use the
ClassNotFoundDummyTypeSerializer as a temporary placeholder in this
case.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/409319a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/409319a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/409319a0

Branch: refs/heads/master
Commit: 409319a065407f8ed6ae3f43c06b327adfd2501c
Parents: fe4e96a
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Tue May 2 19:35:18 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon May 8 00:33:41 2017 +0800

----------------------------------------------------------------------
 .../TypeSerializerSerializationProxy.java       |  3 +-
 .../TypeSerializerSerializationProxyTest.java   | 50 +++++++++++++++++++-
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/409319a0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
index cb8967b..c94124f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.util.Arrays;
 
 @Internal
@@ -97,7 +98,7 @@ public class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWrit
                in.readFully(buffer);
                try {
                        typeSerializer = 
InstantiationUtil.deserializeObject(buffer, userClassLoader);
-               } catch (ClassNotFoundException e) {
+               } catch (ClassNotFoundException | InvalidClassException e) {
                        if (ignoreClassNotFound) {
                                // we create a dummy so that all the 
information is not lost when we get a new checkpoint before receiving
                                // a proper typeserializer from the user

http://git-wip-us.apache.org/repos/asf/flink/blob/409319a0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
index 982e7ff..db1b4ef 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
@@ -26,13 +26,21 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.net.URL;
 import java.net.URLClassLoader;
 
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InstantiationUtil.class)
 public class TypeSerializerSerializationProxyTest {
 
        @Test
@@ -91,4 +99,44 @@ public class TypeSerializerSerializationProxyTest {
                                InstantiationUtil.serializeObject(serializer),
                                
((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) 
proxy.getTypeSerializer()).getActualBytes());
        }
-}
\ No newline at end of file
+
+       @Test
+       public void testStateSerializerSerializationProxyInvalidClass() throws 
Exception {
+
+               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+               TypeSerializerSerializationProxy<?> proxy = new 
TypeSerializerSerializationProxy<>(serializer);
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       proxy.write(new DataOutputViewStreamWrapper(out));
+                       serialized = out.toByteArray();
+               }
+
+               PowerMockito.spy(InstantiationUtil.class);
+               PowerMockito
+                       .doThrow(new InvalidClassException("test invalid class 
exception"))
+                       .when(InstantiationUtil.class, "deserializeObject", 
any(byte[].class), any(ClassLoader.class));
+
+               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null));
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       proxy.read(new DataInputViewStreamWrapper(in));
+                       fail("InvalidClassException expected, leading to 
IOException");
+               } catch (IOException expected) {
+
+               }
+
+               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null), true);
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       proxy.read(new DataInputViewStreamWrapper(in));
+               }
+
+               Assert.assertTrue(proxy.getTypeSerializer() instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
+
+               Assert.assertArrayEquals(
+                       InstantiationUtil.serializeObject(serializer),
+                       
((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) 
proxy.getTypeSerializer()).getActualBytes());
+       }
+}

Reply via email to