[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668547#comment-16668547
 ] 

ASF GitHub Bot commented on FLINK-10074:
----------------------------------------

yanghua closed pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it after the merge):

diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..b091192f0a5 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1354,6 +1354,8 @@ public void runBrokerFailureTest() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.enableCheckpointing(500);
+               env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+               
env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 8c0d7665fed..23507bfbdf8 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -57,6 +57,8 @@ public static void generateRandomizedIntegerSequence(
                env.setParallelism(numPartitions);
                env.getConfig().disableSysoutLogging();
                env.setRestartStrategy(RestartStrategies.noRestart());
+               env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+               
env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 
                DataStream<Integer> stream = env.addSource(
                                new RichParallelSourceFunction<Integer>() {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 6b7caaac6ec..719560ef860 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -883,16 +883,6 @@ public boolean isFailTaskOnCheckpointError() {
                return failTaskOnCheckpointError;
        }
 
-       /**
-        * This method is visible because of the way the configuration is 
currently forwarded from the checkpoint config to
-        * the task. This should not be called by the user, please use 
CheckpointConfig.setFailOnCheckpointingErrors(...)
-        * instead.
-        */
-       @Internal
-       public void setFailTaskOnCheckpointError(boolean 
failTaskOnCheckpointError) {
-               this.failTaskOnCheckpointError = failTaskOnCheckpointError;
-       }
-
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof ExecutionConfig) {
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 179cf9c6de8..f4714c2cce6 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -100,6 +100,8 @@ public void testJobManagerJMXMetricAccess() throws 
Exception {
                                        50,
                                        5,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                                       true,
+                                       0,
                                        true),
                                null));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..a541b6ec444 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -180,6 +180,8 @@
        /** Registry that tracks state which is shared across (incremental) 
checkpoints */
        private SharedStateRegistry sharedStateRegistry;
 
+       private final CheckpointFailureManager failureManager;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -196,10 +198,12 @@ public CheckpointCoordinator(
                        CompletedCheckpointStore completedCheckpointStore,
                        StateBackend checkpointStateBackend,
                        Executor executor,
-                       SharedStateRegistryFactory sharedStateRegistryFactory) {
+                       SharedStateRegistryFactory sharedStateRegistryFactory,
+                       CheckpointFailureManager failureManager) {
 
                // sanity checks
                checkNotNull(checkpointStateBackend);
+               checkNotNull(failureManager);
                checkArgument(baseInterval > 0, "Checkpoint base interval must 
be larger than zero");
                checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
@@ -230,6 +234,7 @@ public CheckpointCoordinator(
                this.executor = checkNotNull(executor);
                this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory);
                this.sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
+               this.failureManager = failureManager;
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.masterHooks = new HashMap<>();
@@ -546,6 +551,8 @@ public CheckpointTriggerResult triggerCheckpoint(
                                                LOG.info("Checkpoint {} of job 
{} expired before completing.", checkpointID, job);
 
                                                checkpoint.abortExpired();
+                                               
failureManager.tryHandleFailure(new Throwable("Checkpoint " + checkpointID + " 
of job " +
+                                                       job + " expired before 
completing"), checkpointID);
                                                
pendingCheckpoints.remove(checkpointID);
                                                
rememberRecentCheckpointId(checkpointID);
 
@@ -642,6 +649,8 @@ else if (!props.forceCheckpoint()) {
 
                                if (!checkpoint.isDiscarded()) {
                                        checkpoint.abortError(new 
Exception("Failed to trigger checkpoint", t));
+                                       failureManager.tryHandleFailure(
+                                               new Exception("Failed to 
trigger checkpoint : " + checkpointID, t), checkpointID);
                                }
 
                                try {
@@ -831,6 +840,8 @@ private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) thro
                                // abort the current pending checkpoint if we 
fails to finalize the pending checkpoint.
                                if (!pendingCheckpoint.isDiscarded()) {
                                        pendingCheckpoint.abortError(e1);
+                                       failureManager.tryHandleFailure(
+                                               new Exception("Checkpoint : " + 
checkpointId, e1), checkpointId);
                                }
 
                                throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.', e1);
@@ -874,6 +885,8 @@ public void run() {
                LOG.info("Completed checkpoint {} for job {} ({} bytes in {} 
ms).", checkpointId, job,
                        completedCheckpoint.getStateSize(), 
completedCheckpoint.getDuration());
 
+               failureManager.resetCounter();
+
                if (LOG.isDebugEnabled()) {
                        StringBuilder builder = new StringBuilder();
                        builder.append("Checkpoint state: ");
@@ -1147,10 +1160,6 @@ public CompletedCheckpointStore getCheckpointStore() {
                return completedCheckpointStore;
        }
 
-       public CheckpointIDCounter getCheckpointIdCounter() {
-               return checkpointIdCounter;
-       }
-
        public long getCheckpointTimeout() {
                return checkpointTimeout;
        }
@@ -1254,6 +1263,8 @@ private void discardCheckpoint(PendingCheckpoint 
pendingCheckpoint, @Nullable Th
                LOG.info("Discarding checkpoint {} of job {} because: {}", 
checkpointId, job, reason);
 
                pendingCheckpoint.abortDeclined();
+               failureManager.tryHandleFailure("Discarding checkpoint " + 
checkpointId +" of job " +
+                       job + " because: " + reason, checkpointId);
                rememberRecentCheckpointId(checkpointId);
 
                // we don't have to schedule another "dissolving" checkpoint 
any more because the
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
new file mode 100644
index 00000000000..19ec7368d5a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager to manage how to process checkpoint failure.
+ */
+public class CheckpointFailureManager {
+
+       private final boolean failOnCheckpointingErrors;
+       private final int tolerableCpFailureNumber;
+       private final AtomicInteger continuousFailureCounter;
+       private final ExecutionGraph executionGraph;
+       private final Object lock = new Object();
+
+       public CheckpointFailureManager(
+               boolean failOnCheckpointingErrors,
+               int tolerableCpFailureNumber,
+               ExecutionGraph executionGraph) {
+               this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+               this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+               this.continuousFailureCounter = new AtomicInteger(0);
+               this.executionGraph = checkNotNull(executionGraph);
+       }
+
+       @VisibleForTesting
+       public AtomicInteger getContinuousFailureCounter() {
+               return continuousFailureCounter;
+       }
+
+       public void resetCounter() {
+               continuousFailureCounter.set(0);
+       }
+
+       public void tryHandleFailure(String reason, long checkpointId) {
+               synchronized (lock) {
+                       if (failOnCheckpointingErrors ||
+                               continuousFailureCounter.incrementAndGet() > 
tolerableCpFailureNumber) {
+                               executionGraph.failGlobal(new 
Throwable(reason));
+                       }
+               }
+       }
+
+       public void tryHandleFailure(Throwable cause, long checkpointId) {
+               synchronized (lock) {
+                       if (failOnCheckpointingErrors ||
+                               continuousFailureCounter.incrementAndGet() > 
tolerableCpFailureNumber) {
+                               executionGraph.failGlobal(cause);
+                       }
+               }
+       }
+
+       public void tryHandleFailure(CheckpointDeclineReason reason, long 
checkpointId) {
+               synchronized (lock) {
+                       if (failOnCheckpointingErrors ||
+                               continuousFailureCounter.incrementAndGet() > 
tolerableCpFailureNumber) {
+                               executionGraph.failGlobal(
+                                       new Throwable("Checkpoint : " + 
checkpointId + reason.message()));
+                       }
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..8f11748de2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -475,7 +476,8 @@ public void enableCheckpointing(
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
                        StateBackend checkpointStateBackend,
-                       CheckpointStatsTracker statsTracker) {
+                       CheckpointStatsTracker statsTracker,
+                       CheckpointFailureManager failureManager) {
 
                // simple sanity checks
                checkArgument(interval >= 10, "checkpoint interval must not be 
below 10ms");
@@ -505,7 +507,8 @@ public void enableCheckpointing(
                        checkpointStore,
                        checkpointStateBackend,
                        ioExecutor,
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       failureManager);
 
                // register the master hooks on the checkpoint coordinator
                for (MasterTriggerRestoreHook<?> hook : masterHooks) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..0d2286ea431 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,6 +28,7 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -342,6 +343,11 @@ public static ExecutionGraph buildGraph(
 
                        final CheckpointCoordinatorConfiguration chkConfig = 
snapshotSettings.getCheckpointCoordinatorConfiguration();
 
+                       CheckpointFailureManager failureManager = new 
CheckpointFailureManager(
+                               chkConfig.isFailOnCheckpointingErrors(),
+                               chkConfig.getTolerableCpFailureNumber(),
+                               executionGraph);
+
                        executionGraph.enableCheckpointing(
                                chkConfig.getCheckpointInterval(),
                                chkConfig.getCheckpointTimeout(),
@@ -355,7 +361,8 @@ public static ExecutionGraph buildGraph(
                                checkpointIdCounter,
                                completedCheckpoints,
                                rootBackend,
-                               checkpointStatsTracker);
+                               checkpointStatsTracker,
+                               failureManager);
                }
 
                // create all the metrics for the Execution Graph
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 4ecbda57b28..348c42e33e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -42,6 +42,10 @@
 
        private final int maxConcurrentCheckpoints;
 
+       private final int tolerableCpFailureNumber;
+
+       private final boolean failOnCheckpointingErrors;
+
        /** Settings for what to do with checkpoints when a job finishes. */
        private final CheckpointRetentionPolicy checkpointRetentionPolicy;
 
@@ -60,11 +64,14 @@ public CheckpointCoordinatorConfiguration(
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpoints,
                        CheckpointRetentionPolicy checkpointRetentionPolicy,
-                       boolean isExactlyOnce) {
+                       boolean isExactlyOnce,
+                       int tolerableCpFailureNumber,
+                       boolean failOnCheckpointingErrors) {
 
                // sanity checks
                if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-                       minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
+                       minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1 ||
+                       tolerableCpFailureNumber < 0) {
                        throw new IllegalArgumentException();
                }
 
@@ -74,6 +81,8 @@ public CheckpointCoordinatorConfiguration(
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
                this.checkpointRetentionPolicy = 
Preconditions.checkNotNull(checkpointRetentionPolicy);
                this.isExactlyOnce = isExactlyOnce;
+               this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+               this.failOnCheckpointingErrors = failOnCheckpointingErrors;
        }
 
        public long getCheckpointInterval() {
@@ -100,6 +109,14 @@ public boolean isExactlyOnce() {
                return isExactlyOnce;
        }
 
+       public int getTolerableCpFailureNumber() {
+               return tolerableCpFailureNumber;
+       }
+
+       public boolean isFailOnCheckpointingErrors() {
+               return failOnCheckpointingErrors;
+       }
+
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -114,7 +131,8 @@ public boolean equals(Object o) {
                        minPauseBetweenCheckpoints == 
that.minPauseBetweenCheckpoints &&
                        maxConcurrentCheckpoints == 
that.maxConcurrentCheckpoints &&
                        isExactlyOnce == that.isExactlyOnce &&
-                       checkpointRetentionPolicy == 
that.checkpointRetentionPolicy;
+                       checkpointRetentionPolicy == 
that.checkpointRetentionPolicy &&
+                       tolerableCpFailureNumber == 
that.tolerableCpFailureNumber;
        }
 
        @Override
@@ -125,7 +143,8 @@ public int hashCode() {
                                minPauseBetweenCheckpoints,
                                maxConcurrentCheckpoints,
                                checkpointRetentionPolicy,
-                               isExactlyOnce);
+                               isExactlyOnce,
+                               tolerableCpFailureNumber);
        }
 
        @Override
@@ -136,6 +155,7 @@ public String toString() {
                        ", minPauseBetweenCheckpoints=" + 
minPauseBetweenCheckpoints +
                        ", maxConcurrentCheckpoints=" + 
maxConcurrentCheckpoints +
                        ", checkpointRetentionPolicy=" + 
checkpointRetentionPolicy +
+                       ", tolerableCpFailureNumber=" + 
tolerableCpFailureNumber +
                        '}';
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 32b32cfb3d6..3c0639c152f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -80,7 +80,8 @@ public void testFailingCompletedCheckpointStoreAdd() throws 
Exception {
                        new FailingCompletedCheckpointStore(),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                coord.triggerCheckpoint(triggerTimestamp, false);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index f644c01caf5..06b49201bbd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -435,7 +435,8 @@ private static CheckpointCoordinator 
instantiateCheckpointCoordinator(JobID jid,
                                new StandaloneCompletedCheckpointStore(10),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
        }
 
        private static <T> T mockGeneric(Class<?> clazz) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3650f43066d..142ea2bcc0b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,15 +23,23 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -51,6 +59,7 @@
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.InstantiationUtil;
@@ -143,7 +152,8 @@ public void 
testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -204,7 +214,8 @@ public void testCheckpointAbortsIfTriggerTasksAreFinished() 
{
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -256,7 +267,8 @@ public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -309,7 +321,8 @@ public void testTriggerAndDeclineCheckpointSimple() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -412,7 +425,8 @@ public void testTriggerAndDeclineCheckpointComplex() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -532,7 +546,8 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -700,7 +715,8 @@ public void testMultipleConcurrentCheckpoints() {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -831,7 +847,8 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
                                new StandaloneCompletedCheckpointStore(10),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -996,7 +1013,8 @@ public void testCheckpointTimeoutIsolated() {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1074,7 +1092,8 @@ public void testHandleMessagesForNonExistingCheckpoints() 
{
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1138,7 +1157,8 @@ public void testStateCleanupForLateOrUnknownMessages() 
throws Exception {
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1271,7 +1291,8 @@ public Void answer(InvocationOnMock invocation) throws 
Throwable {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
 
                        coord.startCheckpointScheduler();
@@ -1361,7 +1382,8 @@ public void testMinTimeBetweenCheckpointsInterval() 
throws Exception {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                try {
                        coord.startCheckpointScheduler();
@@ -1435,7 +1457,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1587,7 +1610,8 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
                        new StandaloneCompletedCheckpointStore(10),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1681,7 +1705,8 @@ private void testMaxConcurrentAttempts(int 
maxConcurrentAttempts) {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        coord.startCheckpointScheduler();
 
@@ -1755,7 +1780,8 @@ public void testMaxConcurrentAttempsWithSubsumption() {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        coord.startCheckpointScheduler();
 
@@ -1832,7 +1858,8 @@ public void testPeriodicSchedulingWithInactiveTasks() {
                                new StandaloneCompletedCheckpointStore(2),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        coord.startCheckpointScheduler();
 
@@ -1885,7 +1912,8 @@ public void testConcurrentSavepoints() throws Exception {
                        new StandaloneCompletedCheckpointStore(2),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                List<CompletableFuture<CompletedCheckpoint>> savepointFutures = 
new ArrayList<>();
 
@@ -1939,7 +1967,8 @@ public void testMinDelayBetweenSavepoints() throws 
Exception {
                        new StandaloneCompletedCheckpointStore(2),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2002,7 +2031,8 @@ public void testRestoreLatestCheckpointedState() throws 
Exception {
                        store,
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2117,7 +2147,8 @@ public void 
testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2264,7 +2295,8 @@ private void 
testRestoreLatestCheckpointedStateWithChangingParallelism(boolean s
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2549,7 +2581,8 @@ public void testStateRecoveryWithTopologyChange(int 
scaleType) throws Exception
                        standaloneCompletedCheckpointStore,
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2701,7 +2734,8 @@ public void testExternalizedCheckpoints() throws 
Exception {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2777,6 +2811,434 @@ public void testReplicateModeStateHandle() {
                Assert.assertEquals(3, checkCounts.get("t-6").intValue());
        }
 
+       @Test
+       public void testTolerableFailureForCompletingCheckpointFailed() {
+               JobID jid = new JobID();
+               // Checks the failure counter after two acknowledge attempts.
+               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+               final ExecutionVertex vertex = 
CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+               final long triggerTimestamp = 1L;
+               final boolean failOnCheckpointingErrors = false;
+               final int tolerableFailureNumber = 1;
+               final CheckpointFailureManager failureManager = new 
CheckpointFailureManager(
+                       failOnCheckpointingErrors, tolerableFailureNumber, 
mock(ExecutionGraph.class));
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       new ExecutionVertex[]{vertex},
+                       new ExecutionVertex[]{vertex},
+                       new ExecutionVertex[]{vertex},
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(2),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       failureManager);
+
+               TaskStateSnapshot subtaskState = spy(new TaskStateSnapshot());
+
+               KeyedStateHandle managedKeyedHandle = 
mock(KeyedStateHandle.class);
+               KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
+               OperatorStateHandle managedOpHandle = 
mock(OperatorStreamStateHandle.class);
+               OperatorStateHandle rawOpHandle = 
mock(OperatorStreamStateHandle.class);
+
+               final OperatorSubtaskState operatorSubtaskState = spy(new 
OperatorSubtaskState(
+                       managedOpHandle,
+                       rawOpHandle,
+                       managedKeyedHandle,
+                       rawKeyedHandle));
+
+               subtaskState.putSubtaskStateByOperatorID(new OperatorID(), 
operatorSubtaskState);
+
+               
when(subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
+
+               coord.triggerCheckpoint(triggerTimestamp, false);
+
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+               PendingCheckpoint pendingCheckpoint = 
coord.getPendingCheckpoints().values().iterator().next();
+
+               assertFalse(pendingCheckpoint.isDiscarded());
+
+               long checkpointId = 
coord.getPendingCheckpoints().keySet().iterator().next();
+               AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new 
CheckpointMetrics(), subtaskState);
+               // NOTE(review): confirm a Standalone store makes this ack throw.
+               try {
+                       coord.receiveAcknowledgeMessage(acknowledgeMessage);
+                       fail("Expected a checkpoint exception because the 
completed checkpoint store could not " +
+                               "store the completed checkpoint.");
+               } catch (CheckpointException e) {
+                       // ignore because we expected this exception
+               }
+
+               assertEquals(1, 
failureManager.getContinuousFailureCounter().get());
+
+               coord.triggerCheckpoint(triggerTimestamp, false);
+               checkpointId = 
coord.getPendingCheckpoints().keySet().iterator().next();
+               acknowledgeMessage = new AcknowledgeCheckpoint(jid, 
executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
+
+               try {
+                       coord.receiveAcknowledgeMessage(acknowledgeMessage);
+               } catch (CheckpointException e) {
+                       // ignored; outcome is verified by the asserts below
+               }
+
+               assertTrue(failureManager.getContinuousFailureCounter().get() > 
tolerableFailureNumber);
+
+               // make sure that the pending checkpoint has been discarded 
after we could not complete it
+               assertTrue(pendingCheckpoint.isDiscarded());
+       }
+
+       @Test
+       public void testTolerableFailureForTriggeringCheckpointTimeout() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+                       // A timed-out checkpoint must count as one failure.
+                       // create some mock execution vertices
+
+                       final ExecutionAttemptID triggerAttemptID = new 
ExecutionAttemptID();
+
+                       final ExecutionAttemptID ackAttemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID ackAttemptID2 = new 
ExecutionAttemptID();
+
+                       final ExecutionAttemptID commitAttemptID = new 
ExecutionAttemptID();
+
+                       final int tolerableFailureNumber = 1;
+                       final boolean failOnCheckpointErrors = false;
+                       final CheckpointFailureManager failureManager = new 
CheckpointFailureManager(
+                               failOnCheckpointErrors,
+                               tolerableFailureNumber,
+                               mock(ExecutionGraph.class)
+                       );
+
+                       ExecutionVertex triggerVertex = 
mockExecutionVertex(triggerAttemptID);
+
+                       ExecutionVertex ackVertex1 = 
mockExecutionVertex(ackAttemptID1);
+                       ExecutionVertex ackVertex2 = 
mockExecutionVertex(ackAttemptID2);
+
+                       ExecutionVertex commitVertex = 
mockExecutionVertex(commitAttemptID);
+
+                       // set up the coordinator
+                       // the timeout for the checkpoint is 200 milliseconds
+
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               200,
+                               0,
+                               Integer.MAX_VALUE,
+                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               new ExecutionVertex[] { triggerVertex },
+                               new ExecutionVertex[] { ackVertex1, ackVertex2 
},
+                               new ExecutionVertex[] { commitVertex },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               new MemoryStateBackend(),
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               failureManager);
+
+                       // trigger a checkpoint, partially acknowledged
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().values().iterator().next();
+                       assertFalse(checkpoint.isDiscarded());
+
+                       OperatorID opID1 = 
OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
+
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = spy(new 
TaskStateSnapshot());
+                       OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
+                       
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
+
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new 
CheckpointMetrics(), taskOperatorSubtaskStates1));
+
+                       // wait until the checkpoint must have expired.
+                       // we check every 250 msecs conservatively for 5 seconds
+                       // to give even slow build servers a very good chance 
of completing this
+                       long deadline = System.currentTimeMillis() + 5000;
+                       do {
+                               Thread.sleep(250);
+                       }
+                       while (!checkpoint.isDiscarded() &&
+                               coord.getNumberOfPendingCheckpoints() > 0 &&
+                               System.currentTimeMillis() < deadline);
+
+                       assertTrue("Checkpoint was not canceled by the 
timeout", checkpoint.isDiscarded());
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       // the timeout counts as exactly one failure
+                       assertEquals(1, 
failureManager.getContinuousFailureCounter().get());
+
+                       // validate that the received states have been discarded
+                       verify(subtaskState1, times(1)).discardState();
+
+                       // no confirm message must have been sent
+                       verify(commitVertex.getCurrentExecutionAttempt(), 
times(0)).notifyCheckpointComplete(anyLong(), anyLong());
+
+                       coord.shutdown(JobStatus.FINISHED);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testTolerableFailureForDecliningCheckpointFailed() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+                       final int tolerableFailureNumber = 1;
+                       final boolean failOnCheckpointingError = false;
+                       CheckpointFailureManager failureManager = new 
CheckpointFailureManager(
+                               failOnCheckpointingError,
+                               tolerableFailureNumber,
+                               mock(ExecutionGraph.class)
+                       );
+                       // Every declined checkpoint must bump the counter.
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               new MemoryStateBackend(),
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               failureManager);
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // we have one task scheduled that will cancel after 
timeout
+                       assertEquals(1, coord.getNumScheduledTasks());
+
+                       long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       assertNotNull(checkpoint);
+                       assertEquals(checkpointId, 
checkpoint.getCheckpointId());
+                       assertEquals(timestamp, 
checkpoint.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint.getJobId());
+                       assertEquals(2, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint.getOperatorStates().size());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // check that the vertices received the trigger 
checkpoint message
+                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                       // acknowledge from one of the tasks
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // acknowledge the same task again (should not matter)
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+
+                       // decline checkpoint from the other task, this should 
cancel the checkpoint
+                       // (no new one is triggered, as validated below)
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       assertEquals(tolerableFailureNumber, 
failureManager.getContinuousFailureCounter().get());
+
+                       // the canceler is also removed
+                       assertEquals(0, coord.getNumScheduledTasks());
+
+                       // validate that we have no new pending checkpoint
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // decline again, nothing should happen
+                       // decline from the other task, nothing should happen
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+                       // the repeated declines above are not counted again
+                       assertEquals(1, 
failureManager.getContinuousFailureCounter().get());
+
+                       // trigger a new checkpoint
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+                       checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+
+                       // the counter must now exceed the tolerable number
+                       
assertTrue(failureManager.getContinuousFailureCounter().get() > 
tolerableFailureNumber);
+
+                       coord.shutdown(JobStatus.FINISHED);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testTolerableFailureForResettingCounter() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+                       final int tolerableFailureNumber = 1;
+                       final boolean failOnCheckpointErrors = false;
+                       final CheckpointFailureManager failureManager = new 
CheckpointFailureManager(
+                               failOnCheckpointErrors,
+                               tolerableFailureNumber,
+                               mock(ExecutionGraph.class));
+                       // A completed checkpoint must reset the counter to 0.
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               new MemoryStateBackend(),
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               failureManager);
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // we have one task scheduled that will cancel after 
timeout
+                       assertEquals(1, coord.getNumScheduledTasks());
+
+                       long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       assertNotNull(checkpoint);
+                       assertEquals(checkpointId, 
checkpoint.getCheckpointId());
+                       assertEquals(timestamp, 
checkpoint.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint.getJobId());
+                       assertEquals(2, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint.getOperatorStates().size());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // check that the vertices received the trigger 
checkpoint message
+                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                       // acknowledge from one of the tasks
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // acknowledge the same task again (should not matter)
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+
+                       // decline checkpoint from the other task, this should 
cancel the checkpoint
+                       // (no new one is triggered, as validated below)
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       assertEquals(tolerableFailureNumber, 
failureManager.getContinuousFailureCounter().get());
+
+                       // the canceler is also removed
+                       assertEquals(0, coord.getNumScheduledTasks());
+
+                       // validate that we have no new pending checkpoint
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // decline again, nothing should happen
+                       // decline from the other task, nothing should happen
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+                       // the repeated declines above are not counted again
+                       assertEquals(1, 
failureManager.getContinuousFailureCounter().get());
+
+                       // trigger a new checkpoint
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+                       checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+
+                       // the counter must now exceed the tolerable number
+                       
assertTrue(failureManager.getContinuousFailureCounter().get() > 
tolerableFailureNumber);
+
+                       // trigger the checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+                       checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+
+                       // counter has been reset
+                       assertEquals(0, 
failureManager.getContinuousFailureCounter().get());
+
+                       coord.shutdown(JobStatus.FINISHED);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -3192,7 +3654,8 @@ public void testStopPeriodicScheduler() throws Exception {
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                // Periodic
                CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
@@ -3400,7 +3863,8 @@ public void 
testCheckpointStatsTrackerPendingCheckpointCallback() {
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
                coord.setCheckpointStatsTracker(tracker);
@@ -3439,7 +3903,8 @@ public void testCheckpointStatsTrackerRestoreCallback() 
throws Exception {
                        store,
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                store.addCheckpoint(new CompletedCheckpoint(
                        new JobID(),
@@ -3506,7 +3971,8 @@ public void testSharedStateRegistrationOnRestore() throws 
Exception {
                                        SharedStateRegistry instance = new 
SharedStateRegistry(deleteExecutor);
                                        
createdSharedStateRegistries.add(instance);
                                        return instance;
-                               });
+                               },
+                       mock(CheckpointFailureManager.class));
 
                final int numCheckpoints = 3;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index ffebc52f8a0..b9dfa231f1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -84,6 +84,8 @@ public void 
testDeserializationOfUserCodeWithUserClassLoader() throws Exception
                                        0L,
                                        1,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                                       true,
+                                       0,
                                        true),
                                new SerializedValue<StateBackend>(new 
CustomStateBackend(outOfClassPath)),
                                serHooks);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 236717ff434..e45625a4d7b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -109,7 +109,8 @@ public void testSetState() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -187,7 +188,8 @@ public void testNoCheckpointAvailable() {
                                new StandaloneCompletedCheckpointStore(1),
                                new MemoryStateBackend(),
                                Executors.directExecutor(),
-                               SharedStateRegistry.DEFAULT_FACTORY);
+                               SharedStateRegistry.DEFAULT_FACTORY,
+                               mock(CheckpointFailureManager.class));
 
                        try {
                                coord.restoreLatestCheckpointedState(new 
HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -245,7 +247,8 @@ public void testNonRestoredState() throws Exception {
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
                        Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
 
                // --- (2) Checkpoint misses state for a jobVertex (should 
work) ---
                Map<OperatorID, OperatorState> checkpointTaskStates = new 
HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 82dcd023f9b..23bb576e420 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -64,7 +64,9 @@ public void testGetSnapshottingSettings() throws Exception {
                                191929L,
                                123,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                               false
+                               false,
+                               0,
+                               true
                        ),
                        null);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index f6b7730a72e..fae57f26268 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -86,6 +86,8 @@ public void testCoordinatorShutsDownOnFailure() {
                                                0L,
                                                Integer.MAX_VALUE,
                                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                                               true,
+                                               0,
                                                true),
                                        null));
                        testGraph.setExecutionConfig(executionConfig);
@@ -157,6 +159,8 @@ public void testCoordinatorShutsDownOnSuccess() {
                                                0L,
                                                Integer.MAX_VALUE,
                                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                                               true,
+                                               0,
                                                true),
                                        null));
                        
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 4b7449b46ef..4434561b60a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -104,7 +104,8 @@ private ExecutionGraph 
createExecutionGraphAndEnableCheckpointing(
                                counter,
                                store,
                                new MemoryStateBackend(),
-                               CheckpointStatsTrackerTest.createTestTracker());
+                               CheckpointStatsTrackerTest.createTestTracker(),
+                               mock(CheckpointFailureManager.class));
 
                JobVertex jobVertex = new JobVertex("MockVertex");
                jobVertex.setInvokableClass(AbstractInvokable.class);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 7c2a02c3314..66c71f5f379 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -135,7 +136,8 @@ public static void setupExecutionGraph() throws Exception {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        new MemoryStateBackend(),
-                       statsTracker);
+                       statsTracker,
+                       mock(CheckpointFailureManager.class));
 
                runtimeGraph.setJsonPlan("{}");
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277941f..e1054dc5094 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -694,7 +694,9 @@ private ExecutionGraph createExecutionGraph(Configuration 
configuration) throws
                                        0,
                                        1,
                                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                       false),
+                                       false,
+                                       0,
+                                       true),
                                null));
 
                final Time timeout = Time.seconds(10L);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index a3a26f336e4..99270d8b03f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -313,6 +314,8 @@ public void testLocalFailureFailsPendingCheckpoints() 
throws Exception {
                        1L,
                        3,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       0,
                        true);
 
                final ExecutionGraph graph = createSampleGraph(
@@ -342,7 +345,8 @@ public void testLocalFailureFailsPendingCheckpoints() 
throws Exception {
                                1,
                                allVertices,
                                checkpointCoordinatorConfiguration,
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()),
+                               mock(CheckpointFailureManager.class));
 
                final CheckpointCoordinator checkpointCoordinator = 
graph.getCheckpointCoordinator();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index 51e7fec1223..832b113aa4c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -49,7 +49,9 @@ public void testIsJavaSerializable() throws Exception {
                                112,
                                12,
                                CheckpointRetentionPolicy.RETAIN_ON_FAILURE,
-                               false),
+                               false,
+                               0,
+                               true),
                        new SerializedValue<>(new MemoryStateBackend()));
 
                JobCheckpointingSettings copy = 
CommonTestUtils.createCopySerializable(settings);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d991983e6f2..b7efe048d27 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -258,6 +258,8 @@ public void testJobRecoveryWhenLosingLeadership() throws 
Exception {
                                                0L,
                                                1,
                                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                                               true,
+                                               0,
                                                true),
                                        null));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f3c72545e69..3cdaac6e6bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1429,6 +1429,8 @@ private JobGraph 
createJobGraphFromJobVerticesWithCheckpointing(SavepointRestore
                        1000L,
                        1,
                        
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       0,
                        true);
                final JobCheckpointingSettings checkpointingSettings = new 
JobCheckpointingSettings(
                        Collections.emptyList(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0c9dd49c1d1..89d01a7e1ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -312,7 +312,7 @@ public void acknowledgeCheckpoint(long checkpointId, 
CheckpointMetrics checkpoin
 
        @Override
        public void declineCheckpoint(long checkpointId, Throwable cause) {
-               throw new UnsupportedOperationException();
+
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 13a319490d6..6890b151521 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -98,6 +98,8 @@ public void testAskTimeoutEqualsCheckpointTimeout() throws 
Exception {
                                1L,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               0,
                                true));
 
                JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
@@ -132,6 +134,8 @@ public void testSavepointDirectoryConfiguration() throws 
Exception {
                                1L,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               0,
                                true));
 
                JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
@@ -185,6 +189,8 @@ public void testTriggerNewRequest() throws Exception {
                                1L,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               0,
                                true));
 
                JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
@@ -317,6 +323,8 @@ public void testFailedCancellation() throws Exception {
                                1L,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               0,
                                true));
 
                JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
index 47ebb18a115..99fb8ff29de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -156,6 +156,8 @@ private static GraphAndSettings 
createGraphAndSettings(boolean externalized, boo
                long timeout = 996979L;
                long minPause = 119191919L;
                int maxConcurrent = 12929329;
+               int tolerableCpFailureNumber = 0;
+               boolean failOnCheckpointingErrors = false;
 
                CheckpointRetentionPolicy retentionPolicy = externalized
                        ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
@@ -167,7 +169,9 @@ private static GraphAndSettings 
createGraphAndSettings(boolean externalized, boo
                        minPause,
                        maxConcurrent,
                        retentionPolicy,
-                       exactlyOnce);
+                       exactlyOnce,
+                       tolerableCpFailureNumber,
+                       failOnCheckpointingErrors);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
                
when(graph.getCheckpointCoordinatorConfiguration()).thenReturn(chkConfig);
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3ffe7701682..5ace2f18527 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -843,6 +843,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
@@ -904,6 +906,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
@@ -973,6 +977,8 @@ class JobManagerITCase(_system: ActorSystem)
               60000,
               1,
               CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+              true,
+              0,
               true),
             null))
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 87c800d2130..aec83d65a07 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,6 +47,9 @@
        /** The default limit of concurrently happening checkpoints: one. */
        public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
+       /** The default tolerable failure number: none. */
+       public static final int DEFAULT_TOLERABLE_FAILURE_NUMBER = 0;
+
        // 
------------------------------------------------------------------------
 
        /** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -72,6 +76,9 @@
        /** Determines if a tasks are failed or not if there is an error in 
their checkpointing. Default: true */
        private boolean failOnCheckpointingErrors = true;
 
+       /** Number of consecutive checkpoint failures that are tolerated 
before the task is failed. */
+       private int tolerableFailureNumber = DEFAULT_TOLERABLE_FAILURE_NUMBER;
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -206,6 +213,24 @@ public void setMaxConcurrentCheckpoints(int 
maxConcurrentCheckpoints) {
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }
 
+       /**
+        * Gets the number of consecutive checkpoint failures that are 
tolerated before
+        * the job is failed. Default: 0.
+        */
+       public int getTolerableFailureNumber() {
+               return tolerableFailureNumber;
+       }
+
+       /**
+        * Sets the number of consecutive checkpoint failures that are 
tolerated before
+        * the job is failed. Default: 0.
+        */
+       public void setTolerableFailureNumber(int tolerableFailureNumber) {
+               Preconditions.checkArgument(
+                       tolerableFailureNumber >= 0, "The number must greater 
than or equal to zero.");
+               this.tolerableFailureNumber = tolerableFailureNumber;
+       }
+
        /**
         * Checks whether checkpointing is forced, despite currently 
non-checkpointable iteration feedback.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 69213024975..2d6c036336c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -593,14 +592,7 @@ private void configureCheckpointing() {
                CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
                long interval = cfg.getCheckpointInterval();
-               if (interval > 0) {
-                       ExecutionConfig executionConfig = 
streamGraph.getExecutionConfig();
-                       // propagate the expected behaviour for checkpoint 
errors to task.
-                       
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-               } else {
-                       // interval of max value means disable periodic 
checkpoint
-                       interval = Long.MAX_VALUE;
-               }
+               interval = interval > 0 ? interval : Long.MAX_VALUE;
 
                //  --- configure the participating vertices ---
 
@@ -711,7 +703,9 @@ private void configureCheckpointing() {
                                cfg.getMinPauseBetweenCheckpoints(),
                                cfg.getMaxConcurrentCheckpoints(),
                                retentionAfterTermination,
-                               isExactlyOnce),
+                               isExactlyOnce,
+                               cfg.getTolerableFailureNumber(),
+                               cfg.isFailOnCheckpointingErrors()),
                        serializedStateBackend,
                        serializedHooks);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
deleted file mode 100644
index 01407952550..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-
-/**
- * Handler for exceptions that happen on checkpointing. The handler can reject 
and rethrow the offered exceptions.
- */
-public interface CheckpointExceptionHandler {
-
-       /**
-        * Offers the exception for handling. If the exception cannot be 
handled from this instance, it is rethrown.
-        *
-        * @param checkpointMetaData metadata for the checkpoint for which the 
exception occurred.
-        * @param exception  the exception to handle.
-        * @throws Exception rethrows the exception if it cannot be handled.
-        */
-       void tryHandleCheckpointException(CheckpointMetaData 
checkpointMetaData, Exception exception) throws Exception;
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
deleted file mode 100644
index 430f43e3db8..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.util.Preconditions;
-
-/**
- * This factory produces {@link CheckpointExceptionHandler} instances that 
handle exceptions during checkpointing in a
- * {@link StreamTask}.
- */
-public class CheckpointExceptionHandlerFactory {
-
-       /**
-        * Returns a {@link CheckpointExceptionHandler} that either causes a 
task to fail completely or to just declines
-        * checkpoint on exception, depending on the parameter flag.
-        */
-       public CheckpointExceptionHandler createCheckpointExceptionHandler(
-               boolean failTaskOnCheckpointException,
-               Environment environment) {
-
-               if (failTaskOnCheckpointException) {
-                       return new FailingCheckpointExceptionHandler();
-               } else {
-                       return new 
DecliningCheckpointExceptionHandler(environment);
-               }
-       }
-
-       /**
-        * This handler makes the task fail by rethrowing a reported exception.
-        */
-       static final class FailingCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
-
-               @Override
-               public void tryHandleCheckpointException(
-                       CheckpointMetaData checkpointMetaData,
-                       Exception exception) throws Exception {
-
-                       throw exception;
-               }
-       }
-
-       /**
-        * This handler makes the task decline the checkpoint as reaction to 
the reported exception. The task is not failed.
-        */
-       static final class DecliningCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
-
-               final Environment environment;
-
-               DecliningCheckpointExceptionHandler(Environment environment) {
-                       this.environment = 
Preconditions.checkNotNull(environment);
-               }
-
-               @Override
-               public void tryHandleCheckpointException(
-                       CheckpointMetaData checkpointMetaData,
-                       Exception exception) throws Exception {
-
-                       
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
-               }
-       }
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ee8892cf95..16faeee88b5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -171,12 +171,6 @@
        /** Thread pool for async snapshot workers. */
        private ExecutorService asyncOperationsThreadPool;
 
-       /** Handler for exceptions during checkpointing in the stream task. 
Used in synchronous part of the checkpoint. */
-       private CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
-
-       /** Wrapper for synchronousCheckpointExceptionHandler to deal with 
rethrown exceptions. Used in the async part. */
-       private AsyncCheckpointExceptionHandler 
asynchronousCheckpointExceptionHandler;
-
        private final 
List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
streamRecordWriters;
 
        // 
------------------------------------------------------------------------
@@ -245,14 +239,6 @@ public final void invoke() throws Exception {
 
                        asyncOperationsThreadPool = 
Executors.newCachedThreadPool();
 
-                       CheckpointExceptionHandlerFactory 
cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
-
-                       synchronousCheckpointExceptionHandler = 
cpExceptionHandlerFactory.createCheckpointExceptionHandler(
-                               
getExecutionConfig().isFailTaskOnCheckpointError(),
-                               getEnvironment());
-
-                       asynchronousCheckpointExceptionHandler = new 
AsyncCheckpointExceptionHandler(this);
-
                        stateBackend = createStateBackend();
                        checkpointStorage = 
stateBackend.createCheckpointStorage(getEnvironment().getJobID());
 
@@ -724,7 +710,8 @@ private void checkpointState(
                        checkpointMetaData,
                        checkpointOptions,
                        storage,
-                       checkpointMetrics);
+                       checkpointMetrics,
+                       getEnvironment());
 
                checkpointingOperation.executeCheckpointing();
        }
@@ -754,10 +741,6 @@ private StateBackend createStateBackend() throws Exception 
{
                                LOG);
        }
 
-       protected CheckpointExceptionHandlerFactory 
createCheckpointExceptionHandlerFactory() {
-               return new CheckpointExceptionHandlerFactory();
-       }
-
        /**
         * Returns the {@link ProcessingTimeService} responsible for telling 
the current
         * processing time and registering timers.
@@ -819,18 +802,22 @@ public String toString() {
                private final 
AtomicReference<CheckpointingOperation.AsyncCheckpointState> 
asyncCheckpointState = new AtomicReference<>(
                        CheckpointingOperation.AsyncCheckpointState.RUNNING);
 
+               private final Environment environment;
+
                AsyncCheckpointRunnable(
                        StreamTask<?, ?> owner,
                        Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress,
                        CheckpointMetaData checkpointMetaData,
                        CheckpointMetrics checkpointMetrics,
-                       long asyncStartNanos) {
+                       long asyncStartNanos,
+                       Environment environment) {
 
                        this.owner = Preconditions.checkNotNull(owner);
                        this.operatorSnapshotsInProgress = 
Preconditions.checkNotNull(operatorSnapshotsInProgress);
                        this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
                        this.checkpointMetrics = 
Preconditions.checkNotNull(checkpointMetrics);
                        this.asyncStartNanos = asyncStartNanos;
+                       this.environment = environment;
                }
 
                @Override
@@ -944,9 +931,7 @@ private void handleExecutionException(Exception e) {
 
                                        // We only report the exception for the 
original cause of fail and cleanup.
                                        // Otherwise this followup exception 
could race the original exception in failing the task.
-                                       
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-                                               checkpointMetaData,
-                                               checkpointException);
+                                       
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), 
checkpointException);
 
                                        currentState = 
CheckpointingOperation.AsyncCheckpointState.DISCARDED;
                                } else {
@@ -1027,6 +1012,8 @@ public CloseableRegistry getCancelables() {
                private long startSyncPartNano;
                private long startAsyncPartNano;
 
+               private final Environment environment;
+
                // ------------------------
 
                private final Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress;
@@ -1036,7 +1023,8 @@ public CheckpointingOperation(
                                CheckpointMetaData checkpointMetaData,
                                CheckpointOptions checkpointOptions,
                                CheckpointStreamFactory 
checkpointStorageLocation,
-                               CheckpointMetrics checkpointMetrics) {
+                               CheckpointMetrics checkpointMetrics,
+                               Environment environment) {
 
                        this.owner = Preconditions.checkNotNull(owner);
                        this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
@@ -1045,6 +1033,7 @@ public CheckpointingOperation(
                        this.storageLocation = 
Preconditions.checkNotNull(checkpointStorageLocation);
                        this.allOperators = 
owner.operatorChain.getAllOperators();
                        this.operatorSnapshotsInProgress = new 
HashMap<>(allOperators.length);
+                       this.environment = environment;
                }
 
                public void executeCheckpointing() throws Exception {
@@ -1070,7 +1059,8 @@ public void executeCheckpointing() throws Exception {
                                        operatorSnapshotsInProgress,
                                        checkpointMetaData,
                                        checkpointMetrics,
-                                       startAsyncPartNano);
+                                       startAsyncPartNano,
+                                       environment);
 
                                
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                                
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
@@ -1102,7 +1092,7 @@ public void executeCheckpointing() throws Exception {
                                                
checkpointMetrics.getSyncDurationMillis());
                                }
 
-                               
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 ex);
+                               
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
                        }
                }
 
@@ -1126,36 +1116,6 @@ private void checkpointStreamOperator(StreamOperator<?> 
op) throws Exception {
                }
        }
 
-       /**
-        * Wrapper for synchronous {@link CheckpointExceptionHandler}. This 
implementation catches unhandled, rethrown
-        * exceptions and reports them through {@link 
#handleAsyncException(String, Throwable)}. As this implementation
-        * always handles the exception in some way, it never rethrows.
-        */
-       static final class AsyncCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
-
-               /** Owning stream task to which we report async exceptions. */
-               final StreamTask<?, ?> owner;
-
-               /** Synchronous exception handler to which we delegate. */
-               final CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
-
-               AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
-                       this.owner = Preconditions.checkNotNull(owner);
-                       this.synchronousCheckpointExceptionHandler =
-                               
Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
-               }
-
-               @Override
-               public void tryHandleCheckpointException(CheckpointMetaData 
checkpointMetaData, Exception exception) {
-                       try {
-                               
synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 exception);
-                       } catch (Exception unhandled) {
-                               AsynchronousException asyncException = new 
AsynchronousException(unhandled);
-                               owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
-                       }
-               }
-       }
-
        @VisibleForTesting
        public static <OUT> 
List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
createStreamRecordWriters(
                        StreamConfig configuration,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
deleted file mode 100644
index 08cee559621..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test that the configuration mechanism for how tasks react on checkpoint 
errors works correctly.
- */
-public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
-
-       @Test
-       public void testConfigurationFailOnException() throws Exception {
-               testConfigForwarding(true);
-       }
-
-       @Test
-       public void testConfigurationDeclineOnException() throws Exception {
-               testConfigForwarding(false);
-       }
-
-       @Test
-       public void testFailIsDefaultConfig() {
-               ExecutionConfig newExecutionConfig = new ExecutionConfig();
-               
Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError());
-       }
-
-       private void testConfigForwarding(boolean failOnException) throws 
Exception {
-
-               final boolean expectedHandlerFlag = failOnException;
-
-               final DummyEnvironment environment = new 
DummyEnvironment("test", 1, 0);
-               environment.setTaskStateManager(new TestTaskStateManager());
-               
environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
-
-               final CheckpointExceptionHandlerFactory inspectingFactory = new 
CheckpointExceptionHandlerFactory() {
-
-                       @Override
-                       public CheckpointExceptionHandler 
createCheckpointExceptionHandler(
-                               boolean failTaskOnCheckpointException,
-                               Environment environment) {
-
-                               Assert.assertEquals(expectedHandlerFlag, 
failTaskOnCheckpointException);
-                               return 
super.createCheckpointExceptionHandler(failTaskOnCheckpointException, 
environment);
-                       }
-               };
-
-               StreamTask streamTask = new StreamTask(environment, null) {
-                       @Override
-                       protected void init() throws Exception {}
-
-                       @Override
-                       protected void run() throws Exception {}
-
-                       @Override
-                       protected void cleanup() throws Exception {}
-
-                       @Override
-                       protected void cancelTask() throws Exception {}
-
-                       @Override
-                       protected CheckpointExceptionHandlerFactory 
createCheckpointExceptionHandlerFactory() {
-                               return inspectingFactory;
-                       }
-               };
-
-               streamTask.invoke();
-       }
-
-       @Test
-       public void testCheckpointConfigDefault() throws Exception {
-               StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
-       }
-
-       @Test
-       public void testPropagationFailFromCheckpointConfig() throws Exception {
-               doTestPropagationFromCheckpointConfig(true);
-       }
-
-       @Test
-       public void testPropagationDeclineFromCheckpointConfig() throws 
Exception {
-               doTestPropagationFromCheckpointConfig(false);
-       }
-
-       public void doTestPropagationFromCheckpointConfig(boolean 
failTaskOnCheckpointErrors) throws Exception {
-               StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               streamExecutionEnvironment.setParallelism(1);
-               
streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
-               
streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(failTaskOnCheckpointErrors);
-               streamExecutionEnvironment.addSource(new 
SourceFunction<Integer>() {
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-
-               }).addSink(new DiscardingSink<>());
-
-               StreamGraph streamGraph = 
streamExecutionEnvironment.getStreamGraph();
-               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
-               SerializedValue<ExecutionConfig> serializedExecutionConfig = 
jobGraph.getSerializedExecutionConfig();
-               ExecutionConfig executionConfig =
-                       
serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
-
-               Assert.assertEquals(failTaskOnCheckpointErrors, 
executionConfig.isFailTaskOnCheckpointError());
-       }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
deleted file mode 100644
index 2f581622220..00000000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for the current implementations of {@link CheckpointExceptionHandler} 
and their factory.
- */
-public class CheckpointExceptionHandlerTest extends TestLogger {
-
-       @Test
-       public void testRethrowingHandler() {
-               DeclineDummyEnvironment environment = new 
DeclineDummyEnvironment();
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler exceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
environment);
-
-               CheckpointMetaData failedCheckpointMetaData = new 
CheckpointMetaData(42L, 4711L);
-               Exception testException = new Exception("test");
-               try {
-                       
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
-                       Assert.fail("Exception not rethrown.");
-               } catch (Exception e) {
-                       Assert.assertEquals(testException, e);
-               }
-
-               Assert.assertNull(environment.getLastDeclinedCheckpointCause());
-       }
-
-       @Test
-       public void testDecliningHandler() {
-               DeclineDummyEnvironment environment = new 
DeclineDummyEnvironment();
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler exceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false, 
environment);
-
-               CheckpointMetaData failedCheckpointMetaData = new 
CheckpointMetaData(42L, 4711L);
-               Exception testException = new Exception("test");
-               try {
-                       
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
-               } catch (Exception e) {
-                       Assert.fail("Exception not handled, but rethrown.");
-               }
-
-               Assert.assertEquals(failedCheckpointMetaData.getCheckpointId(), 
environment.getLastDeclinedCheckpointId());
-               Assert.assertEquals(testException, 
environment.getLastDeclinedCheckpointCause());
-       }
-
-       static final class DeclineDummyEnvironment extends DummyEnvironment {
-
-               private long lastDeclinedCheckpointId;
-               private Throwable lastDeclinedCheckpointCause;
-
-               DeclineDummyEnvironment() {
-                       super("test", 1, 0);
-                       this.lastDeclinedCheckpointId = Long.MIN_VALUE;
-                       this.lastDeclinedCheckpointCause = null;
-               }
-
-               @Override
-               public void declineCheckpoint(long checkpointId, Throwable 
cause) {
-                       this.lastDeclinedCheckpointId = checkpointId;
-                       this.lastDeclinedCheckpointCause = cause;
-               }
-
-               long getLastDeclinedCheckpointId() {
-                       return lastDeclinedCheckpointId;
-               }
-
-               Throwable getLastDeclinedCheckpointCause() {
-                       return lastDeclinedCheckpointCause;
-               }
-       }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index bc864a2e5d1..4b860a4a768 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -111,7 +111,8 @@ public void testReportingFromSnapshotToTaskStateManager() {
                                snapshots,
                                checkpointMetaData,
                                checkpointMetrics,
-                               0L);
+                               0L,
+                               streamMockEnvironment);
 
                checkpointRunnable.run();
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index a94f8ac879d..67c2052cae6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -155,7 +155,6 @@
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -377,18 +376,8 @@ public void testFailingCheckpointStreamOperator() throws 
Exception {
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
                Whitebox.setInternalState(streamTask, "checkpointStorage", new 
MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
 
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler checkpointExceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
-               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
-                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
                try {
                        streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                       fail("Expected test exception here.");
                } catch (Exception e) {
                        assertEquals(testException, e.getCause());
                }
@@ -449,20 +438,9 @@ public void testFailingAsyncCheckpointRunnable() throws 
Exception {
                Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
                Whitebox.setInternalState(streamTask, "checkpointStorage", new 
MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
 
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler checkpointExceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
-               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
-                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
                
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
                streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
-               verify(streamTask).handleAsyncException(anyString(), 
any(Throwable.class));
-
                verify(operatorSnapshotResult1).cancel();
                verify(operatorSnapshotResult2).cancel();
                verify(operatorSnapshotResult3).cancel();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index cd8a4fafd9a..79426aaff02 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -91,7 +91,6 @@
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -115,28 +114,13 @@ public void testDeclineOnCheckpointErrorInAsyncPart() 
throws Exception {
                runTestDeclineOnCheckpointError(new 
AsyncFailureInducingStateBackend());
        }
 
-       @Test
-       public void testTaskFailingOnCheckpointErrorInSyncPart() throws 
Exception {
-               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
-               assertNotNull(failureCause);
-
-               String expectedMessageStart = "Could not perform checkpoint";
-               assertEquals(expectedMessageStart, 
failureCause.getMessage().substring(0, expectedMessageStart.length()));
-       }
-
-       @Test
-       public void testTaskFailingOnCheckpointErrorInAsyncPart() throws 
Exception {
-               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
-               assertEquals(AsynchronousException.class, 
failureCause.getClass());
-       }
-
        @Test
        public void testBlockingNonInterruptibleCheckpoint() throws Exception {
 
                StateBackend lockingStateBackend = new 
BackendForTestStream(LockingOutputStream::new);
 
                Task task =
-                       createTask(new TestOperator(), lockingStateBackend, 
mock(CheckpointResponder.class), true);
+                       createTask(new TestOperator(), lockingStateBackend, 
mock(CheckpointResponder.class));
 
                // start the task and wait until it is in "restore"
                task.startTaskThread();
@@ -156,7 +140,7 @@ private void 
runTestDeclineOnCheckpointError(AbstractStateBackend backend) throw
                TestDeclinedCheckpointResponder checkpointResponder = new 
TestDeclinedCheckpointResponder();
 
                Task task =
-                       createTask(new FilterOperator(), backend, 
checkpointResponder, false);
+                       createTask(new FilterOperator(), backend, 
checkpointResponder);
 
                // start the task and wait until it is in "restore"
                task.startTaskThread();
@@ -169,20 +153,6 @@ private void 
runTestDeclineOnCheckpointError(AbstractStateBackend backend) throw
                task.getExecutingThread().join();
        }
 
-       private Throwable 
runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws 
Exception {
-
-               Task task =
-                       createTask(new FilterOperator(), backend, 
mock(CheckpointResponder.class), true);
-
-               // start the task and wait until it is in "restore"
-               task.startTaskThread();
-
-               task.getExecutingThread().join();
-
-               assertEquals(ExecutionState.FAILED, task.getExecutionState());
-               return task.getFailureCause();
-       }
-
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -190,8 +160,7 @@ private Throwable 
runTestTaskFailingOnCheckpointError(AbstractStateBackend backe
        private static Task createTask(
                StreamOperator<?> op,
                StateBackend backend,
-               CheckpointResponder checkpointResponder,
-               boolean failOnCheckpointErrors) throws IOException {
+               CheckpointResponder checkpointResponder) throws IOException {
 
                Configuration taskConfig = new Configuration();
                StreamConfig cfg = new StreamConfig(taskConfig);
@@ -200,7 +169,6 @@ private static Task createTask(
                cfg.setStateBackend(backend);
 
                ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
 
                JobInformation jobInformation = new JobInformation(
                                new JobID(),
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357dc4b6..07b2fd5fb23 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -105,6 +105,8 @@ private void setUpWithCheckpointInterval(long 
checkpointInterval) throws Excepti
                                10,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                               true,
+                               0,
                                true),
                        null));
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7eebde86028..bb40188f690 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -188,6 +188,8 @@ private JobGraph createJobGraph(ExecutionMode mode) {
                env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.setStateBackend((StateBackend) new MemoryStateBackend());
+               env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+               
env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 
                switch (mode) {
                        case MIGRATE:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> ----------------------------------------
>
>                 Key: FLINK-10074
>                 URL: https://issues.apache.org/jira/browse/FLINK-10074
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Thomas Weise
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> avoid the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to