This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a9293  [FLINK-10712] Support state restore for RestartPipelinedRegionStrategy
c3a9293 is described below

commit c3a929346f9993d5177978053fc0bedd482faf7c
Author: Yun Tang <myas...@live.com>
AuthorDate: Wed Apr 17 19:48:40 2019 +0800

    [FLINK-10712] Support state restore for RestartPipelinedRegionStrategy
    
    This closes #7813.
---
 .../fs/RollingSinkFaultToleranceITCase.java        |  12 +-
 .../BucketingSinkFaultToleranceITCase.java         |  12 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  15 +-
 .../flink/runtime/executiongraph/Execution.java    |   2 -
 .../executiongraph/failover/FailoverRegion.java    |  28 +-
 .../failover/RestartPipelinedRegionStrategy.java   |  18 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |   4 +-
 .../runtime/executiongraph/FailoverRegionTest.java | 219 +++++++++++-
 .../ContinuousFileProcessingCheckpointITCase.java  |  23 +-
 .../test/checkpointing/RegionFailoverITCase.java   | 394 +++++++++++++++++++++
 .../StreamFaultToleranceTestBase.java              |  87 ++++-
 11 files changed, 748 insertions(+), 66 deletions(-)

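At a high level, the change below makes a restarting failover region first abort the in-flight checkpoints and then reload the latest completed checkpoint for only the job vertices inside that region. A minimal sketch of that sequence, assuming the names used in the diff (locking and error handling omitted; this is not the literal FailoverRegion code):

    // Sketch of the restart-with-state flow a FailoverRegion performs below.
    // 'tasks' maps each JobVertexID in the region to its ExecutionJobVertex.
    void restartRegionWithState(ExecutionGraph eg, Map<JobVertexID, ExecutionJobVertex> tasks) throws Exception {
        CheckpointCoordinator coordinator = eg.getCheckpointCoordinator();
        if (coordinator != null) {
            // drop in-flight checkpoints so a new one can be triggered immediately
            coordinator.abortPendingCheckpoints(new Exception("FailoverRegion is restarting."));
            // reload the latest completed checkpoint for this region's vertices only
            coordinator.restoreLatestCheckpointedState(tasks, false, true);
        }
    }
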
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 886055d..daa8884 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
@@ -77,8 +77,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
        private static String outPath;
 
-       @BeforeClass
-       public static void createHDFS() throws IOException {
+       @Before
+       public void createHDFS() throws IOException {
                Configuration conf = new Configuration();
 
                File dataDir = tempFolder.newFolder();
@@ -94,8 +94,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
                                + "/string-non-rolling-out";
        }
 
-       @AfterClass
-       public static void destroyHDFS() {
+       @After
+       public void destroyHDFS() {
                if (hdfsCluster != null) {
                        hdfsCluster.shutdown();
                }
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index dcb77bf..71b210d 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
@@ -77,8 +77,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 
        private static String outPath;
 
-       @BeforeClass
-       public static void createHDFS() throws IOException {
+       @Before
+       public void createHDFS() throws IOException {
                Configuration conf = new Configuration();
 
                File dataDir = tempFolder.newFolder();
@@ -94,8 +94,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
                                + "/string-non-rolling-out";
        }
 
-       @AfterClass
-       public static void destroyHDFS() {
+       @After
+       public void destroyHDFS() {
                if (hdfsCluster != null) {
                        hdfsCluster.shutdown();
                }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ac4c1e2..b6cbc66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1202,12 +1202,23 @@ public class CheckpointCoordinator {
                                currentPeriodicTrigger = null;
                        }
 
+                       abortPendingCheckpoints(new Exception("Checkpoint Coordinator is suspending."));
+
+                       numUnsuccessfulCheckpointsTriggers.set(0);
+               }
+       }
+
+       /**
+        * Aborts all the pending checkpoints due to an exception.
+        * @param exception The exception that causes the pending checkpoints to be aborted.
+        */
+       public void abortPendingCheckpoints(Exception exception) {
+               synchronized (lock) {
                        for (PendingCheckpoint p : pendingCheckpoints.values()) {
-                               p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+                               p.abortError(exception);
                        }
 
                        pendingCheckpoints.clear();
-                       numUnsuccessfulCheckpointsTriggers.set(0);
                }
        }
 
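As a usage note, the newly extracted public abortPendingCheckpoints(Exception) is now the single place where in-flight checkpoints are failed: suspend() above passes its own cause, and the region failover path further down passes another. A hedged caller-side sketch (the cause message is illustrative):

    // any caller may abort whatever is in flight with a descriptive cause;
    // the method synchronizes on the coordinator's internal lock itself
    coordinator.abortPendingCheckpoints(new Exception("FailoverRegion is restarting."));
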
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index dd8db3a..59f79a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -89,7 +89,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
@@ -365,7 +364,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
          * @param taskRestore information to restore the state
          */
        public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
-               checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
                this.taskRestore = taskRestore;
        }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 6235750..635a7f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -23,9 +23,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.util.AbstractID;
@@ -38,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -64,15 +67,19 @@ public class FailoverRegion {
 
        private final List<ExecutionVertex> connectedExecutionVertexes;
 
+       private final Map<JobVertexID, ExecutionJobVertex> tasks;
+
        /** Current status of the job execution */
        private volatile JobStatus state = JobStatus.RUNNING;
 
        public FailoverRegion(
                ExecutionGraph executionGraph,
-               List<ExecutionVertex> connectedExecutions) {
+               List<ExecutionVertex> connectedExecutions,
+               Map<JobVertexID, ExecutionJobVertex> tasks) {
 
                this.executionGraph = checkNotNull(executionGraph);
                this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+               this.tasks = checkNotNull(tasks);
 
                LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
        }
@@ -108,14 +115,7 @@ public class FailoverRegion {
                return state;
        }
 
-       /**
-        * get all execution vertexes contained in this region
-        */
-       public List<ExecutionVertex> getAllExecutionVertexes() {
-               return connectedExecutionVertexes;
-       }
-
-       // Notice the region to failover, 
+       // Notice the region to failover,
        private void failover(long globalModVersionOfFailover) {
                if (!executionGraph.getRestartStrategy().canRestart()) {
                        executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
@@ -206,13 +206,15 @@ public class FailoverRegion {
                try {
                        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                                // if we have checkpointed state, reload it into the executions
-                               //TODO: checkpoint support restore part ExecutionVertex cp
-                               /**
                                if (executionGraph.getCheckpointCoordinator() != null) {
+                                       // we restart the checkpoint scheduler so that
+                                       // i) a new checkpoint can be triggered without waiting for the last checkpoint to expire, and
+                                       // ii) EXACTLY_ONCE semantics are ensured if needed.
+                                       executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new Exception("FailoverRegion is restarting."));
+
                                        executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
-                                                       connectedExecutionVertexes, false, false);
+                                               tasks, false, true);
                                }
-                               */
 
                                HashSet<AllocationID> previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size());
                                for (ExecutionVertex connectedExecutionVertex : connectedExecutionVertexes) {
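For readability, the two booleans passed to restoreLatestCheckpointedState(tasks, false, true) above are, going by the coordinator's signature, errorIfNoCheckpoint and allowNonRestoredState (parameter names assumed here, since they are not shown in this diff):

    // equivalent call with the argument intent spelled out (names assumed):
    boolean errorIfNoCheckpoint = false;   // a region may restart before any checkpoint has completed
    boolean allowNonRestoredState = true;  // the checkpoint also covers vertices outside this region
    executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(tasks, errorIfNoCheckpoint, allowNonRestoredState);
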
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
index 25fd290..811c0a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -36,11 +37,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A failover strategy that restarts regions of the ExecutionGraph. A region is defined
+ * A failover strategy that restarts regions of the ExecutionGraph with state. A region is defined
  * by this strategy as the weakly connected component of tasks that communicate via pipelined
  * data exchange.
  */
@@ -222,7 +224,19 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
 
        @VisibleForTesting
        protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) {
-               return new FailoverRegion(eg, connectedExecutions);
+               Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions);
+               return new FailoverRegion(eg, connectedExecutions, tasks);
+       }
+
+       @VisibleForTesting
+       protected Map<JobVertexID, ExecutionJobVertex> initTasks(List<ExecutionVertex> connectedExecutions) {
+               Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(connectedExecutions.size());
+               for (ExecutionVertex executionVertex : connectedExecutions) {
+                       JobVertexID jobvertexId = executionVertex.getJobvertexId();
+                       ExecutionJobVertex jobVertex = executionVertex.getJobVertex();
+                       tasks.putIfAbsent(jobvertexId, jobVertex);
+               }
+               return tasks;
        }
 
        // ------------------------------------------------------------------------
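Since a region typically contains several parallel subtasks of the same JobVertex, initTasks collapses them to one entry per vertex ID via putIfAbsent. A small illustrative check (hypothetical test code, not part of this commit):

    // all subtasks of a JobVertex share one JobVertexID, so the map never
    // holds more entries than there are distinct job vertices in the region
    Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions);
    assert tasks.size() <= connectedExecutions.size();
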
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 697b774..9ac72a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -521,7 +522,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger {
 
                @Override
                protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) {
-                       return new FailoverRegion(eg, connectedExecutions) {
+                       Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions);
+                       return new FailoverRegion(eg, connectedExecutions, tasks) {
                                @Override
                                protected CompletableFuture<Void> createTerminationFutureOverAllConnectedVertexes() {
                                        ArrayList<CompletableFuture<?>> terminationAndBlocker = new ArrayList<>(2);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 240d236..e555704 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -19,8 +19,25 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -28,54 +45,85 @@ import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionSt
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class FailoverRegionTest extends TestLogger {
 
+       private static final long checkpointId = 42L;
+
        /**
-        * Tests that a job only has one failover region and can recover from task failure successfully
-        * @throws Exception
+        * Tests that a job only has one failover region and can recover from task failure successfully with state.
+        * @throws Exception if it fails to create the single region execution graph or to acknowledge all checkpoints.
         */
        @Test
        public void testSingleRegionFailover() throws Exception {
                RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(10);
                ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy);
-               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
 
                ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
 
+               assertNotNull(eg.getCheckpointCoordinator());
+               assertFalse(eg.getCheckpointCoordinator().getPendingCheckpoints().isEmpty());
+
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState());
 
+               acknowledgeAllCheckpoints(eg.getCheckpointCoordinator(), eg.getAllExecutionVertices().iterator());
+
+               // verify checkpoint has been completed successfully.
+               assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints());
+               assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID());
+
                ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
                assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState());
 
                for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
                        evs.getCurrentExecutionAttempt().completeCancelling();
                }
+
+               verifyCheckpointRestoredAsExpected(eg);
+
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState());
        }
 
@@ -98,8 +146,9 @@ public class FailoverRegionTest extends TestLogger {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
 
-               final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
-                               
+               final Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap = new HashMap<>();
+               final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20, new CollectTddTaskManagerGateway(attemptIDInitStateMap));
+
                JobVertex v1 = new JobVertex("vertex1");
                JobVertex v2 = new JobVertex("vertex2");
                JobVertex v3 = new JobVertex("vertex3");
@@ -133,9 +182,8 @@ public class FailoverRegionTest extends TestLogger {
                        slotProvider);
 
                eg.attachJobGraph(ordered);
-               eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
 
-               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
 
                // the following two vertices are in the same failover region
                ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
@@ -148,11 +196,21 @@ public class FailoverRegionTest extends TestLogger {
                // the following vertices are in one failover region
                ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
                ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
-               ExecutionVertex ev4 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+               ExecutionVertex ev4 = eg.getJobVertex(v4.getID()).getTaskVertices()[0];
+
+               enableCheckpointing(eg);
 
+               eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
                eg.scheduleForExecution();
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               attachPendingCheckpoints(eg);
 
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev21).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+               acknowledgeAllCheckpoints(eg.getCheckpointCoordinator(), Arrays.asList(ev11, ev21, ev12, ev22, ev31, ev32, ev4).iterator());
 
                ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
                ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
@@ -161,6 +219,8 @@ public class FailoverRegionTest extends TestLogger {
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
 
                ev11.getCurrentExecutionAttempt().completeCancelling();
+               verifyCheckpointRestoredAsExpected(eg);
+
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
@@ -182,14 +242,18 @@ public class FailoverRegionTest extends TestLogger {
                assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
 
                ev32.getCurrentExecutionAttempt().completeCancelling();
+               verifyCheckpointRestoredAsExpected(eg);
+
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
                assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev4).getState());
        }
        }
 
        /**
-        * Tests that when a task fail, and restart strategy doesn't support restarting, the job will go to failed
-        * @throws Exception
+        * Tests that when a task fails and the restart strategy doesn't support restarting, the job will go to failed.
+        * @throws Exception if it fails to create the single region execution graph.
         */
        @Test
        public void testNoManualRestart() throws Exception {
@@ -207,8 +271,9 @@ public class FailoverRegionTest extends TestLogger {
        }
 
        /**
-        * Tests that two failover regions failover at the same time, they will not influence each other
-        * @throws Exception
+        * Tests that when two regions fail over at the same time, they will not influence each other.
+        * @throws Exception if it fails to create dummy job information, fails to schedule for execution,
+        * or times out before a region switches to the expected status.
         */
        @Test
        public void testMultiRegionFailoverAtSameTime() throws Exception {
@@ -282,7 +347,7 @@ public class FailoverRegionTest extends TestLogger {
         * Tests that if a task reports the result of its preceding task is failed,
         * its preceding task will be considered as failed, and start to failover
         * TODO: as the report part is not finished yet, this case is ignored temporarily
-        * @throws Exception
+        * @throws Exception if it fails to create dummy job information or to schedule for execution.
         */
        @Ignore
        @Test
@@ -335,8 +400,8 @@ public class FailoverRegionTest extends TestLogger {
        }
 
        /**
-        * Tests that a new failure comes while the failover region is in CANCELLING
-        * @throws Exception
+        * Tests that a new failure comes while the failover region is in CANCELLING.
+        * @throws Exception if it fails to create the single region execution graph.
         */
        @Test
        public void testFailWhileCancelling() throws Exception {
@@ -359,8 +424,8 @@ public class FailoverRegionTest extends TestLogger {
        }
 
        /**
-        * Tests that a new failure comes while the failover region is restarting
-        * @throws Exception
+        * Tests that a new failure comes while the failover region is restarting.
+        * @throws Exception if it fails to create the single region execution graph.
         */
        @Test
        public void testFailWhileRestarting() throws Exception {
@@ -384,7 +449,19 @@ public class FailoverRegionTest extends TestLogger {
                assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState());
        }
 
-       private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+       // --------------------------------------------------------------------------------------------
+
+       private void verifyCheckpointRestoredAsExpected(ExecutionGraph eg) throws Exception {
+               // pending checkpoints have already been cancelled.
+               assertNotNull(eg.getCheckpointCoordinator());
+               assertTrue(eg.getCheckpointCoordinator().getPendingCheckpoints().isEmpty());
+
+               // verify checkpoint has been restored successfully.
+               assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints());
+               assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID());
+       }
+
+       private ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
 
@@ -425,8 +502,13 @@ public class FailoverRegionTest extends TestLogger {
                        e.printStackTrace();
                        fail("Job failed with exception: " + e.getMessage());
                }
+
+               enableCheckpointing(eg);
+
                eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
                eg.scheduleForExecution();
+
+               attachPendingCheckpoints(eg);
                return eg;
        }
 
@@ -444,4 +526,105 @@ public class FailoverRegionTest extends TestLogger {
                }
        }
 
+       private static void enableCheckpointing(ExecutionGraph eg) {
+               ArrayList<ExecutionJobVertex> jobVertices = new ArrayList<>(eg.getAllVertices().values());
+               eg.enableCheckpointing(
+                               1000,
+                               100,
+                               0,
+                               1,
+                               CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
+                               jobVertices,
+                               jobVertices,
+                               jobVertices,
+                               Collections.emptyList(),
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               new MemoryStateBackend(),
+                               new CheckpointStatsTracker(
+                                       0,
+                                       jobVertices,
+                                       mock(CheckpointCoordinatorConfiguration.class),
+                                       new UnregisteredMetricsGroup()));
+       }
+
+       /**
+        * Attaches pending checkpoints chk-42 and chk-43 to the execution graph.
+        * If {@link #acknowledgeAllCheckpoints(CheckpointCoordinator, Iterator)} is called afterwards,
+        * chk-42 becomes the completed checkpoint.
+        */
+       private void attachPendingCheckpoints(ExecutionGraph eg) throws IOException {
+               final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>();
+               final Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm = new HashMap<>();
+               eg.getAllExecutionVertices().forEach(e -> {
+                       Execution ee = e.getCurrentExecutionAttempt();
+                       if (ee != null) {
+                               verticesToConfirm.put(ee.getAttemptId(), e);
+                       }
+               });
+
+               CheckpointCoordinator checkpointCoordinator = eg.getCheckpointCoordinator();
+               assertNotNull(checkpointCoordinator);
+               CheckpointStorageCoordinatorView checkpointStorage = checkpointCoordinator.getCheckpointStorage();
+               pendingCheckpoints.put(checkpointId, new PendingCheckpoint(
+                       eg.getJobID(),
+                       checkpointId,
+                       0L,
+                       verticesToConfirm,
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       checkpointStorage.initializeLocationForCheckpoint(checkpointId),
+                       eg.getFutureExecutor()));
+
+               long newCheckpointId = checkpointId + 1;
+               pendingCheckpoints.put(newCheckpointId, new PendingCheckpoint(
+                       eg.getJobID(),
+                       newCheckpointId,
+                       0L,
+                       verticesToConfirm,
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       checkpointStorage.initializeLocationForCheckpoint(newCheckpointId),
+                       eg.getFutureExecutor()));
+               Whitebox.setInternalState(checkpointCoordinator, "pendingCheckpoints", pendingCheckpoints);
+       }
+
+       /**
+        * Lets the checkpoint coordinator receive all acknowledgements from the given execution vertices so that the expected checkpoint is completed.
+        */
+       private void acknowledgeAllCheckpoints(CheckpointCoordinator checkpointCoordinator, Iterator<ExecutionVertex> executionVertexes) throws IOException, CheckpointException {
+               while (executionVertexes.hasNext()) {
+                       ExecutionVertex executionVertex = executionVertexes.next();
+                       for (int index = 0; index < executionVertex.getJobVertex().getParallelism(); index++) {
+                               JobVertexID jobVertexID = executionVertex.getJobvertexId();
+                               OperatorStateHandle opStateBackend = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false);
+                               OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, null, null, null);
+                               TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
+                               taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState);
+
+                               AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+                                       executionVertex.getJobId(),
+                                       executionVertex.getJobVertex().getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                                       checkpointId,
+                                       new CheckpointMetrics(),
+                                       taskOperatorSubtaskStates);
+
+                               checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint);
+                       }
+               }
+       }
+
+       private static class CollectTddTaskManagerGateway extends SimpleAckingTaskManagerGateway {
+
+               private final Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap;
+
+               CollectTddTaskManagerGateway(Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap) {
+                       this.attemptIDInitStateMap = attemptIDInitStateMap;
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+                       attemptIDInitStateMap.put(tdd.getExecutionAttemptId(), tdd.getTaskRestore());
+                       return super.submitTask(tdd, timeout);
+               }
+       }
+
 }
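The CollectTddTaskManagerGateway above records the JobManagerTaskRestore handed to every submitted TaskDeploymentDescriptor, which is what lets a test check that redeployed attempts actually carry restored state. A hedged sketch of such an assertion (hypothetical, not part of this commit):

    // after a region restart, each redeployed attempt should have been handed restore state
    attemptIDInitStateMap.forEach((attemptId, taskRestore) ->
        assertNotNull("missing task restore for " + attemptId, taskRestore));
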
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 280e11a..cbe99f4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -38,9 +38,10 @@ import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.AssumptionViolatedException;
+import org.junit.Before;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,8 +75,14 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 
        private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
 
-       @BeforeClass
-       public static void createHDFS() {
+       @Before
+       public void createHDFS() {
+               if (failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionStrategy)) {
+                       // TODO 'NO_OF_RETRIES' has no effect with the current RestartPipelinedRegionStrategy;
+                       // running this ContinuousFileProcessingCheckpointITCase with RestartPipelinedRegionStrategy would result in endless running.
+                       throw new AssumptionViolatedException("ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy");
+               }
+
                try {
                        baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
                        FileUtil.fullyDelete(baseDir);
@@ -91,10 +98,12 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
                }
        }
 
-       @AfterClass
-       public static void destroyHDFS() {
+       @After
+       public void destroyHDFS() {
                try {
-                       FileUtil.fullyDelete(baseDir);
+                       if (baseDir != null) {
+                               FileUtil.fullyDelete(baseDir);
+                       }
                } catch (Throwable t) {
                        throw new RuntimeException(t);
                }
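The @Before hook above skips the test by throwing an AssumptionViolatedException, which JUnit 4 reports as a violated assumption (skipped) rather than a failure. A minimal standalone illustration, assuming JUnit 4.12+ where the class lives in org.junit:

    import org.junit.AssumptionViolatedException;
    import org.junit.Before;
    import org.junit.Test;

    public class SkipViaAssumptionTest {
        @Before
        public void maybeSkip() {
            // throwing here marks each test in the class as skipped, not failed
            throw new AssumptionViolatedException("precondition not met, skipping");
        }

        @Test
        public void neverRuns() {
        }
    }
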
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
new file mode 100644
index 0000000..b951b48
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for region failover with multiple regions.
+ */
+public class RegionFailoverITCase extends TestLogger {
+
+       private static final int FAIL_BASE = 1000;
+       private static final int NUM_OF_REGIONS = 3;
+       private static final int MAX_PARALLELISM = 2 * NUM_OF_REGIONS;
+       private static final Set<Integer> EXPECTED_INDICES = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet());
+       private static final int NUM_OF_RESTARTS = 3;
+       private static final int NUM_ELEMENTS = FAIL_BASE * 10;
+
+       private static AtomicLong lastCompletedCheckpointId = new AtomicLong(0);
+       private static AtomicInteger numCompletedCheckpoints = new AtomicInteger(0);
+       private static AtomicInteger jobFailedCnt = new AtomicInteger(0);
+
+       private static Map<Long, Integer> snapshotIndicesOfSubTask = new HashMap<>();
+
+       private static MiniClusterWithClientResource cluster;
+
+       private static boolean restoredState = false;
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
+
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(2)
+                               .setNumberSlotsPerTaskManager(2).build());
+               cluster.before();
+               jobFailedCnt.set(0);
+               numCompletedCheckpoints.set(0);
+       }
+
+       @AfterClass
+       public static void shutDownExistingCluster() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+       }
+
+       /**
+        * Tests that a simple job (Source -> Map) with multiple regions can restore with operator state.
+        *
+        * <p>The last subtask of the Map function in the first stream graph fails {@code NUM_OF_RESTARTS} times,
+        * and the test verifies that the restored state is identical to that of the last completed checkpoint.
+        */
+       @Test(timeout = 60000)
+       public void testMultiRegionFailover() {
+               try {
+                       JobGraph jobGraph = createJobGraph();
+                       ClusterClient<?> client = cluster.getClusterClient();
+                       client.submitJob(jobGraph, RegionFailoverITCase.class.getClassLoader());
+                       verifyAfterJobExecuted();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       private void verifyAfterJobExecuted() {
+               Assert.assertTrue("The test multi-region job has never restored state.", restoredState);
+
+               int keyCount = 0;
+               for (Map<Integer, Integer> map : ValidatingSink.maps) {
+                       for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
+                               assertEquals(4 * entry.getKey() + 1, (int) entry.getValue());
+                               keyCount += 1;
+                       }
+               }
+               assertEquals(NUM_ELEMENTS / 2, keyCount);
+       }
+
+       private JobGraph createJobGraph() {
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(NUM_OF_REGIONS);
+               env.setMaxParallelism(MAX_PARALLELISM);
+               env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+               env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+               env.disableOperatorChaining();
+               env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NUM_OF_RESTARTS, 0L));
+               env.getConfig().disableSysoutLogging();
+
+               // Use DataStreamUtils#reinterpretAsKeyedStream to avoid merging regions, so that this stream graph consists of 'NUM_OF_REGIONS' individual regions.
+               DataStreamUtils.reinterpretAsKeyedStream(
+                       env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS))
+                               .setParallelism(NUM_OF_REGIONS),
+                       (KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
+                       TypeInformation.of(Integer.class))
+                       .map(new FailingMapperFunction(NUM_OF_RESTARTS))
+                       .setParallelism(NUM_OF_REGIONS)
+                       .addSink(new ValidatingSink())
+                       .setParallelism(NUM_OF_REGIONS);
+
+               // another stream graph, totally disconnected from the one above.
+               env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).setParallelism(1)
+                       .map((MapFunction<Tuple2<Integer, Integer>, Object>) value -> value).setParallelism(1);
+
+               return env.getStreamGraph().getJobGraph();
+       }
+
+       private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<Tuple2<Integer, Integer>>
+               implements CheckpointListener, CheckpointedFunction {
+               private static final long serialVersionUID = 1L;
+
+               private final long numElements;
+               private final long checkpointLatestAt;
+
+               private int index = -1;
+
+               private int lastRegionIndex = -1;
+
+               private volatile boolean isRunning = true;
+
+               private ListState<Integer> listState;
+
+               private static final ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("list-1", Integer.class);
+
+               private ListState<Integer> unionListState;
+
+               private static final ListStateDescriptor<Integer> unionStateDescriptor = new ListStateDescriptor<>("list-2", Integer.class);
+
+               StringGeneratingSourceFunction(long numElements, long checkpointLatestAt) {
+                       this.numElements = numElements;
+                       this.checkpointLatestAt = checkpointLatestAt;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+                       if (index < 0) {
+                               // not been restored, so initialize
+                               index = 0;
+                       }
+
+                       int subTaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+                       while (isRunning && index < numElements) {
+
+                               synchronized (ctx.getCheckpointLock()) {
+                                       int key = index / 2;
+                                       int forwardTaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, MAX_PARALLELISM, NUM_OF_REGIONS);
+                                       // pre-partition output keys
+                                       if (forwardTaskIndex == subTaskIndex) {
+                                               // we would send data with the same key twice.
+                                               ctx.collect(Tuple2.of(key, index));
+                                       }
+                                       index += 1;
+                               }
+
+                               if (numCompletedCheckpoints.get() < 3) {
+                                       // not yet completed enough checkpoints, so slow down
+                                       if (index < checkpointLatestAt) {
+                                               // mild slow down
+                                               Thread.sleep(1);
+                                       } else {
+                                               // wait until the checkpoints are completed
+                                               while (isRunning && numCompletedCheckpoints.get() < 3) {
+                                                       Thread.sleep(300);
+                                               }
+                                       }
+                               }
+                               if (jobFailedCnt.get() < NUM_OF_RESTARTS) {
+                                       // slow down if the job has not yet failed 'NUM_OF_RESTARTS' times.
+                                       Thread.sleep(1);
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       if (getRuntimeContext().getIndexOfThisSubtask() == NUM_OF_REGIONS - 1) {
+                               lastCompletedCheckpointId.set(checkpointId);
+                               snapshotIndicesOfSubTask.put(checkpointId, lastRegionIndex);
+                               numCompletedCheckpoints.incrementAndGet();
+                       }
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) throws Exception {
+                       int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+                       if (indexOfThisSubtask != 0) {
+                               listState.clear();
+                               listState.add(index);
+                               if (indexOfThisSubtask == NUM_OF_REGIONS - 1) {
+                                       lastRegionIndex = index;
+                               }
+                       }
+                       unionListState.clear();
+                       unionListState.add(indexOfThisSubtask);
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext context) throws Exception {
+                       int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+                       if (context.isRestored()) {
+                               restoredState = true;
+
+                               unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor);
+                               Set<Integer> actualIndices = StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toSet());
+                               Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES, actualIndices));
+
+                               if (indexOfThisSubtask == 0) {
+                                       listState = context.getOperatorStateStore().getListState(stateDescriptor);
+                                       Assert.assertTrue("list state should be empty for subtask-0",
+                                               ((List<Integer>) listState.get()).isEmpty());
+                               } else {
+                                       listState = context.getOperatorStateStore().getListState(stateDescriptor);
+                                       Assert.assertTrue("list state should not be empty for subtask-" + indexOfThisSubtask,
+                                               ((List<Integer>) listState.get()).size() > 0);
+
+                                       if (indexOfThisSubtask == NUM_OF_REGIONS - 1) {
+                                               index = listState.get().iterator().next();
+                                               if (index != snapshotIndicesOfSubTask.get(lastCompletedCheckpointId.get())) {
+                                                       throw new RuntimeException("Test failed due to unexpected recovered index: " + index +
+                                                               ", while the last completed checkpoint recorded index: " + snapshotIndicesOfSubTask.get(lastCompletedCheckpointId.get()));
+                                               }
+                                       }
+                               }
+                       } else {
+                               unionListState = 
context.getOperatorStateStore().getUnionListState(unionStateDescriptor);
+
+                               if (indexOfThisSubtask != 0) {
+                                       listState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+                               }
+                       }
+
+               }
+       }
+
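+       /**
+        * Mapper that injects failures: region-0 fails first, then the last region keeps
+        * failing until {@code restartTimes} restarts have been triggered.
+        */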
+       private static class FailingMapperFunction extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+               private final int restartTimes;
+               private ValueState<Integer> valueState;
+
+               FailingMapperFunction(int restartTimes) {
+                       this.restartTimes = restartTimes;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value", Integer.class));
+               }
+
+               @Override
+               public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> input) throws Exception {
+                       int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+
+                       if (input.f1 > FAIL_BASE * (jobFailedCnt.get() + 1)) {
+
+                               // let region-0 fail over first
+                               if (jobFailedCnt.get() < 1 && indexOfThisSubtask == 0) {
+                                       jobFailedCnt.incrementAndGet();
+                                       throw new TestException();
+                               }
+
+                               // then let the last region fail over
+                               if (jobFailedCnt.get() < restartTimes && indexOfThisSubtask == NUM_OF_REGIONS - 1) {
+                                       jobFailedCnt.incrementAndGet();
+                                       throw new TestException();
+                               }
+                       }
+
+                       // e.g. for inputs (1, 2) and (1, 3), the keyed state lets us eventually emit (1, 5)
+                       Integer value = valueState.value();
+                       if (value == null) {
+                               valueState.update(input.f1);
+                               return input;
+                       } else {
+                               return Tuple2.of(input.f0, value + input.f1);
+                       }
+               }
+       }
+
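+       /**
+        * Sink that keeps the maximum value seen per key and publishes each subtask's
+        * result map on close() for verification after the job finishes.
+        */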
+       private static class ValidatingSink extends RichSinkFunction<Tuple2<Integer, Integer>>
+               implements ListCheckpointed<HashMap<Integer, Integer>> {
+
+               @SuppressWarnings("unchecked")
+               private static Map<Integer, Integer>[] maps = (Map<Integer, Integer>[]) new Map<?, ?>[NUM_OF_REGIONS];
+
+               private HashMap<Integer, Integer> counts = new HashMap<>();
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> input) {
+                       counts.merge(input.f0, input.f1, Math::max);
+               }
+
+               @Override
+               public void close() throws Exception {
+                       maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
+               }
+
+               @Override
+               public List<HashMap<Integer, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+                       return Collections.singletonList(this.counts);
+               }
+
+               @Override
+               public void restoreState(List<HashMap<Integer, Integer>> state) throws Exception {
+                       if (state.size() != 1) {
+                               throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+                       }
+                       this.counts.putAll(state.get(0));
+               }
+       }
+
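+       /** Expected exception thrown deliberately to trigger a region failover. */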
+       private static class TestException extends IOException {
+               private static final long serialVersionUID = 1L;
+       }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 0ace56f5..6fae020 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,33 +19,84 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.fail;
 
 /**
  * Test base for fault tolerant streaming programs.
  */
+@RunWith(Parameterized.class)
 public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
+       @Parameterized.Parameters(name = "FailoverStrategy: {0}")
+       public static Collection<FailoverStrategy> parameters() {
+               return Arrays.asList(FailoverStrategy.RestartAllStrategy, FailoverStrategy.RestartPipelinedRegionStrategy);
+       }
+
+       /**
+        * The failover strategy to use.
+        */
+       public enum FailoverStrategy {
+               RestartAllStrategy,
+               RestartPipelinedRegionStrategy
+       }
+
+       @Parameterized.Parameter
+       public FailoverStrategy failoverStrategy;
+
        protected static final int NUM_TASK_MANAGERS = 3;
        protected static final int NUM_TASK_SLOTS = 4;
        protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-       @ClassRule
-       public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
-               new MiniClusterResourceConfiguration.Builder()
-                       .setNumberTaskManagers(NUM_TASK_MANAGERS)
-                       .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
-                       .build());
+       private static MiniClusterWithClientResource cluster;
+
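+       // the failover strategy is read from the cluster configuration at start-up, so
+       // each test starts its own mini cluster instead of sharing a @ClassRule cluster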
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               switch (failoverStrategy) {
+                       case RestartPipelinedRegionStrategy:
+                               configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
+                               break;
+                       case RestartAllStrategy:
+                               configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
+                               break;
+               }
+
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                               .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                               .build());
+               cluster.before();
+       }
+
+       @After
+       public void shutDownExistingCluster() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+       }
 
        /**
         * Implementations are expected to assemble the test topology in this function
@@ -73,7 +124,25 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
                        testProgram(env);
 
-                       TestUtils.tryExecute(env, "Fault Tolerance Test");
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
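+                       // if the job fails, accept only a SuccessException in the cause chain,
+                       // which test programs throw to signal a successful run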
+                       try {
+                               cluster.getClusterClient().submitJob(jobGraph, getClass().getClassLoader()).getJobExecutionResult();
+                       }
+                       catch (ProgramInvocationException root) {
+                               Throwable cause = root.getCause();
+
+                               // search for nested SuccessExceptions
+                               int depth = 0;
+                               while (!(cause instanceof SuccessException)) {
+                                       if (cause == null || depth++ == 20) {
+                                               root.printStackTrace();
+                                               fail("Test failed: " + root.getMessage());
+                                       }
+                                       else {
+                                               cause = cause.getCause();
+                                       }
+                               }
+                       }
 
                        postSubmit();
                }
