Repository: flink
Updated Branches:
  refs/heads/release-1.3 625da00a5 -> 4eebf21e9


[FLINK-6565] Fail memory-backed state restores with meaningful message if 
previous serializer is unavailable

This closes #3882.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7de22122
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7de22122
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7de22122

Branch: refs/heads/release-1.3
Commit: 7de221224ebf179581228ae2db7bd685468189da
Parents: 625da00
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Fri May 12 19:11:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Sat May 13 17:05:35 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      |  17 +++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   8 ++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   6 +
 .../state/heap/HeapKeyedStateBackend.java       |  17 +++
 .../runtime/state/MemoryStateBackendTest.java   | 135 +++++++++++++++++++
 .../runtime/state/OperatorStateBackendTest.java |  70 +++++++++-
 .../runtime/state/StateBackendTestBase.java     |   2 +-
 7 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ab0c1f0..1d3af72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                                // Recreate all PartitionableListStates from 
the meta info
                                for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : 
restoredMetaInfoSnapshots) {
+
+                                       if 
(restoredMetaInfo.getPartitionStateSerializer() == null ||
+                                                       
restoredMetaInfo.getPartitionStateSerializer()
+                                                               instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+                                               // must fail now if the 
previous serializer cannot be restored because there is no serializer
+                                               // capable of reading previous 
state
+                                               // TODO when eager state 
registration is in place, we can try to get a convert deserializer
+                                               // TODO from the newly 
registered serializer instead of simply failing here
+
+                                               throw new IOException("Unable 
to restore operator state [" + restoredMetaInfo.getName() + "]." +
+                                                       " The previous 
serializer of the operator state must be present; the serializer could" +
+                                                       " have been removed 
from the classpath, or its implementation have changed and could" +
+                                                       " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
+                                       }
+
                                        PartitionableListState<?> listState = 
registeredStates.get(restoredMetaInfo.getName());
 
                                        if (null == listState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 83aa335..ac81e86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -37,6 +39,8 @@ import java.io.IOException;
  */
 public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class);
+
        // 
-------------------------------------------------------------------------------
        //  Writers
        //   - v1: Flink 1.2.x
@@ -230,6 +234,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters 
{
                                namespaceSerializerProxy.read(inViewWrapper);
                                
metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
                        } catch (IOException e) {
+                               LOG.warn("Deserialization of previous namespace 
serializer errored; setting serializer to null. ", e);
+
                                metaInfo.setNamespaceSerializer(null);
                        }
 
@@ -241,6 +247,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters 
{
                                stateSerializerProxy.read(inViewWrapper);
                                
metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
                        } catch (IOException e) {
+                               LOG.warn("Deserialization of previous state 
serializer errored; setting serializer to null. ", e);
+
                                metaInfo.setStateSerializer(null);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index 9ab106b..4f151c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -39,6 +41,8 @@ import java.io.IOException;
  */
 public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(OperatorBackendStateMetaInfoSnapshotReaderWriters.class);
+
        // 
-------------------------------------------------------------------------------
        //  Writers
        //   - v1: Flink 1.2.x
@@ -219,6 +223,8 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                                
partitionStateSerializerProxy.read(inViewWrapper);
                                
stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
                        } catch (IOException e) {
+                               LOG.warn("Deserialization of previous 
serializer errored; setting serializer to null. ", e);
+
                                stateMetaInfo.setPartitionStateSerializer(null);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 866ed28..bc314df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -389,6 +390,22 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : 
restoredMetaInfos) {
 
+                                       if 
(restoredMetaInfo.getStateSerializer() == null ||
+                                                       
restoredMetaInfo.getStateSerializer()
+                                                               instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+                                               // must fail now if the 
previous serializer cannot be restored because there is no serializer
+                                               // capable of reading previous 
state
+                                               // TODO when eager state 
registration is in place, we can try to get a convert deserializer
+                                               // TODO from the newly 
registered serializer instead of simply failing here
+
+                                               throw new IOException("Unable 
to restore keyed state [" + restoredMetaInfo.getName() + "]." +
+                                                       " For memory-backed 
keyed state, the previous serializer of the keyed state must be" +
+                                                       " present; the 
serializer could have been removed from the classpath, or its implementation" +
+                                                       " have changed and 
could not be loaded. This is a temporary restriction that will be fixed" +
+                                                       " in future versions.");
+                                       }
+
                                        StateTable<K, ?, ?> stateTable = 
stateTables.get(restoredMetaInfo.getName());
 
                                        //important: only create a new table we 
did not already create it previously

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 48d56e2..fee97f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -20,27 +20,48 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, 
OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBackend> {
 
        @Override
@@ -198,6 +219,120 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
                }
        }
 
+       /**
+        * Verifies that the operator state backend fails with appropriate 
error and message if
+        * previous serializer can not be restored.
+        */
+       @Test
+       public void 
testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws 
Exception {
+               AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
+
+               Environment env = mock(Environment.class);
+               when(env.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               
when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+               OperatorStateBackend operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+               // write some state
+               ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
+               ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
+               ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
+               ListState<Serializable> listState1 = 
operatorStateBackend.getListState(stateDescriptor1);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
+               ListState<Serializable> listState3 = 
operatorStateBackend.getUnionListState(stateDescriptor3);
+
+               listState1.add(42);
+               listState1.add(4711);
+
+               listState2.add(7);
+               listState2.add(13);
+               listState2.add(23);
+
+               listState3.add(17);
+               listState3.add(18);
+               listState3.add(19);
+               listState3.add(20);
+
+               CheckpointStreamFactory streamFactory = 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                       operatorStateBackend.snapshot(1, 1, streamFactory, 
CheckpointOptions.forFullCheckpoint());
+               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+               try {
+
+                       operatorStateBackend.close();
+                       operatorStateBackend.dispose();
+
+                       operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(
+                               env,
+                               "testOperator");
+
+                       // mock failure when deserializing serializer
+                       TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+                       doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+                       
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+                       
operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+                       fail("The operator state restore should have failed if 
the previous state serializer could not be loaded.");
+               } catch (IOException expected) {
+                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore operator 
state"));
+               } finally {
+                       stateHandle.discardState();
+               }
+       }
+
+       /**
+        * Verifies that memory-backed keyed state backend fails with 
appropriate error and message if
+        * previous serializer can not be restored.
+        */
+       @Test
+       public void 
testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               KeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               HeapKeyedStateBackend<Integer> heapBackend = 
(HeapKeyedStateBackend<Integer>) backend;
+
+               assertEquals(0, heapBackend.numStateEntries());
+
+               ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               // write some state
+               backend.setCurrentKey(0);
+               state.update("hello");
+               state.update("ciao");
+
+               KeyedStateHandle snapshot = 
runSnapshot(((HeapKeyedStateBackend<Integer>) backend).snapshot(
+                       682375462378L,
+                       2,
+                       streamFactory,
+                       CheckpointOptions.forFullCheckpoint()));
+
+               backend.dispose();
+
+               // ========== restore snapshot ==========
+
+               Environment env = mock(Environment.class);
+               when(env.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               
when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+               // mock failure when deserializing serializer
+               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               try {
+                       restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
+
+                       fail("The keyed state restore should have failed if the 
previous state serializer could not be loaded.");
+               } catch (IOException expected) {
+                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed 
state"));
+               }
+       }
+
        @Ignore
        @Test
        public void testConcurrentMapIfQueryable() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 85b9eaf..af5f0b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -32,6 +34,10 @@ import 
org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,9 +58,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class)
 public class OperatorStateBackendTest {
 
        private final ClassLoader classLoader = getClass().getClassLoader();
@@ -290,7 +300,7 @@ public class OperatorStateBackendTest {
 
        @Test
        public void testSnapshotRestoreAsync() throws Exception {
-               DefaultOperatorStateBackend operatorStateBackend =
+               OperatorStateBackend operatorStateBackend =
                                new 
DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), 
new ExecutionConfig(), true);
 
                ListStateDescriptor<MutableType> stateDescriptor1 =
@@ -362,8 +372,7 @@ public class OperatorStateBackendTest {
 
                        AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
 
-                       //TODO this is temporarily casted to test already 
functionality that we do not yet expose through public API
-                       operatorStateBackend = (DefaultOperatorStateBackend) 
abstractStateBackend.createOperatorStateBackend(
+                       operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(
                                        createMockEnvironment(),
                                        "testOperator");
 
@@ -494,6 +503,61 @@ public class OperatorStateBackendTest {
                }
        }
 
+       @Test
+       public void testRestoreFailsIfSerializerDeserializationFails() throws 
Exception {
+               AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
+
+               OperatorStateBackend operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"test-op-name");
+
+               // write some state
+               ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
+               ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
+               ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
+               ListState<Serializable> listState1 = 
operatorStateBackend.getListState(stateDescriptor1);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getListState(stateDescriptor2);
+               ListState<Serializable> listState3 = 
operatorStateBackend.getUnionListState(stateDescriptor3);
+
+               listState1.add(42);
+               listState1.add(4711);
+
+               listState2.add(7);
+               listState2.add(13);
+               listState2.add(23);
+
+               listState3.add(17);
+               listState3.add(18);
+               listState3.add(19);
+               listState3.add(20);
+
+               CheckpointStreamFactory streamFactory = 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+               RunnableFuture<OperatorStateHandle> runnableFuture =
+                       operatorStateBackend.snapshot(1, 1, streamFactory, 
CheckpointOptions.forFullCheckpoint());
+               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+               try {
+
+                       operatorStateBackend.close();
+                       operatorStateBackend.dispose();
+
+                       operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(
+                               createMockEnvironment(),
+                               "testOperator");
+
+                       // mock failure when deserializing serializer
+                       TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+                       doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+                       
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+                       
operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+                       fail("The operator state restore should have failed if 
the previous state serializer could not be loaded.");
+               } catch (IOException expected) {
+                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore operator 
state"));
+               } finally {
+                       stateHandle.discardState();
+               }
+       }
+
        static final class MutableType implements Serializable {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 96025fe..658ccde 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2508,7 +2508,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
        }
 
-       private KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> 
snapshotRunnableFuture) throws Exception {
+       protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> 
snapshotRunnableFuture) throws Exception {
                if(!snapshotRunnableFuture.isDone()) {
                        Thread runner = new Thread(snapshotRunnableFuture);
                        runner.start();

Reply via email to