http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7f24cd3..aa0f08d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -140,7 +141,9 @@ public class StreamingJobGraphGenerator {
                        
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
                }
 
-               setChaining(hashes, legacyHashes);
+               Map<Integer, List<Tuple2<byte[], byte[]>>> 
chainedOperatorHashes = new HashMap<>();
+
+               setChaining(hashes, legacyHashes, chainedOperatorHashes);
 
                setPhysicalEdges();
 
@@ -190,9 +193,9 @@ public class StreamingJobGraphGenerator {
         *
         * <p>This will recursively create all {@link JobVertex} instances.
         */
-       private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, 
byte[]>> legacyHashes) {
+       private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, 
byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> 
chainedOperatorHashes) {
                for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-                       createChain(sourceNodeId, sourceNodeId, hashes, 
legacyHashes, 0);
+                       createChain(sourceNodeId, sourceNodeId, hashes, 
legacyHashes, 0, chainedOperatorHashes);
                }
        }
 
@@ -201,7 +204,8 @@ public class StreamingJobGraphGenerator {
                        Integer currentNodeId,
                        Map<Integer, byte[]> hashes,
                        List<Map<Integer, byte[]>> legacyHashes,
-                       int chainIndex) {
+                       int chainIndex,
+                       Map<Integer, List<Tuple2<byte[], byte[]>>> 
chainedOperatorHashes) {
 
                if (!builtVertices.contains(startNodeId)) {
 
@@ -220,20 +224,27 @@ public class StreamingJobGraphGenerator {
 
                        for (StreamEdge chainable : chainableOutputs) {
                                transitiveOutEdges.addAll(
-                                               createChain(startNodeId, 
chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
+                                               createChain(startNodeId, 
chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, 
chainedOperatorHashes));
                        }
 
                        for (StreamEdge nonChainable : nonChainableOutputs) {
                                transitiveOutEdges.add(nonChainable);
-                               createChain(nonChainable.getTargetId(), 
nonChainable.getTargetId(), hashes, legacyHashes, 0);
+                               createChain(nonChainable.getTargetId(), 
nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
+                       }
+
+                       List<Tuple2<byte[], byte[]>> operatorHashes = 
chainedOperatorHashes.get(startNodeId);
+                       if (operatorHashes == null) {
+                               operatorHashes = new ArrayList<>();
+                               chainedOperatorHashes.put(startNodeId, 
operatorHashes);
                        }
+                       operatorHashes.add(new 
Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId)));
 
                        chainedNames.put(currentNodeId, 
createChainedName(currentNodeId, chainableOutputs));
                        chainedMinResources.put(currentNodeId, 
createChainedMinResources(currentNodeId, chainableOutputs));
                        chainedPreferredResources.put(currentNodeId, 
createChainedPreferredResources(currentNodeId, chainableOutputs));
 
                        StreamConfig config = currentNodeId.equals(startNodeId)
-                                       ? createJobVertex(startNodeId, hashes, 
legacyHashes)
+                                       ? createJobVertex(startNodeId, hashes, 
legacyHashes, chainedOperatorHashes)
                                        : new StreamConfig(new Configuration());
 
                        setVertexConfig(currentNodeId, config, 
chainableOutputs, nonChainableOutputs);
@@ -308,7 +319,8 @@ public class StreamingJobGraphGenerator {
        private StreamConfig createJobVertex(
                        Integer streamNodeId,
                        Map<Integer, byte[]> hashes,
-                       List<Map<Integer, byte[]>> legacyHashes) {
+                       List<Map<Integer, byte[]>> legacyHashes,
+                       Map<Integer, List<Tuple2<byte[], byte[]>>> 
chainedOperatorHashes) {
 
                JobVertex jobVertex;
                StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
@@ -330,18 +342,32 @@ public class StreamingJobGraphGenerator {
                        }
                }
 
+               List<Tuple2<byte[], byte[]>> chainedOperators = 
chainedOperatorHashes.get(streamNodeId);
+               List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
+               List<OperatorID> userDefinedChainedOperatorVertexIds = new 
ArrayList<>();
+               if (chainedOperators != null) {
+                       for (Tuple2<byte[], byte[]> chainedOperator : 
chainedOperators) {
+                               chainedOperatorVertexIds.add(new 
OperatorID(chainedOperator.f0));
+                               
userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new 
OperatorID(chainedOperator.f1) : null);
+                       }
+               }
+
                if (streamNode.getInputFormat() != null) {
                        jobVertex = new InputFormatVertex(
                                        chainedNames.get(streamNodeId),
                                        jobVertexId,
-                                       legacyJobVertexIds);
+                                       legacyJobVertexIds,
+                                       chainedOperatorVertexIds,
+                                       userDefinedChainedOperatorVertexIds);
                        TaskConfig taskConfig = new 
TaskConfig(jobVertex.getConfiguration());
                        taskConfig.setStubWrapper(new 
UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
                } else {
                        jobVertex = new JobVertex(
                                        chainedNames.get(streamNodeId),
                                        jobVertexId,
-                                       legacyJobVertexIds);
+                                       legacyJobVertexIds,
+                                       chainedOperatorVertexIds,
+                                       userDefinedChainedOperatorVertexIds);
                }
 
                jobVertex.setResources(chainedMinResources.get(streamNodeId), 
chainedPreferredResources.get(streamNodeId));

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 05aa694..a897674 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to
@@ -392,10 +391,10 @@ public class StreamingJobGraphGeneratorNodeHashTest 
extends TestLogger {
        }
 
        /**
-        * Tests that a manual hash for an intermediate chain node throws an Exception.
+        * Tests that a manual hash for an intermediate chain node is accepted.
         */
-       @Test(expected = UnsupportedOperationException.class)
-       public void 
testManualHashAssignmentForIntermediateNodeInChainThrowsException() throws 
Exception {
+       @Test
+       public void testManualHashAssignmentForIntermediateNodeInChain() throws 
Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
                env.setParallelism(4);
 
@@ -409,9 +408,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends 
TestLogger {
 
        /**
         * Tests that a manual hash at the beginning of a chain is accepted.
-        *
-        * <p>This should work, because the ID is used at the beginning of a chain. This is currently
-        * not allowed for intermediate nodes (see {@link #testManualHashAssignmentForIntermediateNodeInChainThrowsException()}).
         */
        @Test
        public void testManualHashAssignmentForStartNodeInInChain() throws 
Exception {
@@ -446,7 +442,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends 
TestLogger {
        }
 
        @Test
-       public void testUserProvidedHashingOnChainNotSupported() {
+       public void testUserProvidedHashingOnChainSupported() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
 
                env.addSource(new NoOpSourceFunction(), 
"src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
@@ -455,11 +451,7 @@ public class StreamingJobGraphGeneratorNodeHashTest 
extends TestLogger {
                                .keyBy(new NoOpKeySelector())
                                .reduce(new 
NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
 
-               try {
-                       env.getStreamGraph().getJobGraph();
-                       fail();
-               } catch (UnsupportedOperationException ignored) {
-               }
+               env.getStreamGraph().getJobGraph();
        }
 
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index e0de7d2..72a1b63 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -32,28 +32,32 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -93,7 +97,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -102,7 +108,6 @@ import static 
org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSav
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -310,30 +315,43 @@ public class SavepointITCase extends TestLogger {
                                };
                        }};
 
+                       ExecutionGraph graph = (ExecutionGraph) 
((JobManagerMessages.JobFound) Await.result(jobManager.ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), 
deadline.timeLeft())).executionGraph();
+
                        // - Verification START -------------------------------------------
 
                        String errMsg = "Error during gathering of 
TaskDeploymentDescriptors";
-                       assertNull(errMsg, error[0]);
+                       if (error[0] != null) {
+                               throw new RuntimeException(error[0]);
+                       }
+
+                       Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> 
operatorToJobVertexMapping = new HashMap<>();
+                       for (ExecutionJobVertex task : 
graph.getVerticesTopologically()) {
+                               List<OperatorID> operatorIDs = 
task.getOperatorIDs();
+                               for (int x = 0; x < operatorIDs.size(); x++) {
+                                       
operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
+                               }
+                       }
 
                        // Verify that all tasks, which are part of the savepoint
                        // have a matching task deployment descriptor.
-                       for (TaskState taskState : savepoint.getTaskStates()) {
-                               Collection<TaskDeploymentDescriptor> taskTdds = 
tdds.get(taskState.getJobVertexID());
+                       for (OperatorState operatorState : 
savepoint.getOperatorStates()) {
+                               Tuple2<Integer, ExecutionJobVertex> 
chainIndexAndJobVertex = 
operatorToJobVertexMapping.get(operatorState.getOperatorID());
+                               Collection<TaskDeploymentDescriptor> taskTdds = 
tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
 
                                errMsg = "Missing task for savepoint state for 
operator "
-                                       + taskState.getJobVertexID() + ".";
+                                       + operatorState.getOperatorID() + ".";
                                assertTrue(errMsg, taskTdds.size() > 0);
 
-                               
assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
+                               
assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
 
                                for (TaskDeploymentDescriptor tdd : taskTdds) {
-                                       SubtaskState subtaskState = 
taskState.getState(tdd.getSubtaskIndex());
+                                       OperatorSubtaskState subtaskState = 
operatorState.getState(tdd.getSubtaskIndex());
 
                                        assertNotNull(subtaskState);
 
                                        errMsg = "Initial operator state 
mismatch.";
                                        assertEquals(errMsg, 
subtaskState.getLegacyOperatorState(),
-                                               
tdd.getTaskStateHandles().getLegacyOperatorState());
+                                               
tdd.getTaskStateHandles().getLegacyOperatorState().get(chainIndexAndJobVertex.f0));
                                }
                        }
 
@@ -360,15 +378,13 @@ public class SavepointITCase extends TestLogger {
                        // The checkpoint files
                        List<File> checkpointFiles = new ArrayList<>();
 
-                       for (TaskState stateForTaskGroup : 
savepoint.getTaskStates()) {
-                               for (SubtaskState subtaskState : 
stateForTaskGroup.getStates()) {
-                                       ChainedStateHandle<StreamStateHandle> 
streamTaskState = subtaskState.getLegacyOperatorState();
+                       for (OperatorState stateForTaskGroup : 
savepoint.getOperatorStates()) {
+                               for (OperatorSubtaskState subtaskState : 
stateForTaskGroup.getStates()) {
+                                       StreamStateHandle streamTaskState = 
subtaskState.getLegacyOperatorState();
 
-                                       for (int i = 0; i < 
streamTaskState.getLength(); i++) {
-                                               if (streamTaskState.get(i) != 
null) {
-                                                       FileStateHandle 
fileStateHandle = (FileStateHandle) streamTaskState.get(i);
-                                                       checkpointFiles.add(new 
File(fileStateHandle.getFilePath().toUri()));
-                                               }
+                                       if (streamTaskState != null) {
+                                               FileStateHandle fileStateHandle 
= (FileStateHandle) streamTaskState;
+                                               checkpointFiles.add(new 
File(fileStateHandle.getFilePath().toUri()));
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
new file mode 100644
index 0000000..2eecf49
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified
+ * from that point on.
+ *
+ * <p>The verification is done in two steps:
+ * <p>Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.
+ * <p>Step 2: Modify the job topology, and restore from the savepoint created in step 1.
+ */
+public abstract class AbstractOperatorRestoreTestBase {
+
+       @Rule
+       public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+       private static ActorSystem actorSystem = null;
+       private static ActorGateway jobManager = null;
+       private static ActorGateway archiver = null;
+       private static ActorGateway taskManager = null;
+
+       private static final FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+       @BeforeClass
+       public static void setupCluster() throws Exception {
+               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+               actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+
+               Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+                       new Configuration(),
+                       actorSystem,
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       Option.apply("jm"),
+                       Option.apply("arch"),
+                       TestingJobManager.class,
+                       TestingMemoryArchivist.class);
+
+               jobManager = new AkkaActorGateway(master._1(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+               archiver = new AkkaActorGateway(master._2(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+               Configuration tmConfig = new Configuration();
+               
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+               ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+                       tmConfig,
+                       ResourceID.generate(),
+                       actorSystem,
+                       "localhost",
+                       Option.apply("tm"),
+                       Option.<LeaderRetrievalService>apply(new 
StandaloneLeaderRetrievalService(jobManager.path(), 
HighAvailabilityServices.DEFAULT_LEADER_ID)),
+                       true,
+                       TestingTaskManager.class);
+
+               taskManager = new AkkaActorGateway(taskManagerRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+               // Wait until connected
+               Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+               Await.ready(taskManager.ask(msg, timeout), timeout);
+       }
+
+       @AfterClass
+       public static void tearDownCluster() {
+               if (actorSystem != null) {
+                       actorSystem.shutdown();
+               }
+
+               if (archiver != null) {
+                       archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+               }
+
+               if (jobManager != null) {
+                       jobManager.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+               }
+
+               if (taskManager != null) {
+                       taskManager.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+               }
+       }
+
+       @Test
+       public void testMigrationAndRestore() throws Throwable {
+               // submit 1.2 job and create a migrated 1.3 savepoint
+               String savepointPath = migrateJob();
+               // restore from migrated 1.3 savepoint
+               restoreJob(savepointPath);
+       }
+
+       private String migrateJob() throws Throwable {
+               URL savepointResource = 
AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/"
 + getMigrationSavepointName());
+               if (savepointResource == null) {
+                       throw new IllegalArgumentException("Savepoint file does 
not exist.");
+               }
+               JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
+               
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
+
+               Object msg;
+               Object result;
+
+               // Submit job graph
+               msg = new JobManagerMessages.SubmitJob(jobToMigrate, 
ListeningBehaviour.DETACHED);
+               result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+               if (result instanceof JobManagerMessages.JobResultFailure) {
+                       JobManagerMessages.JobResultFailure failure = 
(JobManagerMessages.JobResultFailure) result;
+                       throw new Exception(failure.cause());
+               }
+               Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, 
result.getClass());
+
+               // Wait for all tasks to be running
+               msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
+               Await.result(jobManager.ask(msg, timeout), timeout);
+
+               // Trigger savepoint
+               File targetDirectory = tmpFolder.newFolder();
+               msg = new 
JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), 
targetDirectory.getAbsolutePath());
+               Future<Object> future = jobManager.ask(msg, timeout);
+               result = Await.result(future, timeout);
+
+               if (result instanceof JobManagerMessages.CancellationFailure) {
+                       JobManagerMessages.CancellationFailure failure = 
(JobManagerMessages.CancellationFailure) result;
+                       throw new Exception(failure.cause());
+               }
+
+               String savepointPath = 
((JobManagerMessages.CancellationSuccess) result).savepointPath();
+
+               // Wait until canceled
+               msg = new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), 
JobStatus.CANCELED);
+               Await.ready(jobManager.ask(msg, timeout), timeout);
+
+               return savepointPath;
+       }
+
+       private void restoreJob(String savepointPath) throws Exception {
+               JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
+               
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath,
 true));
+
+               Object msg;
+               Object result;
+
+               // Submit job graph
+               msg = new JobManagerMessages.SubmitJob(jobToRestore, 
ListeningBehaviour.DETACHED);
+               result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+               if (result instanceof JobManagerMessages.JobResultFailure) {
+                       JobManagerMessages.JobResultFailure failure = 
(JobManagerMessages.JobResultFailure) result;
+                       throw new Exception(failure.cause());
+               }
+               Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, 
result.getClass());
+
+               msg = new 
JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
+               JobStatus status = ((JobManagerMessages.CurrentJobStatus) 
Await.result(jobManager.ask(msg, timeout), timeout)).status();
+               while (!status.isTerminalState()) {
+                       status = ((JobManagerMessages.CurrentJobStatus) 
Await.result(jobManager.ask(msg, timeout), timeout)).status();
+               }
+
+               Assert.assertEquals(JobStatus.FINISHED, status);
+       }
+
+       private JobGraph createJobGraph(ExecutionMode mode) {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+               env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+               env.setRestartStrategy(RestartStrategies.noRestart());
+               env.setStateBackend(new MemoryStateBackend());
+
+               switch (mode) {
+                       case MIGRATE:
+                               createMigrationJob(env);
+                               break;
+                       case RESTORE:
+                               createRestoredJob(env);
+                               break;
+               }
+
+               return 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+       }
+
+       /**
+        * Recreates the job used to create the 1.2 savepoint.
+        *
+        * @param env StreamExecutionEnvironment to use
+        */
+       protected abstract void createMigrationJob(StreamExecutionEnvironment 
env);
+
+       /**
+        * Creates a modified version of the job used to create the 1.2 savepoint.
+        *
+        * @param env StreamExecutionEnvironment to use
+        */
+        */
+       protected abstract void createRestoredJob(StreamExecutionEnvironment 
env);
+
+       /**
+        * Returns the name of the savepoint directory to use, relative to "resources/operatorstate".
+        *
+        * @return savepoint directory to use
+        */
+       protected abstract String getMigrationSavepointName();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
new file mode 100644
index 0000000..f333aca
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+/**
+ * Enum to control function behavior for the different test stages.
+ *
+ * <p>{@link ExecutionMode#GENERATE} should be used when creating the 1.2 
savepoint.
+ * <p>{@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 
savepoint to 1.3.
+ * <p>{@link ExecutionMode#RESTORE} should be used when restoring from the 
migrated 1.3 savepoint.
+ */
+public enum ExecutionMode {
+       GENERATE,
+       MIGRATE,
+       RESTORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
new file mode 100644
index 0000000..28cd15a
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+
+       @Override
+       public void createMigrationJob(StreamExecutionEnvironment env) {
+               /**
+                * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+                */
+               SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = 
KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
+
+               SingleOutputStreamOperator<Integer> window = 
KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
+
+               SingleOutputStreamOperator<Integer> first = 
KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
+
+               SingleOutputStreamOperator<Integer> second = 
KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+       }
+
+       @Override
+       protected void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1
+                */
+               SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = 
KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> window = 
KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source);
+
+               SingleOutputStreamOperator<Integer> second = 
KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window);
+
+               SingleOutputStreamOperator<Integer> first = 
KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
+               first.startNewChain();
+       }
+
+       @Override
+       protected final String getMigrationSavepointName() {
+               return "complexKeyed";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
new file mode 100644
index 0000000..6add7b2
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link 
KeyedComplexChainTest}.
+ *
+ * <p>The job should be cancelled manually through the REST API using the 
cancel-with-savepoint operation.
+ */
+public class KeyedJob {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+
+               String savepointsPath = pt.getRequired("savepoint-path");
+
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
savepointsPath);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+               env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+               env.setRestartStrategy(RestartStrategies.noRestart());
+
+               env.setStateBackend(new MemoryStateBackend());
+
+               /**
+                * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+                */
+
+               SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = 
createIntegerTupleSource(env, ExecutionMode.GENERATE);
+
+               SingleOutputStreamOperator<Integer> window = 
createWindowFunction(ExecutionMode.GENERATE, source);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.GENERATE, window);
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.GENERATE, first);
+
+               env.execute("job");
+       }
+
+       public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> 
createIntegerTupleSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+               return env.addSource(new IntegerTupleSource(mode));
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> 
input) {
+               return input
+                       .keyBy(0)
+                       .countWindow(1)
+                       .apply(new StatefulWindowFunction(mode))
+                       .setParallelism(4)
+                       .uid("window");
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+               SingleOutputStreamOperator<Integer> map = input
+                       .map(new StatefulStringStoringMap(mode, "first"))
+                       .setParallelism(4);
+
+               // TODO: re-enable this when generating the actual 1.2 savepoint
+               //if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
+               map.uid("first");
+               //}
+
+               return map;
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+               SingleOutputStreamOperator<Integer> map = input
+                       .map(new StatefulStringStoringMap(mode, "second"))
+                       .setParallelism(4);
+
+               // TODO: re-enable this when generating the actual 1.2 savepoint
+               //if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
+               map.uid("second");
+               //}
+
+               return map;
+       }
+
+       private static final class IntegerTupleSource extends 
RichSourceFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 
1912878510707871659L;
+               private final ExecutionMode mode;
+
+               private boolean running = true;
+
+               private IntegerTupleSource(ExecutionMode mode) {
+                       this.mode = mode;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
+                       for (int x = 0; x < 10; x++) {
+                               ctx.collect(new Tuple2<>(x, x));
+                       }
+
+                       switch (mode) {
+                               case GENERATE:
+                               case MIGRATE:
+                                       synchronized (this) {
+                                               while (running) {
+                                                       this.wait();
+                                               }
+                                       }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       synchronized (this) {
+                               running = false;
+                               this.notifyAll();
+                       }
+               }
+       }
+
+       private static final class StatefulWindowFunction extends 
RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> {
+
+               private static final long serialVersionUID = 
-7236313076792964055L;
+
+               private final ExecutionMode mode;
+               private transient ListState<Integer> state;
+
+               private boolean applyCalled = false;
+
+               private StatefulWindowFunction(ExecutionMode mode) {
+                       this.mode = mode;
+               }
+
+               @Override
+               public void open(Configuration config) {
+                       this.state = getRuntimeContext().getListState(new 
ListStateDescriptor<>("values", Integer.class));
+               }
+
+               @Override
+               public void apply(Tuple key, GlobalWindow window, 
Iterable<Tuple2<Integer, Integer>> values, Collector<Integer> out) throws 
Exception {
+                       // fail-safe to make sure apply is actually called
+                       applyCalled = true;
+                       switch (mode) {
+                               case GENERATE:
+                                       for (Tuple2<Integer, Integer> value : 
values) {
+                                               state.add(value.f1);
+                                       }
+                                       break;
+                               case MIGRATE:
+                               case RESTORE:
+                                       Iterator<Tuple2<Integer, Integer>> 
input = values.iterator();
+                                       Iterator<Integer> restored = 
state.get().iterator();
+                                       while (input.hasNext() && 
restored.hasNext()) {
+                                               Tuple2<Integer, Integer> value 
= input.next();
+                                               Integer rValue = 
restored.next();
+                                               Assert.assertEquals(rValue, 
value.f1);
+                                       }
+                                       Assert.assertEquals(restored.hasNext(), 
input.hasNext());
+                       }
+               }
+
+               @Override
+               public void close() {
+                       Assert.assertTrue("Apply was never called.", 
applyCalled);
+               }
+       }
+
+       private static class StatefulStringStoringMap extends 
RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+
+               private static final long serialVersionUID = 
6092985758425330235L;
+               private final ExecutionMode mode;
+               private final String valueToStore;
+
+               private StatefulStringStoringMap(ExecutionMode mode, String 
valueToStore) {
+                       this.mode = mode;
+                       this.valueToStore = valueToStore;
+               }
+
+               @Override
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public List<String> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Arrays.asList(valueToStore + 
getRuntimeContext().getIndexOfThisSubtask());
+               }
+
+               @Override
+               public void restoreState(List<String> state) throws Exception {
+                       switch (mode) {
+                               case GENERATE:
+                                       break;
+                               case MIGRATE:
+                               case RESTORE:
+                                       Assert.assertEquals("Failed for " + 
valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size());
+                                       String value = state.get(0);
+                                       Assert.assertEquals(valueToStore + 
getRuntimeContext().getIndexOfThisSubtask(), value);
+                       }
+               }
+       }
+
+
+       private KeyedJob() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..5b51765
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * All classes extending this class will use the same savepoint and migration 
job.
+ */
+public abstract class AbstractNonKeyedOperatorRestoreTestBase extends 
AbstractOperatorRestoreTestBase {
+
+       @Override
+       public void createMigrationJob(StreamExecutionEnvironment env) {
+               /**
+                * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> 
StatefulMap3)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.MIGRATE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.MIGRATE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+               second.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(second);
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.MIGRATE, stateless);
+       }
+
+       @Override
+       protected final String getMigrationSavepointName() {
+               return "nonKeyed";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
new file mode 100644
index 0000000..6838070
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change 
breaks up a chain.
+ */
+public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+       @Override
+       public void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> StatefulMap3)
+                * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map) -> StatefulMap3
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.RESTORE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.RESTORE, first);
+               second.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(second);
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+               third.startNewChain();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
new file mode 100644
index 0000000..e405e76
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change 
removes an operator from a chain.
+ */
+public class ChainLengthDecreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
+
+       @Override
+       public void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> StatefulMap3)
+                * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> 
StatefulMap3)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.RESTORE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(first);
+               stateless.startNewChain();
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
new file mode 100644
index 0000000..b78aa10
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change 
adds an operator to a chain.
+ */
+public class ChainLengthIncreaseTest extends 
AbstractNonKeyedOperatorRestoreTestBase {
+
+       @Override
+       public void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> StatefulMap3)
+                * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> Map -> StatefulMap3)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.RESTORE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.RESTORE, first);
+               second.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(second);
+
+               SingleOutputStreamOperator<Integer> stateless2 = 
createStatelessMap(stateless);
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
new file mode 100644
index 0000000..7c68b4e
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change 
causes the ordering of a chain to change.
+ */
+public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+       @Override
+       public void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> StatefulMap3)
+                * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 
-> Map -> StatefulMap2)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.RESTORE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.RESTORE, first);
+               third.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(third);
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.RESTORE, stateless);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
new file mode 100644
index 0000000..3f2fba4
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static 
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change 
creates a new chain.
+ */
+public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+       @Override
+       public void createRestoredJob(StreamExecutionEnvironment env) {
+               /**
+                * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 
-> Map -> StatefulMap3)
+                * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 
-> Map -> StatefulMap3)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.RESTORE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.RESTORE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.RESTORE, first);
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(second);
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
new file mode 100644
index 0000000..32067b3
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link 
AbstractNonKeyedOperatorRestoreTestBase}.
+ *
+ * <p>The job should be cancelled manually through the REST API using the 
cancel-with-savepoint operation.
+ */
+public class NonKeyedJob {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+
+               String savepointsPath = pt.getRequired("savepoint-path");
+
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
savepointsPath);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+               env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+               env.setRestartStrategy(RestartStrategies.noRestart());
+
+               env.setStateBackend(new MemoryStateBackend());
+
+               /**
+                * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> 
StatefulMap3)
+                */
+               DataStream<Integer> source = createSource(env, 
ExecutionMode.GENERATE);
+
+               SingleOutputStreamOperator<Integer> first = 
createFirstStatefulMap(ExecutionMode.GENERATE, source);
+               first.startNewChain();
+
+               SingleOutputStreamOperator<Integer> second = 
createSecondStatefulMap(ExecutionMode.GENERATE, first);
+               second.startNewChain();
+
+               SingleOutputStreamOperator<Integer> stateless = 
createStatelessMap(second);
+
+               SingleOutputStreamOperator<Integer> third = 
createThirdStatefulMap(ExecutionMode.GENERATE, stateless);
+
+               env.execute("job");
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+               return env.addSource(new IntegerSource(mode))
+                       .setParallelism(4);
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+               return input
+                       .map(new StatefulStringStoringMap(mode, "first"))
+                       .setParallelism(4)
+                       .uid("first");
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+               return input
+                       .map(new StatefulStringStoringMap(mode, "second"))
+                       .setParallelism(4)
+                       .uid("second");
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+               SingleOutputStreamOperator<Integer> map = input
+                       .map(new StatefulStringStoringMap(mode, "third"))
+                       .setParallelism(4);
+
+               // we cannot set the uid on a chained operator in 1.2
+               if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
+                       map.uid("third");
+               }
+
+               return map;
+       }
+
+       public static SingleOutputStreamOperator<Integer> 
createStatelessMap(DataStream<Integer> input) {
+               return input.map(new NoOpMapFunction())
+                       .setParallelism(4);
+       }
+
+       private static class StatefulStringStoringMap extends 
RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+
+               private static final long serialVersionUID = 
6092985758425330235L;
+               private final ExecutionMode mode;
+               private final String valueToStore;
+
+               private StatefulStringStoringMap(ExecutionMode mode, String 
valueToStore) {
+                       this.mode = mode;
+                       this.valueToStore = valueToStore;
+               }
+
+               @Override
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public List<String> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Arrays.asList(valueToStore + 
getRuntimeContext().getIndexOfThisSubtask());
+               }
+
+               @Override
+               public void restoreState(List<String> state) throws Exception {
+                       switch (mode) {
+                               case GENERATE:
+                                       break;
+                               case MIGRATE:
+                               case RESTORE:
+                                       Assert.assertEquals("Failed for " + 
valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size());
+                                       String value = state.get(0);
+                                       Assert.assertEquals(valueToStore + 
getRuntimeContext().getIndexOfThisSubtask(), value);
+                       }
+               }
+       }
+
+       private static class NoOpMapFunction implements MapFunction<Integer, 
Integer> {
+
+               private static final long serialVersionUID = 
6584823409744624276L;
+
+               @Override
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static final class IntegerSource extends 
RichParallelSourceFunction<Integer> {
+
+               private static final long serialVersionUID = 
1912878510707871659L;
+               private final ExecutionMode mode;
+
+               private volatile boolean running = true;
+
+               private IntegerSource(ExecutionMode mode) {
+                       this.mode = mode;
+               }
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       ctx.collect(1);
+
+                       switch (mode) {
+                               case GENERATE:
+                               case MIGRATE:
+                                       // keep the job running until 
cancel-with-savepoint was done
+                                       synchronized (this) {
+                                               while (running) {
+                                                       this.wait();
+                                               }
+                                       }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       synchronized (this) {
+                               running = false;
+                               this.notifyAll();
+                       }
+               }
+       }
+
+       private NonKeyedJob() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata 
b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
new file mode 100644
index 0000000..9e03876
Binary files /dev/null and 
b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata 
b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and 
b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata differ

Reply via email to