This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c57e5a  [FLINK-12364] Introduce CheckpointFailureManager for 
centralized checkpoint failure handling
8c57e5a is described below

commit 8c57e5aed871b42d51e6218620ce7bd7991338c0
Author: vinoyang <yanghua1...@gmail.com>
AuthorDate: Wed Jun 19 17:00:59 2019 +0800

    [FLINK-12364] Introduce CheckpointFailureManager for centralized checkpoint 
failure handling
---
 flink-end-to-end-tests/test-scripts/common.sh      |   2 +-
 .../jobmanager/JMXJobManagerMetricTest.java        |   3 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  74 ++--
 .../checkpoint/CheckpointFailureManager.java       | 135 +++++++
 .../checkpoint/CheckpointFailureReason.java        |  18 +-
 .../runtime/checkpoint/CheckpointIDCounter.java    |   7 +
 .../runtime/checkpoint/PendingCheckpoint.java      |   2 +-
 .../checkpoint/StandaloneCheckpointIDCounter.java  |   5 +
 .../checkpoint/ZooKeeperCheckpointIDCounter.java   |  21 +-
 .../decline/AlignmentLimitExceededException.java   |  33 --
 .../decline/CheckpointDeclineException.java        |  35 --
 ...pointDeclineOnCancellationBarrierException.java |  32 --
 .../CheckpointDeclineSubsumedException.java        |  32 --
 ...kpointDeclineTaskNotCheckpointingException.java |  32 --
 .../CheckpointDeclineTaskNotReadyException.java    |  32 --
 .../decline/InputEndOfStreamException.java         |  32 --
 .../runtime/executiongraph/ExecutionGraph.java     |  32 +-
 .../executiongraph/ExecutionGraphBuilder.java      |   9 +-
 .../tasks/CheckpointCoordinatorConfiguration.java  |  22 +-
 .../messages/checkpoint/DeclineCheckpoint.java     |  16 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   7 +-
 .../exceptions/CheckpointException.java            |  41 --
 .../org/apache/flink/runtime/taskmanager/Task.java |   7 +-
 .../CheckpointCoordinatorFailureTest.java          |  14 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  18 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 418 ++++++++++++++++-----
 .../checkpoint/CheckpointFailureManagerTest.java   | 117 ++++++
 .../checkpoint/CheckpointIDCounterTest.java        |   6 +
 .../CheckpointSettingsSerializableTest.java        |   3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  43 ++-
 .../checkpoint/CheckpointStatsTrackerTest.java     |   3 +-
 .../ExecutionGraphCheckpointCoordinatorTest.java   |  25 +-
 .../executiongraph/ArchivedExecutionGraphTest.java |  11 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |  12 +-
 .../ExecutionGraphDeploymentTest.java              |   3 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  18 +-
 .../flink/runtime/jobgraph/JobGraphTest.java       |   7 +-
 .../tasks/JobCheckpointingSettingsTest.java        |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   3 +-
 .../api/environment/CheckpointConfig.java          |  27 +-
 .../api/graph/StreamingJobGraphGenerator.java      |   5 +-
 .../flink/streaming/runtime/io/BarrierBuffer.java  |  31 +-
 .../flink/streaming/runtime/io/BarrierTracker.java |   6 +-
 .../io/BarrierBufferAlignmentLimitTest.java        |   8 +-
 .../runtime/io/BarrierBufferTestBase.java          |  63 +++-
 .../tasks/StreamTaskCancellationBarrierTest.java   |  11 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   3 +-
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   3 +-
 .../test/streaming/runtime/IterateITCase.java      |   4 +-
 49 files changed, 943 insertions(+), 551 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 361e49a..4eac9f4 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -361,7 +361,7 @@ function check_logs_for_exceptions {
    | grep -v "java.io.InvalidClassException: 
org.apache.flink.formats.avro.typeutils.AvroSerializer" \
    | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
    | grep -v "java.lang.Exception: Artificial failure" \
-   | grep -v "org.apache.flink.runtime.checkpoint.decline" \
+   | grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \
    | grep -v "org.elasticsearch.ElasticsearchException" \
    | grep -v "Elasticsearch exception" \
    | grep -ic "exception" || true)
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index b946896..7156adb 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -101,7 +101,8 @@ public class JMXJobManagerMetricTest extends TestLogger {
                                        5,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                        true,
-                                       false),
+                                       false,
+                                       0),
                                null));
 
                        ClusterClient<?> client = 
