Repository: flink Updated Branches: refs/heads/master fe4e96a72 -> 63c04a516
[hotfix] [core] Catch InvalidClassException in TypeSerializerSerializationProxy Previously, the TypeSerializerSerializationProxy only used the dummy ClassNotFoundDummyTypeSerializer as a placeholder in the case where the user uses a completely new serializer and deletes the old one. There is also the case where the user changes the original serializer's implementation, which results in an InvalidClassException when trying to deserialize the serializer. We should also use the ClassNotFoundDummyTypeSerializer as a temporary placeholder in this case. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/409319a0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/409319a0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/409319a0 Branch: refs/heads/master Commit: 409319a065407f8ed6ae3f43c06b327adfd2501c Parents: fe4e96a Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Tue May 2 19:35:18 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Mon May 8 00:33:41 2017 +0800 ---------------------------------------------------------------------- .../TypeSerializerSerializationProxy.java | 3 +- .../TypeSerializerSerializationProxyTest.java | 50 +++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/409319a0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java index cb8967b..c94124f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InvalidClassException; import java.util.Arrays; @Internal @@ -97,7 +98,7 @@ public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWrit in.readFully(buffer); try { typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException | InvalidClassException e) { if (ignoreClassNotFound) { // we create a dummy so that all the information is not lost when we get a new checkpoint before receiving // a proper typeserializer from the user http://git-wip-us.apache.org/repos/asf/flink/blob/409319a0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java index 982e7ff..db1b4ef 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java @@ -26,13 +26,21 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; +import java.io.InvalidClassException; import java.net.URL; import java.net.URLClassLoader; import static 
org.junit.Assert.fail; +import static org.mockito.Matchers.any; +@RunWith(PowerMockRunner.class) +@PrepareForTest(InstantiationUtil.class) public class TypeSerializerSerializationProxyTest { @Test @@ -91,4 +99,44 @@ public class TypeSerializerSerializationProxyTest { InstantiationUtil.serializeObject(serializer), ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes()); } -} \ No newline at end of file + + @Test + public void testStateSerializerSerializationProxyInvalidClass() throws Exception { + + TypeSerializer<?> serializer = IntSerializer.INSTANCE; + + TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer); + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + proxy.write(new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); + } + + PowerMockito.spy(InstantiationUtil.class); + PowerMockito + .doThrow(new InvalidClassException("test invalid class exception")) + .when(InstantiationUtil.class, "deserializeObject", any(byte[].class), any(ClassLoader.class)); + + proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null)); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + proxy.read(new DataInputViewStreamWrapper(in)); + fail("InvalidClassException expected, leading to IOException"); + } catch (IOException expected) { + + } + + proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + proxy.read(new DataInputViewStreamWrapper(in)); + } + + Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer); + + Assert.assertArrayEquals( + InstantiationUtil.serializeObject(serializer), + ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) 
proxy.getTypeSerializer()).getActualBytes()); + } +}