MINI_CLUSTER_RESOURCE.getClusterClient();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b6f5a81..3dc5c1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
@@ -185,15 +185,13 @@ public class CheckpointCoordinator {
 
        private boolean isPreferCheckpointForRecovery;
 
+       private final CheckpointFailureManager failureManager;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
                        JobID job,
-                       long baseInterval,
-                       long checkpointTimeout,
-                       long minPauseBetweenCheckpoints,
-                       int maxConcurrentCheckpointAttempts,
-                       CheckpointRetentionPolicy retentionPolicy,
+                       CheckpointCoordinatorConfiguration chkConfig,
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
                        ExecutionVertex[] tasksToCommitTo,
@@ -202,31 +200,29 @@ public class CheckpointCoordinator {
                        StateBackend checkpointStateBackend,
                        Executor executor,
                        SharedStateRegistryFactory sharedStateRegistryFactory,
-                       boolean isPreferCheckpointForRecovery) {
+                       CheckpointFailureManager failureManager) {
 
                // sanity checks
                checkNotNull(checkpointStateBackend);
-               checkArgument(baseInterval > 0, "Checkpoint base interval must 
be larger than zero");
-               checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
-               checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
-               checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
 
                // max "in between duration" can be one year - this is to 
prevent numeric overflows
+               long minPauseBetweenCheckpoints = 
chkConfig.getMinPauseBetweenCheckpoints();
                if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
                        minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 
1_000;
                }
 
                // it does not make sense to schedule checkpoints more often 
then the desired
                // time between checkpoints
+               long baseInterval = chkConfig.getCheckpointInterval();
                if (baseInterval < minPauseBetweenCheckpoints) {
                        baseInterval = minPauseBetweenCheckpoints;
                }
 
                this.job = checkNotNull(job);
                this.baseInterval = baseInterval;
-               this.checkpointTimeout = checkpointTimeout;
+               this.checkpointTimeout = chkConfig.getCheckpointTimeout();
                this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
-               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.maxConcurrentCheckpointAttempts = 
chkConfig.getMaxConcurrentCheckpoints();
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
                this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -236,7 +232,8 @@ public class CheckpointCoordinator {
                this.executor = checkNotNull(executor);
                this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory);
                this.sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
-               this.isPreferCheckpointForRecovery = 
isPreferCheckpointForRecovery;
+               this.isPreferCheckpointForRecovery = 
chkConfig.isPreferCheckpointForRecovery();
+               this.failureManager = checkNotNull(failureManager);
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.masterHooks = new HashMap<>();
@@ -249,7 +246,7 @@ public class CheckpointCoordinator {
                
this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
                
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
-               this.checkpointProperties = 
CheckpointProperties.forCheckpoint(retentionPolicy);
+               this.checkpointProperties = 
CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());
 
                try {
                        this.checkpointStorage = 
checkpointStateBackend.createCheckpointStorage(job);
@@ -342,7 +339,7 @@ public class CheckpointCoordinator {
 
                                // clear and discard all pending checkpoints
                                for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                       
pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
+                                       failPendingCheckpoint(pending, 
CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
                                }
                                pendingCheckpoints.clear();
 
@@ -439,6 +436,10 @@ public class CheckpointCoordinator {
                        triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
                        return true;
                } catch (CheckpointException e) {
+                       long latestGeneratedCheckpointId = 
getCheckpointIdCounter().get();
+                       // here we can not get the failed pending checkpoint's 
id,
+                       // so we pass the negative latest generated checkpoint 
id as a special flag
+                       failureManager.handleCheckpointException(e, -1 * 
latestGeneratedCheckpointId);
                        return false;
                }
        }
@@ -459,7 +460,7 @@ public class CheckpointCoordinator {
                synchronized (lock) {
                        // abort if the coordinator has been shutdown in the 
meantime
                        if (shutdown) {
-                               throw new 
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
+                               throw new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
                        }
 
                        // Don't allow periodic checkpoint if scheduling has 
been disabled
@@ -599,7 +600,7 @@ public class CheckpointCoordinator {
                                        if (!checkpoint.isDiscarded()) {
                                                LOG.info("Checkpoint {} of job 
{} expired before completing.", checkpointID, job);
 
-                                               
checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
+                                               
failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                                                
pendingCheckpoints.remove(checkpointID);
                                                
rememberRecentCheckpointId(checkpointID);
 
@@ -614,7 +615,7 @@ public class CheckpointCoordinator {
                                        // since we released the lock in the 
meantime, we need to re-check
                                        // that the conditions still hold.
                                        if (shutdown) {
-                                               throw new 
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
+                                               throw new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
                                        }
                                        else if (!props.forceCheckpoint()) {
                                                if (triggerRequestQueued) {
@@ -699,7 +700,7 @@ public class CheckpointCoordinator {
                                                checkpointID, job, 
numUnsuccessful, t);
 
                                if (!checkpoint.isDiscarded()) {
-                                       
checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
+                                       failPendingCheckpoint(checkpoint, 
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
                                }
 
                                try {
@@ -891,11 +892,12 @@ public class CheckpointCoordinator {
                try {
                        try {
                                completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+                               
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
                        }
                        catch (Exception e1) {
                                // abort the current pending checkpoint if we 
fails to finalize the pending checkpoint.
                                if (!pendingCheckpoint.isDiscarded()) {
-                                       
pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, 
e1);
+                                       
failPendingCheckpoint(pendingCheckpoint, 
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                                }
 
                                throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.',
@@ -1002,7 +1004,7 @@ public class CheckpointCoordinator {
                        // remove all pending checkpoints that are lesser than 
the current completed checkpoint
                        if (p.getCheckpointId() < checkpointId && 
p.canBeSubsumed()) {
                                rememberRecentCheckpointId(p.getCheckpointId());
-                               
p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+                               failPendingCheckpoint(p, 
CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                                entries.remove();
                        }
                }
@@ -1275,7 +1277,7 @@ public class CheckpointCoordinator {
        public void abortPendingCheckpoints(CheckpointException exception) {
                synchronized (lock) {
                        for (PendingCheckpoint p : pendingCheckpoints.values()) 
{
-                               p.abort(exception.getCheckpointFailureReason());
+                               failPendingCheckpoint(p, 
exception.getCheckpointFailureReason());
                        }
 
                        pendingCheckpoints.clear();
@@ -1329,10 +1331,13 @@ public class CheckpointCoordinator {
 
                LOG.info("Discarding checkpoint {} of job {}.", checkpointId, 
job, cause);
 
-               if (cause == null || cause instanceof 
CheckpointDeclineException) {
-                       
pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+               if (cause == null) {
+                       failPendingCheckpoint(pendingCheckpoint, 
CheckpointFailureReason.CHECKPOINT_DECLINED);
+               } else if (cause instanceof CheckpointException) {
+                       CheckpointException exception = (CheckpointException) 
cause;
+                       failPendingCheckpoint(pendingCheckpoint, 
exception.getCheckpointFailureReason(), cause);
                } else {
-                       
pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
+                       failPendingCheckpoint(pendingCheckpoint, 
CheckpointFailureReason.JOB_FAILURE, cause);
                }
 
                rememberRecentCheckpointId(checkpointId);
@@ -1384,4 +1389,21 @@ public class CheckpointCoordinator {
                        });
                }
        }
+
+       private void failPendingCheckpoint(
+                       final PendingCheckpoint pendingCheckpoint,
+                       final CheckpointFailureReason reason,
+                       final Throwable cause) {
+
+               CheckpointException exception = new CheckpointException(reason, 
cause);
+               pendingCheckpoint.abort(reason, cause);
+               failureManager.handleCheckpointException(exception, 
pendingCheckpoint.getCheckpointId());
+       }
+
+       private void failPendingCheckpoint(
+                       final PendingCheckpoint pendingCheckpoint,
+                       final CheckpointFailureReason reason) {
+
+               failPendingCheckpoint(pendingCheckpoint, reason, null);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
new file mode 100644
index 0000000..4a95cdd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager, which centrally manages the checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+       private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+       private final int tolerableCpFailureNumber;
+       private final FailJobCallback failureCallback;
+       private final AtomicInteger continuousFailureCounter;
+       private final Set<Long> countedCheckpointIds;
+
+       public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+               checkArgument(tolerableCpFailureNumber >= 0,
+                       "The tolerable checkpoint failure number is illegal, " +
+                               "it must be greater than or equal to 0 .");
+               this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+               this.continuousFailureCounter = new AtomicInteger(0);
+               this.failureCallback = checkNotNull(failureCallback);
+               this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
+       }
+
+       /**
+        * Handle checkpoint exception with a handler callback.
+        *
+        * @param exception the checkpoint exception.
+        * @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+        *                     checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+        *                     happens before the checkpoint id generation. In 
this case, the negated latest generated
+        *                     checkpoint id is passed instead, as a special 
flag.
+        */
+       public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
+               if (tolerableCpFailureNumber == 
UNLIMITED_TOLERABLE_FAILURE_NUMBER) {
+                       return;
+               }
+
+               CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+               switch (reason) {
+                       case PERIODIC_SCHEDULER_SHUTDOWN:
+                       case ALREADY_QUEUED:
+                       case TOO_MANY_CONCURRENT_CHECKPOINTS:
+                       case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+                       case NOT_ALL_REQUIRED_TASKS_RUNNING:
+                       case CHECKPOINT_SUBSUMED:
+                       case CHECKPOINT_COORDINATOR_SUSPEND:
+                       case CHECKPOINT_COORDINATOR_SHUTDOWN:
+                       case JOB_FAILURE:
+                       case JOB_FAILOVER_REGION:
+                       //for compatibility purposes with user job behavior
+                       case CHECKPOINT_DECLINED_TASK_NOT_READY:
+                       case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
+                       case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
+                       case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
+                       case CHECKPOINT_DECLINED_SUBSUMED:
+                       case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
+
+                       case EXCEPTION:
+                       case CHECKPOINT_EXPIRED:
+                       case TASK_CHECKPOINT_FAILURE:
+                       case TRIGGER_CHECKPOINT_FAILURE:
+                       case FINALIZE_CHECKPOINT_FAILURE:
+                               //ignore
+                               break;
+
+                       case CHECKPOINT_DECLINED:
+                               //we should make sure one checkpoint only be 
counted once
+                               if (countedCheckpointIds.add(checkpointId)) {
+                                       
continuousFailureCounter.incrementAndGet();
+                               }
+
+                               break;
+
+                       default:
+                               throw new FlinkRuntimeException("Unknown 
checkpoint failure reason : " + reason.name());
+               }
+
+               if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+                       clearCount();
+                       failureCallback.failJob();
+               }
+       }
+
+       /**
+        * Handle checkpoint success.
+        *
+        * @param checkpointId the id of the checkpoint that completed 
successfully; it is currently not read, as any
+        *                     success resets the continuous failure count.
+        */
+       public void handleCheckpointSuccess(long checkpointId) {
+               clearCount();
+       }
+
+       private void clearCount() {
+               continuousFailureCounter.set(0);
+               countedCheckpointIds.clear();
+       }
+
+       /**
+        * A callback interface about how to fail a job.
+        */
+       public interface FailJobCallback {
+
+               void failJob();
+
+       }
+
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 35f457a..e00cce7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -23,8 +23,6 @@ package org.apache.flink.runtime.checkpoint;
  */
 public enum CheckpointFailureReason {
 
-       COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
-
        PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut 
down."),
 
        ALREADY_QUEUED("Another checkpoint request has already been queued."),
@@ -38,13 +36,23 @@ public enum CheckpointFailureReason {
 
        EXCEPTION("An Exception occurred while triggering the checkpoint."),
 
-       EXPIRED("The checkpoint expired before triggering was complete"),
-
        CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
 
        CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."),
 
-       CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."),
+       CHECKPOINT_DECLINED("Checkpoint was declined."),
+
+       CHECKPOINT_DECLINED_TASK_NOT_READY("Checkpoint was declined (tasks not 
ready)"),
+
+       CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING("Task does not support 
checkpointing"),
+
+       CHECKPOINT_DECLINED_SUBSUMED("Checkpoint was canceled because a barrier 
from newer checkpoint was received."),
+
+       CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER("Task received cancellation 
from one of its inputs"),
+
+       CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED("The checkpoint alignment 
phase needed to buffer more than the configured maximum bytes"),
+
+       CHECKPOINT_DECLINED_INPUT_END_OF_STREAM("Checkpoint was declined 
because one input stream is finished"),
 
        CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 48cec7d..1af6730 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -48,6 +48,13 @@ public interface CheckpointIDCounter {
        long getAndIncrement() throws Exception;
 
        /**
+        * Atomically gets the current checkpoint ID.
+        *
+        * @return The current checkpoint ID
+        */
+       long get();
+
+       /**
         * Sets the current checkpoint ID.
         *
         * @param newId The new ID
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index d03c28f..ac086ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -406,7 +406,7 @@ public class PendingCheckpoint {
        /**
         * Aborts a checkpoint with reason and cause.
         */
-       public void abort(CheckpointFailureReason reason, Throwable cause) {
+       public void abort(CheckpointFailureReason reason, @Nullable Throwable 
cause) {
                try {
                        CheckpointException exception = new 
CheckpointException(reason, cause);
                        onCompletionPromise.completeExceptionally(exception);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index f43df5a..f63b0d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -44,6 +44,11 @@ public class StandaloneCheckpointIDCounter implements 
CheckpointIDCounter {
        }
 
        @Override
+       public long get() {
+               return checkpointIdCounter.get();
+       }
+
+       @Override
        public void setCount(long newCount) {
                checkpointIdCounter.set(newCount);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 7d15fab..2d08d58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -111,11 +111,7 @@ public class ZooKeeperCheckpointIDCounter implements 
CheckpointIDCounter {
        @Override
        public long getAndIncrement() throws Exception {
                while (true) {
-                       ConnectionState connState = 
connStateListener.getLastState();
-
-                       if (connState != null) {
-                               throw new IllegalStateException("Connection 
state: " + connState);
-                       }
+                       checkConnectionState();
 
                        VersionedValue<Integer> current = 
sharedCount.getVersionedValue();
                        int newCount = current.getValue() + 1;
@@ -133,6 +129,13 @@ public class ZooKeeperCheckpointIDCounter implements 
CheckpointIDCounter {
        }
 
        @Override
+       public long get() {
+               checkConnectionState();
+
+               return sharedCount.getVersionedValue().getValue();
+       }
+
+       @Override
        public void setCount(long newId) throws Exception {
                ConnectionState connState = connStateListener.getLastState();
 
@@ -149,6 +152,14 @@ public class ZooKeeperCheckpointIDCounter implements 
CheckpointIDCounter {
                sharedCount.setCount((int) newId);
        }
 
+       private void checkConnectionState() {
+               ConnectionState connState = connStateListener.getLastState();
+
+               if (connState != null) {
+                       throw new IllegalStateException("Connection state: " + 
connState);
+               }
+       }
+
        /**
         * Connection state listener. In case of {@link 
ConnectionState#SUSPENDED} or {@link
         * ConnectionState#LOST} we are not guaranteed to read a current count 
from ZooKeeper.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
deleted file mode 100644
index 64d57bc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because too many bytes 
were
- * buffered in the alignment phase.
- */
-public final class AlignmentLimitExceededException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public AlignmentLimitExceededException(long numBytes) {
-               super("The checkpoint alignment phase needed to buffer more 
than the configured maximum ("
-                               + numBytes + " bytes).");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
deleted file mode 100644
index 8a2802c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Base class of all exceptions that indicate a declined checkpoint.
- */
-public abstract class CheckpointDeclineException extends Exception {
-
-       private static final long serialVersionUID = 1L;
-
-       public CheckpointDeclineException(String message) {
-               super(message);
-       }
-
-       public CheckpointDeclineException(String message, Throwable cause) {
-               super(message, cause);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
deleted file mode 100644
index 9ae4096..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because a cancellation
- * barrier was received.
- */
-public final class CheckpointDeclineOnCancellationBarrierException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public CheckpointDeclineOnCancellationBarrierException() {
-               super("Task received cancellation from one of its inputs");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
deleted file mode 100644
index 5380469..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because a newer 
checkpoint
- * barrier was received on an input before the pending checkpoint's barrier. 
- */
-public final class CheckpointDeclineSubsumedException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public CheckpointDeclineSubsumedException(long newCheckpointId) {
-               super("Checkpoint was canceled because a barrier from newer 
checkpoint " + newCheckpointId + " was received.");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
deleted file mode 100644
index e5773d1..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because a task does not 
support
- * checkpointing.
- */
-public final class CheckpointDeclineTaskNotCheckpointingException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
-               super("Task '" + taskName + "'does not support checkpointing");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
deleted file mode 100644
index a1214fe..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because a task was not
- * ready to perform a checkpoint.
- */
-public final class CheckpointDeclineTaskNotReadyException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public CheckpointDeclineTaskNotReadyException(String taskName) {
-               super("Task " + taskName + " was not running");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
deleted file mode 100644
index 86b29dc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.decline;
-
-/**
- * Exception indicating that a checkpoint was declined because one of the input
- * stream reached its end before the alignment was complete.
- */
-public final class InputEndOfStreamException extends 
CheckpointDeclineException {
-
-       private static final long serialVersionUID = 1L;
-
-       public InputEndOfStreamException() {
-               super("Checkpoint was declined because one input stream is 
finished");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fa4a393..0c91276 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -72,6 +72,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
@@ -110,7 +111,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -517,11 +517,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        }
 
        public void enableCheckpointing(
-                       long interval,
-                       long checkpointTimeout,
-                       long minPauseBetweenCheckpoints,
-                       int maxConcurrentCheckpoints,
-                       CheckpointRetentionPolicy retentionPolicy,
+                       CheckpointCoordinatorConfiguration chkConfig,
                        List<ExecutionJobVertex> verticesToTrigger,
                        List<ExecutionJobVertex> verticesToWaitFor,
                        List<ExecutionJobVertex> verticesToCommitTo,
@@ -529,12 +525,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
                        StateBackend checkpointStateBackend,
-                       CheckpointStatsTracker statsTracker,
-                       boolean isPreferCheckpointForRecovery) {
-
-               // simple sanity checks
-               checkArgument(interval >= 10, "checkpoint interval must not be 
below 10ms");
-               checkArgument(checkpointTimeout >= 10, "checkpoint timeout must 
not be below 10ms");
+                       CheckpointStatsTracker statsTracker) {
 
                checkState(state == JobStatus.CREATED, "Job must be in CREATED 
state");
                checkState(checkpointCoordinator == null, "checkpointing 
already enabled");
@@ -545,14 +536,15 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
 
                checkpointStatsTracker = checkNotNull(statsTracker, 
"CheckpointStatsTracker");
 
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () ->
+                       getJobMasterMainThreadExecutor().execute(() ->
+                               failGlobal(new FlinkRuntimeException("Exceeded 
checkpoint tolerable failure threshold."))
+                       ));
+
                // create the coordinator that triggers and commits checkpoints 
and holds the state
                checkpointCoordinator = new CheckpointCoordinator(
                        jobInformation.getJobId(),
-                       interval,
-                       checkpointTimeout,
-                       minPauseBetweenCheckpoints,
-                       maxConcurrentCheckpoints,
-                       retentionPolicy,
+                       chkConfig,
                        tasksToTrigger,
                        tasksToWaitFor,
                        tasksToCommitTo,
@@ -561,7 +553,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        checkpointStateBackend,
                        ioExecutor,
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       isPreferCheckpointForRecovery);
+                       failureManager);
 
                // register the master hooks on the checkpoint coordinator
                for (MasterTriggerRestoreHook<?> hook : masterHooks) {
@@ -574,7 +566,7 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
 
                // interval of max long value indicates disable periodic 
checkpoint,
                // the CheckpointActivatorDeactivator should be created only if 
the interval is not max value
-               if (interval != Long.MAX_VALUE) {
+               if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
                        // the periodic checkpoint scheduler is activated and 
deactivated as a result of
                        // job status changes (running -> on, all other states 
-> off)
                        
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 117b7b2..fa194e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -289,11 +289,7 @@ public class ExecutionGraphBuilder {
                        final CheckpointCoordinatorConfiguration chkConfig = 
snapshotSettings.getCheckpointCoordinatorConfiguration();
 
                        executionGraph.enableCheckpointing(
-                               chkConfig.getCheckpointInterval(),
-                               chkConfig.getCheckpointTimeout(),
-                               chkConfig.getMinPauseBetweenCheckpoints(),
-                               chkConfig.getMaxConcurrentCheckpoints(),
-                               chkConfig.getCheckpointRetentionPolicy(),
+                               chkConfig,
                                triggerVertices,
                                ackVertices,
                                confirmVertices,
@@ -301,8 +297,7 @@ public class ExecutionGraphBuilder {
                                checkpointIdCounter,
                                completedCheckpoints,
                                rootBackend,
-                               checkpointStatsTracker,
-                               chkConfig.isPreferCheckpointForRecovery());
+                               checkpointStatsTracker);
                }
 
                // create all the metrics for the Execution Graph
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 9e57d12..cff5777 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -42,6 +42,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
 
        private final int maxConcurrentCheckpoints;
 
+       private final int tolerableCheckpointFailureNumber;
+
        /** Settings for what to do with checkpoints when a job finishes. */
        private final CheckpointRetentionPolicy checkpointRetentionPolicy;
 
@@ -63,11 +65,13 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                        int maxConcurrentCheckpoints,
                        CheckpointRetentionPolicy checkpointRetentionPolicy,
                        boolean isExactlyOnce,
-                       boolean isPerfetCheckpointForRecovery) {
+                       boolean isPerfetCheckpointForRecovery,
+                       int tolerableCpFailureNumber) {
 
                // sanity checks
-               if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-                       minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+               if (checkpointInterval < 10 || checkpointTimeout < 10 ||
+                       minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1 ||
+                       tolerableCpFailureNumber < 0) {
                        throw new IllegalArgumentException();
                }
 
@@ -78,6 +82,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                this.checkpointRetentionPolicy = 
Preconditions.checkNotNull(checkpointRetentionPolicy);
                this.isExactlyOnce = isExactlyOnce;
                this.isPreferCheckpointForRecovery = 
isPerfetCheckpointForRecovery;
+               this.tolerableCheckpointFailureNumber = 
tolerableCpFailureNumber;
        }
 
        public long getCheckpointInterval() {
@@ -108,6 +113,10 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                return isPreferCheckpointForRecovery;
        }
 
+       public int getTolerableCheckpointFailureNumber() {
+               return tolerableCheckpointFailureNumber;
+       }
+
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -123,7 +132,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                        maxConcurrentCheckpoints == 
that.maxConcurrentCheckpoints &&
                        isExactlyOnce == that.isExactlyOnce &&
                        checkpointRetentionPolicy == 
that.checkpointRetentionPolicy &&
-                       isPreferCheckpointForRecovery == 
that.isPreferCheckpointForRecovery;
+                       isPreferCheckpointForRecovery == 
that.isPreferCheckpointForRecovery &&
+                       tolerableCheckpointFailureNumber == 
that.tolerableCheckpointFailureNumber;
        }
 
        @Override
@@ -135,7 +145,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                                maxConcurrentCheckpoints,
                                checkpointRetentionPolicy,
                                isExactlyOnce,
-                               isPreferCheckpointForRecovery);
+                               isPreferCheckpointForRecovery,
+                               tolerableCheckpointFailureNumber);
        }
 
        @Override
@@ -146,6 +157,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                        ", minPauseBetweenCheckpoints=" + 
minPauseBetweenCheckpoints +
                        ", maxConcurrentCheckpoints=" + 
maxConcurrentCheckpoints +
                        ", checkpointRetentionPolicy=" + 
checkpointRetentionPolicy +
+                       ", tolerableCheckpointFailureNumber=" + 
tolerableCheckpointFailureNumber +
                        '}';
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index 43bbc21..40fcf2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,12 +19,7 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
-import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.SerializedThrowable;
 
@@ -48,14 +43,7 @@ public class DeclineCheckpoint extends 
AbstractCheckpointMessage implements java
        public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, Throwable reason) {
                super(job, taskExecutionId, checkpointId);
 
-               if (reason == null ||
-                       reason.getClass() == 
AlignmentLimitExceededException.class ||
-                       reason.getClass() == 
CheckpointDeclineOnCancellationBarrierException.class ||
-                       reason.getClass() == 
CheckpointDeclineSubsumedException.class ||
-                       reason.getClass() == 
CheckpointDeclineTaskNotCheckpointingException.class ||
-                       reason.getClass() == 
CheckpointDeclineTaskNotReadyException.class ||
-                       reason.getClass() == InputEndOfStreamException.class) {
-                       // null or known common exceptions that cannot 
reference any dynamically loaded code
+               if (reason == null || reason instanceof CheckpointException) {
                        this.reason = reason;
                } else {
                        // some other exception. replace with a serialized 
throwable, to be on the safe side
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4238d71..4153182 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
@@ -78,7 +80,6 @@ import 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
-import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
@@ -700,7 +701,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        final String message = "TaskManager received a 
checkpoint request for unknown task " + executionAttemptID + '.';
 
                        log.debug(message);
-                       return FutureUtils.completedExceptionally(new 
CheckpointException(message));
+                       return FutureUtils.completedExceptionally(new 
CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
                }
        }
 
@@ -721,7 +722,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        final String message = "TaskManager received a 
checkpoint confirmation for unknown task " + executionAttemptID + '.';
 
                        log.debug(message);
-                       return FutureUtils.completedExceptionally(new 
CheckpointException(message));
+                       return FutureUtils.completedExceptionally(new 
CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
deleted file mode 100644
index 80f2aa0..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor.exceptions;
-
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-
-/**
- * Exception indicating a problem with checkpointing on the {@link 
TaskExecutor} side.
- */
-public class CheckpointException extends TaskManagerException {
-
-       private static final long serialVersionUID = 3366394086880327955L;
-
-       public CheckpointException(String message) {
-               super(message);
-       }
-
-       public CheckpointException(String message, Throwable cause) {
-               super(message, cause);
-       }
-
-       public CheckpointException(Throwable cause) {
-               super(cause);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 5512c54..2084819 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -33,9 +33,10 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -1121,7 +1122,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
                                                if (!success) {
                                                        
checkpointResponder.declineCheckpoint(
                                                                        
getJobID(), getExecutionId(), checkpointID,
-                                                                       new 
CheckpointDeclineTaskNotReadyException(taskName));
+                                                                       new 
CheckpointException("Task Name" + taskName, 
CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
                                                }
                                        }
                                        catch (Throwable t) {
@@ -1149,7 +1150,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
                        // send back a message that we did not do the checkpoint
                        checkpointResponder.declineCheckpoint(jobId, 
executionId, checkpointID,
-                                       new 
CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+                                       new CheckpointException("Task name with 
subtask : " + taskNameWithSubtask, 
CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 1709fae..2edbb1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -65,14 +66,21 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
                final long triggerTimestamp = 1L;
 
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(0, () -> {});
+
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        new ExecutionVertex[]{vertex},
                        new ExecutionVertex[]{vertex},
                        new ExecutionVertex[]{vertex},
@@ -81,7 +89,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                coord.triggerCheckpoint(triggerTimestamp, false);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 762b805..9990772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -421,13 +422,18 @@ public class CheckpointCoordinatorMasterHooksTest {
        // 
------------------------------------------------------------------------
 
        private static CheckpointCoordinator 
instantiateCheckpointCoordinator(JobID jid, ExecutionVertex... ackVertices) {
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       10000000L,
+                       600000L,
+                       0L,
+                       1,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
                return new CheckpointCoordinator(
                                jid,
-                               10000000L,
-                               600000L,
-                               0L,
-                               1,
-                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               chkConfig,
                                new ExecutionVertex[0],
                                ackVertices,
                                new ExecutionVertex[0],
@@ -436,7 +442,7 @@ public class CheckpointCoordinatorMasterHooksTest {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               new CheckpointFailureManager(0, () -> {}));
        }
 
        private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1676b01..fcd7150 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -63,6 +64,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -117,9 +119,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
        private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
+       private CheckpointFailureManager failureManager;
+
        @Rule
        public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+       @Before
+       public void setUp() throws Exception {
+               failureManager = new CheckpointFailureManager(0, () -> {});
+       }
+
        @Test
        public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
                try {
@@ -137,13 +146,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        ExecutionVertex ackVertex2 = 
mockExecutionVertex(ackAttemptID2);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                new ExecutionVertex[] { ackVertex1, ackVertex2 
},
                                new ExecutionVertex[] {},
@@ -152,7 +166,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -199,13 +213,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex ackVertex2 = 
mockExecutionVertex(ackAttemptID2);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                new ExecutionVertex[] { ackVertex1, ackVertex2 
},
                                new ExecutionVertex[] {},
@@ -214,7 +233,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -252,13 +271,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex ackVertex2 = 
mock(ExecutionVertex.class);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                new ExecutionVertex[] { ackVertex1, ackVertex2 
},
                                new ExecutionVertex[] {},
@@ -267,7 +291,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -288,6 +312,77 @@ public class CheckpointCoordinatorTest extends TestLogger {
                }
        }
 
+       @Test
+       public void 
testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+               ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+               final String errorMsg = "Exceeded checkpoint failure tolerance number!";
+
+               CheckpointFailureManager checkpointFailureManager = new 
CheckpointFailureManager(0, () -> {
+                       throw new RuntimeException(errorMsg);
+               });
+
+               // set up the coordinator
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new ExecutionVertex[] { vertex1, vertex2 },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       checkpointFailureManager);
+
+               try {
+                       // trigger the checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       // acknowledge from one of the tasks
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), 
TASK_MANAGER_LOCATION_INFO);
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // decline checkpoint from the other task
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+
+                       fail("Test failed.");
+               }
+               catch (Exception e) {
+                       //expected
+                       assertTrue(e instanceof RuntimeException);
+                       assertEquals(errorMsg, e.getMessage());
+               } finally {
+                       try {
+                               coord.shutdown(JobStatus.FINISHED);
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
+               }
+       }
+
        /**
         * This test triggers a checkpoint and then sends a decline checkpoint 
message from
         * one of the tasks. The expected behaviour is that said checkpoint is 
discarded and a new
@@ -306,13 +401,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
@@ -321,7 +421,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -410,13 +510,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
@@ -425,7 +530,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -531,13 +636,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
@@ -546,7 +656,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -700,13 +810,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
                                new ExecutionVertex[] { commitVertex },
@@ -715,7 +830,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -832,13 +947,18 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                new ExecutionVertex[] { ackVertex1, ackVertex2, 
ackVertex3 },
                                new ExecutionVertex[] { commitVertex },
@@ -847,7 +967,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -997,14 +1117,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        // set up the coordinator
                        // the timeout for the checkpoint is a 200 milliseconds
-
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                200,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex1, ackVertex2 
},
                                new ExecutionVertex[] { commitVertex },
@@ -1013,7 +1137,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1077,13 +1201,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        ExecutionVertex ackVertex2 = 
mockExecutionVertex(ackAttemptID2);
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                200000,
                                200000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex1, ackVertex2 
},
                                new ExecutionVertex[] { commitVertex },
@@ -1092,7 +1221,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1142,13 +1271,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                final long timestamp = 1L;
 
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jobId,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        20000L,
                        20000L,
                        0L,
                        1,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jobId,
+                       chkConfig,
                        new ExecutionVertex[] { triggerVertex },
                        new ExecutionVertex[] {triggerVertex, ackVertex1, 
ackVertex2},
                        new ExecutionVertex[0],
@@ -1157,7 +1291,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1276,13 +1410,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                }
                        }).when(execution).triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                10,        // periodic interval is 10 ms
                                200000,    // timeout is very long (200 s)
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex },
                                new ExecutionVertex[] { commitVertex },
@@ -1291,8 +1430,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
-
+                               failureManager);
 
                        coord.startCheckpointScheduler();
 
@@ -1367,13 +1505,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                final long delay = 50;
 
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       12,           // periodic interval is 12 ms
+                       200_000,     // timeout is very long (200 s)
+                       delay,       // 50 ms delay between checkpoints
+                       1,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
                final CheckpointCoordinator coord = new CheckpointCoordinator(
                                jid,
-                               2,           // periodic interval is 2 ms
-                               200_000,     // timeout is very long (200 s)
-                               delay,       // 50 ms delay between checkpoints
-                               1,
-                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               chkConfig,
                                new ExecutionVertex[] { vertex },
                                new ExecutionVertex[] { vertex },
                                new ExecutionVertex[] { vertex },
@@ -1382,7 +1525,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                try {
                        coord.startCheckpointScheduler();
@@ -1442,13 +1585,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        new ExecutionVertex[] { vertex1, vertex2 },
                        new ExecutionVertex[] { vertex1, vertex2 },
                        new ExecutionVertex[] { vertex1, vertex2 },
@@ -1457,7 +1605,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1595,13 +1743,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                StandaloneCheckpointIDCounter counter = new 
StandaloneCheckpointIDCounter();
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        new ExecutionVertex[] { vertex1, vertex2 },
                        new ExecutionVertex[] { vertex1, vertex2 },
                        new ExecutionVertex[] { vertex1, vertex2 },
@@ -1610,7 +1763,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1690,13 +1843,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                return null;
                        }).when(execution).notifyCheckpointComplete(anyLong(), 
anyLong());
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                10,        // periodic interval is 10 ms
                                200000,    // timeout is very long (200 s)
                                0L,        // no extra delay
                                maxConcurrentAttempts,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex },
                                new ExecutionVertex[] { commitVertex },
@@ -1705,7 +1863,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        coord.startCheckpointScheduler();
 
@@ -1765,13 +1923,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        ExecutionVertex ackVertex = 
mockExecutionVertex(ackAttemptID);
                        ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                10,        // periodic interval is 10 ms
                                200000,    // timeout is very long (200 s)
                                0L,        // no extra delay
                                maxConcurrentAttempts, // max two concurrent checkpoints
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex },
                                new ExecutionVertex[] { commitVertex },
@@ -1780,7 +1943,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        coord.startCheckpointScheduler();
 
@@ -1843,13 +2006,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        final AtomicReference<ExecutionState> currentState = 
new AtomicReference<>(ExecutionState.CREATED);
                        
when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(invocation
 -> currentState.get());
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                10,        // periodic interval is 10 ms
                                200000,    // timeout is very long (200 s)
                                0L,        // no extra delay
                                2, // max two concurrent checkpoints
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { triggerVertex },
                                new ExecutionVertex[] { ackVertex },
                                new ExecutionVertex[] { commitVertex },
@@ -1858,7 +2026,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        coord.startCheckpointScheduler();
 
@@ -1897,13 +2065,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
 
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jobId,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        100000,
                        200000,
                        0L,
                        1, // max one checkpoint at a time => should not affect savepoints
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jobId,
+                       chkConfig,
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
@@ -1912,7 +2085,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                List<CompletableFuture<CompletedCheckpoint>> savepointFutures = 
new ArrayList<>();
 
@@ -1952,13 +2125,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
                ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jobId,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        100000,
                        200000,
                        100000000L, // very long min delay => should not affect savepoints
                        1,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jobId,
+                       chkConfig,
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
@@ -1967,7 +2145,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2016,13 +2194,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                CompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore();
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
@@ -2031,7 +2214,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2132,13 +2315,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
@@ -2147,7 +2335,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2262,13 +2450,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        CompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore(2);
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               isPreferCheckpoint,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { stateful1, stateless1 },
                                new ExecutionVertex[] { stateful1, stateless1 },
                                new ExecutionVertex[] { stateful1, stateless1 },
@@ -2277,7 +2470,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               isPreferCheckpoint);
+                               failureManager);
 
                        //trigger a checkpoint and wait to become a completed checkpoint
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -2423,13 +2616,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
@@ -2438,7 +2636,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2709,13 +2907,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                
when(standaloneCompletedCheckpointStore.getLatestCheckpoint(false)).thenReturn(completedCheckpoint);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       new JobID(),
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
                        newJobVertex1.getTaskVertices(),
                        newJobVertex1.getTaskVertices(),
                        newJobVertex1.getTaskVertices(),
@@ -2724,7 +2927,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2862,13 +3065,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
 
                        // set up the coordinator and validate the initial state
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
@@ -2877,7 +3085,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -3298,13 +3506,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       new JobID(),
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
                        new ExecutionVertex[] { vertex1 },
@@ -3313,7 +3526,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                // Periodic
                try {
@@ -3520,13 +3733,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                ExecutionVertex vertex1 = mockExecutionVertex(new 
ExecutionAttemptID());
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       new JobID(),
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
                        new ExecutionVertex[]{vertex1},
                        new ExecutionVertex[]{vertex1},
                        new ExecutionVertex[]{vertex1},
@@ -3535,7 +3753,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
                coord.setCheckpointStatsTracker(tracker);
@@ -3560,13 +3778,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                StandaloneCompletedCheckpointStore store = new 
StandaloneCompletedCheckpointStore(1);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       new JobID(),
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
                        new ExecutionVertex[]{vertex1},
                        new ExecutionVertex[]{vertex1},
                        new ExecutionVertex[]{vertex1},
@@ -3575,7 +3798,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                store.addCheckpoint(new CompletedCheckpoint(
                        new JobID(),
@@ -3624,13 +3847,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                final List<SharedStateRegistry> createdSharedStateRegistries = 
new ArrayList<>(2);
 
                // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        600000,
                        600000,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       chkConfig,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
@@ -3643,7 +3871,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        
createdSharedStateRegistries.add(instance);
                                        return instance;
                                },
-                       false);
+                       failureManager);
 
                final int numCheckpoints = 3;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
new file mode 100644
index 0000000..2f9c151
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+       @Test
+       public void testContinuousFailure() {
+               TestFailJobCallback callback = new TestFailJobCallback();
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(2, callback);
+
+               failureManager.handleCheckpointException(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
+
+               //ignore this
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
+
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 4);
+               assertEquals(1, callback.getInvokeCounter());
+       }
+
+       @Test
+       public void testBreakContinuousFailure() {
+               TestFailJobCallback callback = new TestFailJobCallback();
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(2, callback);
+
+               failureManager.handleCheckpointException(new 
CheckpointException(CheckpointFailureReason.EXCEPTION), 1);
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
+
+               //ignore this
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
+
+               //reset
+               failureManager.handleCheckpointSuccess(4);
+
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5);
+               assertEquals(0, callback.getInvokeCounter());
+       }
+
+       @Test
+       public void testTotalCountValue() {
+               TestFailJobCallback callback = new TestFailJobCallback();
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(0, callback);
+               for (CheckpointFailureReason reason : 
CheckpointFailureReason.values()) {
+                       failureManager.handleCheckpointException(new 
CheckpointException(reason), -1);
+               }
+
+               assertEquals(1, callback.getInvokeCounter());
+       }
+
+       @Test
+       public void testIgnoreOneCheckpointRepeatedlyCountMultiTimes() {
+               TestFailJobCallback callback = new TestFailJobCallback();
+               CheckpointFailureManager failureManager = new 
CheckpointFailureManager(2, callback);
+
+               failureManager.handleCheckpointException(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1);
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
+
+               //ignore this
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3);
+
+               //ignore repeated reports from the same checkpoint
+               failureManager.handleCheckpointException(
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2);
+               assertEquals(0, callback.getInvokeCounter());
+       }
+
+       /**
+        * A failure handler callback for testing.
+        */
+       private static class TestFailJobCallback implements 
CheckpointFailureManager.FailJobCallback {
+
+               private int invokeCounter = 0;
+
+               @Override
+               public void failJob() {
+                       invokeCounter++;
+               }
+
+               public int getInvokeCounter() {
+                       return invokeCounter;
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
index 9ece607..fa89018 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -116,8 +116,11 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                        counter.start();
 
                        assertEquals(1, counter.getAndIncrement());
+                       assertEquals(2, counter.get());
                        assertEquals(2, counter.getAndIncrement());
+                       assertEquals(3, counter.get());
                        assertEquals(3, counter.getAndIncrement());
+                       assertEquals(4, counter.get());
                        assertEquals(4, counter.getAndIncrement());
                }
                finally {
@@ -177,6 +180,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                        }
 
                        // The final count
+                       assertEquals(expectedTotal + 1, counter.get());
                        assertEquals(expectedTotal + 1, 
counter.getAndIncrement());
                }
                finally {
@@ -198,7 +202,9 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
 
                // Test setCount
                counter.setCount(1337);
+               assertEquals(1337, counter.get());
                assertEquals(1337, counter.getAndIncrement());
+               assertEquals(1338, counter.get());
                assertEquals(1338, counter.getAndIncrement());
 
                counter.shutdown(JobStatus.FINISHED);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index bf438aa..24e0b85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -91,7 +91,8 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
                                        1,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                        true,
-                                       false),
+                                       false,
+                                       0),
                                new SerializedValue<StateBackend>(new 
CustomStateBackend(outOfClassPath)),
                                serHooks);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 8f4f607..08a7a8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -37,6 +38,7 @@ import org.apache.flink.util.SerializableObject;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.hamcrest.MockitoHamcrest;
@@ -61,6 +63,13 @@ public class CheckpointStateRestoreTest {
 
        private static final String TASK_MANAGER_LOCATION_INFO = "Unknown 
location";
 
+       private CheckpointFailureManager failureManager;
+
+       @Before
+       public void setUp() throws Exception {
+               failureManager = new CheckpointFailureManager(0, () -> {});
+       }
+
        /**
         * Tests that on restore the task state is reset for each stateful task.
         */
@@ -97,13 +106,18 @@ public class CheckpointStateRestoreTest {
                        map.put(statefulId, stateful);
                        map.put(statelessId, stateless);
 
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                200000L,
                                200000L,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               chkConfig,
                                new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
                                new ExecutionVertex[] { stateful1, stateful2, 
stateful3, stateless1, stateless2 },
                                new ExecutionVertex[0],
@@ -112,7 +126,7 @@ public class CheckpointStateRestoreTest {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -176,13 +190,18 @@ public class CheckpointStateRestoreTest {
        @Test
        public void testNoCheckpointAvailable() {
                try {
-                       CheckpointCoordinator coord = new CheckpointCoordinator(
-                               new JobID(),
+                       CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                                200000L,
                                200000L,
                                0,
                                Integer.MAX_VALUE,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               false,
+                               0);
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               new JobID(),
+                               chkConfig,
                                new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                new ExecutionVertex[0],
@@ -191,7 +210,7 @@ public class CheckpointStateRestoreTest {
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
                                SharedStateRegistry.DEFAULT_FACTORY,
-                               false);
+                               failureManager);
 
                        try {
                                coord.restoreLatestCheckpointedState(new 
HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -235,13 +254,19 @@ public class CheckpointStateRestoreTest {
                tasks.put(jobVertexId1, jobVertex1);
                tasks.put(jobVertexId2, jobVertex2);
 
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                       new JobID(),
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        Integer.MAX_VALUE,
                        Integer.MAX_VALUE,
                        0,
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       chkConfig,
                        new ExecutionVertex[] {},
                        new ExecutionVertex[] {},
                        new ExecutionVertex[] {},
@@ -250,7 +275,7 @@ public class CheckpointStateRestoreTest {
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
                        SharedStateRegistry.DEFAULT_FACTORY,
-                       false);
+                       failureManager);
 
                // --- (2) Checkpoint misses state for a jobVertex (should 
work) ---
                Map<OperatorID, OperatorState> checkpointTaskStates = new 
HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index fce0f3a..69d8997 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -65,7 +65,8 @@ public class CheckpointStatsTrackerTest {
                                123,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                false,
-                               false
+                               false,
+                               0
                        ),
                        null);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index bdb9975..267b685 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -153,12 +154,18 @@ public class ExecutionGraphCheckpointCoordinatorTest 
extends TestLogger {
 
                
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
 
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       100,
+                       100,
+                       100,
+                       1,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+
                executionGraph.enableCheckpointing(
-                               100,
-                               100,
-                               100,
-                               1,
-                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               chkConfig,
                                Collections.emptyList(),
                                Collections.emptyList(),
                                Collections.emptyList(),
@@ -166,8 +173,7 @@ public class ExecutionGraphCheckpointCoordinatorTest 
extends TestLogger {
                                counter,
                                store,
                                new MemoryStateBackend(),
-                               CheckpointStatsTrackerTest.createTestTracker(),
-                               false);
+                               CheckpointStatsTrackerTest.createTestTracker());
 
                JobVertex jobVertex = new JobVertex("MockVertex");
                jobVertex.setInvokableClass(AbstractInvokable.class);
@@ -199,6 +205,11 @@ public class ExecutionGraphCheckpointCoordinatorTest 
extends TestLogger {
                }
 
                @Override
+               public long get() {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
                public void setCount(long newId) {
                        throw new UnsupportedOperationException("Not 
implemented.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 4098643..0d481d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -127,12 +127,18 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
                                mock(CheckpointCoordinatorConfiguration.class),
                                new UnregisteredMetricsGroup());
 
-               runtimeGraph.enableCheckpointing(
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
                        100,
                        100,
                        100,
                        1,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+
+               runtimeGraph.enableCheckpointing(
+                       chkConfig,
                        Collections.<ExecutionJobVertex>emptyList(),
                        Collections.<ExecutionJobVertex>emptyList(),
                        Collections.<ExecutionJobVertex>emptyList(),
@@ -140,8 +146,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
-                       statsTracker,
-                       false);
+                       statsTracker);
 
                runtimeGraph.setJsonPlan("{}");
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 9675631..64c9ae4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -363,7 +363,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest 
extends TestLogger {
                        3,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
-                       false);
+                       false,
+                       0);
 
                final ExecutionGraph graph = createSampleGraph(
                        jid,
@@ -386,11 +387,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest 
extends TestLogger {
                final StandaloneCheckpointIDCounter 
standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
 
                graph.enableCheckpointing(
-                       
checkpointCoordinatorConfiguration.getCheckpointInterval(),
-                       
checkpointCoordinatorConfiguration.getCheckpointTimeout(),
-                       
checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
-                       
checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
-                       
checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy(),
+                       checkpointCoordinatorConfiguration,
                        allVertices,
                        allVertices,
                        allVertices,
@@ -402,8 +399,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest 
extends TestLogger {
                                1,
                                allVertices,
                                checkpointCoordinatorConfiguration,
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()),
-                       false);
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
 
                final CheckpointCoordinator checkpointCoordinator = 
graph.getCheckpointCoordinator();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index fac040a..85d3258 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -803,7 +803,8 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                                        1,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                        false,
-                                       false),
+                                       false,
+                                       0),
                                null));
 
                final Time timeout = Time.seconds(10L);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index c953b5d..7575dcd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -605,12 +605,17 @@ public class FailoverRegionTest extends TestLogger {
 
        private static void enableCheckpointing(ExecutionGraph eg) {
                ArrayList<ExecutionJobVertex> jobVertices = new 
ArrayList<>(eg.getAllVertices().values());
+               CheckpointCoordinatorConfiguration chkConfig = new 
CheckpointCoordinatorConfiguration(
+                       1000,
+                       100,
+                       0,
+                       1,
+                       CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
+                       true,
+                       false,
+                       0);
                eg.enableCheckpointing(
-                               1000,
-                               100,
-                               0,
-                               1,
-                               
CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
+                               chkConfig,
                                jobVertices,
                                jobVertices,
                                jobVertices,
@@ -622,8 +627,7 @@ public class FailoverRegionTest extends TestLogger {
                                        0,
                                        jobVertices,
                                        
mock(CheckpointCoordinatorConfiguration.class),
-                                       new UnregisteredMetricsGroup()),
-                               false);
+                                       new UnregisteredMetricsGroup()));
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index d778cf0..02bf4c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -324,9 +324,9 @@ public class JobGraphTest extends TestLogger {
        }
 
        @Test
-       public void checkpointingIsEnabledIfIntervalIsPositive() {
+       public void checkpointingIsEnabledIfIntervalIsqAndLegal() {
                final JobGraph jobGraph = new JobGraph();
-               
jobGraph.setSnapshotSettings(createCheckpointSettingsWithInterval(1));
+               
jobGraph.setSnapshotSettings(createCheckpointSettingsWithInterval(10));
 
                assertTrue(jobGraph.isCheckpointingEnabled());
        }
@@ -347,7 +347,8 @@ public class JobGraphTest extends TestLogger {
                        Integer.MAX_VALUE,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
-                       false);
+                       false,
+                       0);
 
                return new JobCheckpointingSettings(
                        Collections.emptyList(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 4665a63..45f5773 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -50,7 +50,8 @@ public class JobCheckpointingSettingsTest {
                                12,
                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
                                false,
-                               false),
+                               false,
+                               0),
                        new SerializedValue<>(new MemoryStateBackend()));
 
                JobCheckpointingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index c2b0bac..58de718 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1789,7 +1789,8 @@ public class JobMasterTest extends TestLogger {
                        1,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
-                       false);
+                       false,
+                       0);
                final JobCheckpointingSettings checkpointingSettings = new 
JobCheckpointingSettings(
                        Collections.emptyList(),
                        Collections.emptyList(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 8b9ad55..c2c3536 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -75,6 +75,9 @@ public class CheckpointConfig implements java.io.Serializable 
{
        /** Determines if a job will fallback to checkpoint when there is a 
more recent savepoint. **/
        private boolean preferCheckpointForRecovery = false;
 
+       /** Determines the threshold of checkpoint failures that we 
tolerate. */
+       private int tolerableCheckpointFailureNumber = 0;
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -125,8 +128,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
         * @param checkpointInterval The checkpoint interval, in milliseconds.
         */
        public void setCheckpointInterval(long checkpointInterval) {
-               if (checkpointInterval <= 0) {
-                       throw new IllegalArgumentException("Checkpoint interval 
must be larger than zero");
+               if (checkpointInterval < 10) {
+                       throw new IllegalArgumentException("Checkpoint interval 
must be larger than or equal to 10ms");
                }
                this.checkpointInterval = checkpointInterval;
        }
@@ -146,8 +149,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
         * @param checkpointTimeout The checkpoint timeout, in milliseconds.
         */
        public void setCheckpointTimeout(long checkpointTimeout) {
-               if (checkpointTimeout <= 0) {
-                       throw new IllegalArgumentException("Checkpoint timeout 
must be larger than zero");
+               if (checkpointTimeout < 10) {
+                       throw new IllegalArgumentException("Checkpoint timeout 
must be larger than or equal to 10ms");
                }
                this.checkpointTimeout = checkpointTimeout;
        }
@@ -253,6 +256,22 @@ public class CheckpointConfig implements 
java.io.Serializable {
        }
 
        /**
+        * Get the tolerable checkpoint failure number, which is used by the 
checkpoint failure manager
+        * to determine when we need to fail the job.
+        */
+       public int getTolerableCheckpointFailureNumber() {
+               return tolerableCheckpointFailureNumber;
+       }
+
+       /**
+        * Set the tolerable checkpoint failure number. The default value is 0, 
which means
+        * we do not tolerate any checkpoint failure.
+        */
+       public void setTolerableCheckpointFailureNumber(int 
tolerableCheckpointFailureNumber) {
+               this.tolerableCheckpointFailureNumber = 
tolerableCheckpointFailureNumber;
+       }
+
+       /**
         * Enables checkpoints to be persisted externally.
         *
         * <p>Externalized checkpoints write their meta data out to persistent
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index edf0314..e191dea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -606,7 +606,7 @@ public class StreamingJobGraphGenerator {
                CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
                long interval = cfg.getCheckpointInterval();
-               if (interval > 0) {
+               if (interval >= 10) {
                        ExecutionConfig executionConfig = 
streamGraph.getExecutionConfig();
                        // propagate the expected behaviour for checkpoint 
errors to task.
                        
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
@@ -724,7 +724,8 @@ public class StreamingJobGraphGenerator {
                                cfg.getMaxConcurrentCheckpoints(),
                                retentionAfterTermination,
                                isExactlyOnce,
-                               cfg.isPreferCheckpointForRecovery()),
+                               cfg.isPreferCheckpointForRecovery(),
+                               cfg.getTolerableCheckpointFailureNumber()),
                        serializedStateBackend,
                        serializedHooks);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index b507b38..63fa1ac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,13 +19,10 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
-import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
-import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -277,7 +274,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                        currentCheckpointId);
 
                                // let the task know we are not completing this
-                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
+                               notifyAbort(currentCheckpointId,
+                                       new CheckpointException(
+                                               "Barrier id: " + barrierId,
+                                               
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
 
                                // abort the current checkpoint
                                releaseBlocksAndResetBarriers();
@@ -359,7 +359,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
 
-                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
+                               notifyAbort(currentCheckpointId,
+                                       new CheckpointException(
+                                               "Barrier id: " + barrierId,
+                                               
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED
+                                       ));
 
                                notifyAbortOnCancellationBarrier(barrierId);
                        }
@@ -394,7 +398,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                if (numBarriersReceived > 0) {
                        // let the task know we skip a checkpoint
-                       notifyAbort(currentCheckpointId, new 
InputEndOfStreamException());
+                       notifyAbort(currentCheckpointId,
+                               new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
 
                        // no chance to complete this checkpoint
                        releaseBlocksAndResetBarriers();
@@ -420,10 +425,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        }
 
        private void notifyAbortOnCancellationBarrier(long checkpointId) throws 
Exception {
-               notifyAbort(checkpointId, new 
CheckpointDeclineOnCancellationBarrierException());
+               notifyAbort(checkpointId,
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        }
 
-       private void notifyAbort(long checkpointId, CheckpointDeclineException 
cause) throws Exception {
+       private void notifyAbort(long checkpointId, CheckpointException cause) 
throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
                }
@@ -438,7 +444,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                maxBufferedBytes);
 
                        releaseBlocksAndResetBarriers();
-                       notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes));
+                       notifyAbort(currentCheckpointId,
+                               new CheckpointException(
+                                       "Max buffered bytes: " + 
maxBufferedBytes,
+                                       
CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
                }
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index ee0b3a2..49d2991 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,10 +19,11 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -290,7 +291,8 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        private void notifyAbort(long checkpointId) throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        toNotifyOnCheckpoint.abortCheckpointOnBarrier(
-                                       checkpointId, new 
CheckpointDeclineOnCancellationBarrierException());
+                               checkpointId,
+                               new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 2e4ba51..0a284e1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -136,7 +136,8 @@ public class BarrierBufferAlignmentLimitTest {
                // trying to pull the next makes the alignment overflow - so 
buffered buffers are replayed
                check(sequence[5], buffer.pollNext().get());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), 
any(AlignmentLimitExceededException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L),
+                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
                // playing back buffered events
                check(sequence[7], buffer.pollNext().get());
@@ -231,7 +232,8 @@ public class BarrierBufferAlignmentLimitTest {
                // checkpoint alignment aborted due to too much data
                check(sequence[4], buffer.pollNext().get());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(AlignmentLimitExceededException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
+                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
                // replay buffered data - in the middle, the alignment for 
checkpoint 4 starts
                check(sequence[6], buffer.pollNext().get());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 6475bfc..c9981b5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -20,12 +20,11 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
-import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -50,7 +49,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -498,7 +496,8 @@ public abstract class BarrierBufferTestBase {
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
                assertEquals(3L, buffer.getCurrentCheckpointId());
                validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
-               verify(toNotify).abortCheckpointOnBarrier(eq(2L), 
isA(CheckpointDeclineSubsumedException.class));
+               verify(toNotify).abortCheckpointOnBarrier(eq(2L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
                check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 alignment in progress
@@ -506,7 +505,8 @@ public abstract class BarrierBufferTestBase {
 
                // checkpoint 3 aborted (end of partition)
                check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify).abortCheckpointOnBarrier(eq(3L), 
isA(InputEndOfStreamException.class));
+               verify(toNotify).abortCheckpointOnBarrier(eq(3L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)));
 
                // replay buffered data from checkpoint 3
                check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
@@ -854,13 +854,15 @@ public abstract class BarrierBufferTestBase {
                check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
                assertEquals(5L, buffer.getCurrentCheckpointId());
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
                assertEquals(6L, buffer.getCurrentCheckpointId());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
        }
 
@@ -923,7 +925,8 @@ public abstract class BarrierBufferTestBase {
                // canceled checkpoint on last barrier
                startTs = System.nanoTime();
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
                check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
 
@@ -938,7 +941,8 @@ public abstract class BarrierBufferTestBase {
 
                // this checkpoint gets immediately canceled
                check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // some buffers
@@ -954,7 +958,8 @@ public abstract class BarrierBufferTestBase {
                check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
 
                check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
        }
 
@@ -1008,7 +1013,8 @@ public abstract class BarrierBufferTestBase {
 
                // re-read the queued cancellation barriers
                check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
@@ -1025,7 +1031,8 @@ public abstract class BarrierBufferTestBase {
 
                // no further checkpoint (abort) notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
        }
 
        /**
@@ -1085,7 +1092,8 @@ public abstract class BarrierBufferTestBase {
                // cancelled by cancellation barrier
                check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
                validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
-               verify(toNotify).abortCheckpointOnBarrier(eq(1L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(toNotify).abortCheckpointOnBarrier(eq(1L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // the next checkpoint alignment starts now
                startTs = System.nanoTime();
@@ -1160,7 +1168,8 @@ public abstract class BarrierBufferTestBase {
                // future barrier aborts checkpoint
                startTs = System.nanoTime();
                check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(CheckpointDeclineSubsumedException.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
                check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
 
                // alignment of next checkpoint
@@ -1329,4 +1338,28 @@ public abstract class BarrierBufferTestBase {
                        description.appendText("CheckpointMetaData - id = " + 
checkpointId);
                }
        }
+
+       /**
+        * A validation matcher for checkpoint exception against failure reason.
+        */
+       public static class CheckpointExceptionMatcher extends 
BaseMatcher<CheckpointException> {
+
+               private final CheckpointFailureReason failureReason;
+
+               public CheckpointExceptionMatcher(CheckpointFailureReason 
failureReason) {
+                       this.failureReason = failureReason;
+               }
+
+               @Override
+               public boolean matches(Object o) {
+                       return o != null &&
+                               o.getClass() == CheckpointException.class &&
+                               ((CheckpointException) 
o).getCheckpointFailureReason().equals(failureReason);
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("CheckpointException - reason = 
" + failureReason);
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index d1b3697..56c3889 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.runtime.io.BarrierBufferTestBase;
 
 import org.junit.Test;
 
@@ -38,11 +39,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
  * Test checkpoint cancellation barrier.
@@ -108,7 +109,8 @@ public class StreamTaskCancellationBarrierTest {
                testHarness.waitForInputProcessing();
 
                // the decline call should go to the coordinator
-               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(environment, times(1)).declineCheckpoint(eq(2L),
+                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // a cancellation barrier should be downstream
                Object result = testHarness.getOutput().poll();
@@ -152,7 +154,8 @@ public class StreamTaskCancellationBarrierTest {
                testHarness.waitForInputProcessing();
 
                // the decline call should go to the coordinator
-               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+               verify(environment, times(1)).declineCheckpoint(eq(2L),
+                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // a cancellation barrier should be downstream
                Object result = testHarness.getOutput().poll();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 57a5121..1e383f8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -245,7 +245,8 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                                                1,
                                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                                true,
-                                               false),
+                                               false,
+                                               0),
                                null));
 
                clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 6635e31..9a1b4c1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -107,7 +107,8 @@ public class JobMasterTriggerSavepointITCase extends 
AbstractTestBase {
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                true,
-                               false),
+                               false,
+                               0),
                        null));
 
                clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 332584d..bd9cd75 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -609,7 +609,7 @@ public class IterateITCase extends AbstractTestBase {
                                // Test force checkpointing
 
                                try {
-                                       env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, false);
+                                       env.enableCheckpointing(10, 
CheckpointingMode.EXACTLY_ONCE, false);
                                        env.execute();
 
                                        // this statement should never be 
reached
@@ -618,7 +618,7 @@ public class IterateITCase extends AbstractTestBase {
                                        // expected behaviour
                                }
 
-                               env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE, true);
+                               env.enableCheckpointing(10, 
CheckpointingMode.EXACTLY_ONCE, true);
                                env.getStreamGraph().getJobGraph();
 
                                break; // success

Reply via email to