This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b760d55  [FLINK-11662] Disable task to fail on checkpoint errors
b760d55 is described below

commit b760d556ca3757f3d0d1b8c81e5182b1bcc3dba3
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu Jun 27 19:40:59 2019 +0800

    [FLINK-11662] Disable task to fail on checkpoint errors
    
    This closes #8745.
---
 .../apache/flink/api/common/ExecutionConfig.java   |  19 ++--
 .../tests/DataStreamAllroundTestJobFactory.java    |  18 ++--
 .../checkpoint/CheckpointFailureManager.java       |   2 +-
 .../tasks/CheckpointCoordinatorConfiguration.java  |   4 +-
 .../api/environment/CheckpointConfig.java          |  67 +++++++++++--
 .../api/graph/StreamingJobGraphGenerator.java      |   7 +-
 .../api/operators/AbstractStreamOperator.java      |   4 +-
 .../tasks/CheckpointExceptionHandlerFactory.java   |  26 +----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  53 ++--------
 ...heckpointExceptionHandlerConfigurationTest.java | 107 ++++++---------------
 .../tasks/CheckpointExceptionHandlerTest.java      |  21 +---
 .../streaming/runtime/tasks/StreamTaskTest.java    |  32 ++----
 .../tasks/TaskCheckpointingBehaviourTest.java      |  38 +-------
 13 files changed, 143 insertions(+), 255 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index b6475d5..fd3b358 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -154,7 +154,10 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        /** This flag defines if we use compression for the state snapshot data 
or not. Default: false */
        private boolean useSnapshotCompression = false;
 
-       /** Determines if a task fails or not if there is an error in writing 
its checkpoint data. Default: true */
+       /**
+        * @deprecated Should no longer be used because tasks are no longer failed directly on checkpoint errors.
+        */
+       @Deprecated
        private boolean failTaskOnCheckpointError = true;
 
        /** The default input dependency constraint to schedule tasks. */
@@ -948,20 +951,22 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        }
 
        /**
-        * This method is visible because of the way the configuration is 
currently forwarded from the checkpoint config to
-        * the task. This should not be called by the user, please use 
CheckpointConfig.isFailTaskOnCheckpointError()
-        * instead.
+        * @deprecated This method has no effect, since the configuration is no longer forwarded from the checkpoint config
+        * to the task and tasks are no longer failed on checkpoint errors.
+        * Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to find out the behavior on checkpoint errors.
         */
+       @Deprecated
        @Internal
        public boolean isFailTaskOnCheckpointError() {
                return failTaskOnCheckpointError;
        }
 
        /**
-        * This method is visible because of the way the configuration is 
currently forwarded from the checkpoint config to
-        * the task. This should not be called by the user, please use 
CheckpointConfig.setFailOnCheckpointingErrors(...)
-        * instead.
+        * @deprecated This method has no effect, since the configuration is no longer forwarded from the checkpoint config
+        * to the task and tasks are no longer failed on checkpoint errors.
+        * Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behavior on checkpoint errors.
         */
+       @Deprecated
        @Internal
        public void setFailTaskOnCheckpointError(boolean 
failTaskOnCheckpointError) {
                this.failTaskOnCheckpointError = failTaskOnCheckpointError;
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 913d030..31dcfb8 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -81,7 +81,7 @@ import static 
org.apache.flink.streaming.tests.TestOperatorEnum.RESULT_TYPE_QUER
  *     <li>environment.checkpoint_interval (long, default - 1000): the 
checkpoint interval.</li>
  *     <li>environment.externalize_checkpoint (boolean, default - false): 
whether or not checkpoints should be externalized.</li>
  *     <li>environment.externalize_checkpoint.cleanup (String, default - 
'retain'): Configures the cleanup mode for externalized checkpoints. Can be 
'retain' or 'delete'.</li>
- *     <li>environment.fail_on_checkpointing_errors (String, default - true): 
Sets the expected behaviour for tasks in case that they encounter an error in 
their checkpointing procedure.</li>
+ *     <li>environment.tolerable_declined_checkpoint_number (int, default - 0): Sets how many declined checkpoints from tasks the job manager tolerates before it fails the job.</li>
  *     <li>environment.parallelism (int, default - 1): parallelism to use for 
the job.</li>
  *     <li>environment.max_parallelism (int, default - 128): max parallelism 
to use for the job</li>
  *     <li>environment.restart_strategy (String, default - 'fixed_delay'): The 
failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
@@ -150,9 +150,9 @@ public class DataStreamAllroundTestJobFactory {
                .key("environment.externalize_checkpoint.cleanup")
                .defaultValue("retain");
 
-       private static final ConfigOption<Boolean> 
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS = ConfigOptions
-               .key("environment.fail_on_checkpointing_errors")
-               .defaultValue(true);
+       private static final ConfigOption<Integer> 
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER = ConfigOptions
+               .key("environment.tolerable_declined_checkpoint_number") // exact key, no trailing whitespace, so CLI args match
+               .defaultValue(0);
 
        private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = 
ConfigOptions
                .key("environment.parallelism")
@@ -272,12 +272,13 @@ public class DataStreamAllroundTestJobFactory {
                                        throw new 
IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " 
+ cleanupModeConfig);
                        }
                        
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
                }
 
-               final boolean failOnCheckpointingErrors = pt.getBoolean(
-                       ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.key(),
-                       
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.defaultValue());
-               
env.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
+               // Applied regardless of externalized checkpoints, just like the former fail_on_checkpointing_errors setting.
+               final int tolerableDeclinedCheckpointNumber = pt.getInt(
+                       ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.key(),
+                       ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.defaultValue());
+               env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
        }
 
        private static void setupParallelism(final StreamExecutionEnvironment 
env, final ParameterTool pt) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 4a95cdd..568e836 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -31,7 +31,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointFailureManager {
 
-       private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+       public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
 
        private final int tolerableCpFailureNumber;
        private final FailJobCallback failureCallback;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index cff5777..74fcdf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -65,7 +65,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                        int maxConcurrentCheckpoints,
                        CheckpointRetentionPolicy checkpointRetentionPolicy,
                        boolean isExactlyOnce,
-                       boolean isPerfetCheckpointForRecovery,
+                       boolean isPreferCheckpointForRecovery,
                        int tolerableCpFailureNumber) {
 
                // sanity checks
@@ -81,7 +81,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
                this.checkpointRetentionPolicy = 
Preconditions.checkNotNull(checkpointRetentionPolicy);
                this.isExactlyOnce = isExactlyOnce;
-               this.isPreferCheckpointForRecovery = 
isPerfetCheckpointForRecovery;
+               this.isPreferCheckpointForRecovery = 
isPreferCheckpointForRecovery;
                this.tolerableCheckpointFailureNumber = 
tolerableCpFailureNumber;
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index c2c3536..033f55a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -23,7 +23,11 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -34,6 +38,8 @@ public class CheckpointConfig implements java.io.Serializable 
{
 
        private static final long serialVersionUID = -750378776078908147L;
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointConfig.class);
+
        /** The default checkpoint mode: exactly once. */
        public static final CheckpointingMode DEFAULT_MODE = 
CheckpointingMode.EXACTLY_ONCE;
 
@@ -46,6 +52,8 @@ public class CheckpointConfig implements java.io.Serializable 
{
        /** The default limit of concurrently happening checkpoints: one. */
        public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
+       public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+
        // 
------------------------------------------------------------------------
 
        /** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -69,14 +77,24 @@ public class CheckpointConfig implements 
java.io.Serializable {
        /** Cleanup behaviour for persistent checkpoints. */
        private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
 
-       /** Determines if a tasks are failed or not if there is an error in 
their checkpointing. Default: true */
+       /**
+        * Tasks no longer fail if there is an error in their checkpointing.
+        *
+        * <p>{@link #tolerableCheckpointFailureNumber} always takes precedence over this deprecated field when the two conflict.
+        *
+        * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+        */
+       @Deprecated
        private boolean failOnCheckpointingErrors = true;
 
        /** Determines if a job will fallback to checkpoint when there is a 
more recent savepoint. **/
        private boolean preferCheckpointForRecovery = false;
 
-       /** Determines the threshold that we tolerance checkpoint failure 
number. */
-       private int tolerableCheckpointFailureNumber = 0;
+       /**
+        * Determines how many declined checkpoints are tolerated before the job is failed.
+        * The default value -1 ({@link #UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER}) means undetermined, e.g. not set via {@link #setTolerableCheckpointFailureNumber(int)}.
+        */
+       private int tolerableCheckpointFailureNumber = 
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;
 
        // 
------------------------------------------------------------------------
 
@@ -239,27 +257,57 @@ public class CheckpointConfig implements 
java.io.Serializable {
        }
 
        /**
-        * This determines the behaviour of tasks if there is an error in their 
local checkpointing. If this returns true,
-        * tasks will fail as a reaction. If this returns false, task will only 
decline the failed checkpoint.
+        * This determines the behaviour when checkpoint errors occur.
+        * If this returns true, which is equivalent to tolerableCheckpointFailureNumber being zero, the job manager
+        * fails the whole job once it receives a declined checkpoint message.
+        * If this returns false, which is equivalent to tolerableCheckpointFailureNumber being the maximum integer value (unlimited),
+        * the job manager does not fail the whole job no matter how many declined checkpoints it receives.
+        *
+        * @deprecated Use {@link #getTolerableCheckpointFailureNumber()}.
         */
+       @Deprecated
        public boolean isFailOnCheckpointingErrors() {
                return failOnCheckpointingErrors;
        }
 
        /**
-        * Sets the expected behaviour for tasks in case that they encounter an 
error in their checkpointing procedure.
-        * If this is set to true, the task will fail on checkpointing error. 
If this is set to false, the task will only
-        * decline a the checkpoint and continue running. The default is true.
+        * Sets the expected behaviour for tasks in case they encounter an error when checkpointing.
+        * Setting this to true is equivalent to setting tolerableCheckpointFailureNumber to zero: the job manager
+        * fails the whole job once it receives a declined checkpoint message.
+        * Setting this to false is equivalent to setting tolerableCheckpointFailureNumber to the maximum integer value (unlimited):
+        * the job manager does not fail the whole job no matter how many declined checkpoints it receives.
+        *
+        * <p>{@link #setTolerableCheckpointFailureNumber(int)} always takes precedence over this deprecated method when the two conflict.
+        *
+        * @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}.
         */
+       @Deprecated
        public void setFailOnCheckpointingErrors(boolean 
failOnCheckpointingErrors) {
+               if (tolerableCheckpointFailureNumber != 
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
+                       LOG.warn("Since tolerableCheckpointFailureNumber has 
been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) " +
+                               "method would not take any effect and please 
use #setTolerableCheckpointFailureNumber(int) method to " +
+                               "determine your expected behaviour when 
checkpoint errors on task side.", tolerableCheckpointFailureNumber);
+                       return;
+               }
                this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+               // Deliberately leave tolerableCheckpointFailureNumber undefined here. Writing 0 or
+               // UNLIMITED_TOLERABLE_FAILURE_NUMBER would make every later call of this deprecated
+               // method take the warning branch above and be ignored (e.g. true, then false).
+               // getTolerableCheckpointFailureNumber() derives the effective threshold from
+               // failOnCheckpointingErrors for as long as no explicit number has been configured.
        }

        /**
         * Get the tolerable checkpoint failure number which used by the 
checkpoint failure manager
         * to determine when we need to fail the job.
+        *
+        * <p>If {@link #tolerableCheckpointFailureNumber} has not been configured, this method returns 0 (no declined
+        * checkpoint is tolerated) when failOnCheckpointingErrors is true, which is the default, and unlimited otherwise.
         */
        public int getTolerableCheckpointFailureNumber() {
+               if (tolerableCheckpointFailureNumber == 
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
+                       return failOnCheckpointingErrors ? 0 : UNLIMITED_TOLERABLE_FAILURE_NUMBER;
+               }
                return tolerableCheckpointFailureNumber;
        }
 
@@ -268,6 +316,9 @@ public class CheckpointConfig implements 
java.io.Serializable {
         * we do not tolerance any checkpoint failure.
         */
        public void setTolerableCheckpointFailureNumber(int 
tolerableCheckpointFailureNumber) {
+               if (tolerableCheckpointFailureNumber < 0) {
+                       throw new IllegalArgumentException("The tolerable 
failure checkpoint number must be non-negative.");
+               }
                this.tolerableCheckpointFailureNumber = 
tolerableCheckpointFailureNumber;
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e191dea..4c11fa3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -606,11 +605,7 @@ public class StreamingJobGraphGenerator {
                CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
                long interval = cfg.getCheckpointInterval();
-               if (interval >= 10) {
-                       ExecutionConfig executionConfig = 
streamGraph.getExecutionConfig();
-                       // propagate the expected behaviour for checkpoint 
errors to task.
-                       
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-               } else {
+               if (interval < 10) {
                        // interval of max value means disable periodic 
checkpoint
                        interval = Long.MAX_VALUE;
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 09d8bca..7d2eda5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -419,7 +421,7 @@ public abstract class AbstractStreamOperator<OUT>
                        if (!getContainingTask().isCanceled()) {
                                LOG.info(snapshotFailMessage, 
snapshotException);
                        }
-                       throw new Exception(snapshotFailMessage, 
snapshotException);
+                       throw new CheckpointException(snapshotFailMessage, 
CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
                }
 
                return snapshotInProgress;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
index 430f43e..64ab71c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
@@ -29,32 +29,12 @@ import org.apache.flink.util.Preconditions;
 public class CheckpointExceptionHandlerFactory {
 
        /**
-        * Returns a {@link CheckpointExceptionHandler} that either causes a 
task to fail completely or to just declines
-        * checkpoint on exception, depending on the parameter flag.
+        * Returns a {@link CheckpointExceptionHandler} that only declines the checkpoint on exception.
         */
        public CheckpointExceptionHandler createCheckpointExceptionHandler(
-               boolean failTaskOnCheckpointException,
                Environment environment) {
 
-               if (failTaskOnCheckpointException) {
-                       return new FailingCheckpointExceptionHandler();
-               } else {
-                       return new 
DecliningCheckpointExceptionHandler(environment);
-               }
-       }
-
-       /**
-        * This handler makes the task fail by rethrowing a reported exception.
-        */
-       static final class FailingCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
-
-               @Override
-               public void tryHandleCheckpointException(
-                       CheckpointMetaData checkpointMetaData,
-                       Exception exception) throws Exception {
-
-                       throw exception;
-               }
+               return new DecliningCheckpointExceptionHandler(environment);
        }
 
        /**
@@ -71,7 +51,7 @@ public class CheckpointExceptionHandlerFactory {
                @Override
                public void tryHandleCheckpointException(
                        CheckpointMetaData checkpointMetaData,
-                       Exception exception) throws Exception {
+                       Exception exception) {
 
                        
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3927e46..e5f56b2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -183,10 +183,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        private ExecutorService asyncOperationsThreadPool;
 
        /** Handler for exceptions during checkpointing in the stream task. 
Used in synchronous part of the checkpoint. */
-       private CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
-
-       /** Wrapper for synchronousCheckpointExceptionHandler to deal with 
rethrown exceptions. Used in the async part. */
-       private AsyncCheckpointExceptionHandler 
asynchronousCheckpointExceptionHandler;
+       private CheckpointExceptionHandler checkpointExceptionHandler;
 
        private final 
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;
 
@@ -323,11 +320,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                        CheckpointExceptionHandlerFactory 
cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
 
-                       synchronousCheckpointExceptionHandler = 
cpExceptionHandlerFactory.createCheckpointExceptionHandler(
-                               
getExecutionConfig().isFailTaskOnCheckpointError(),
-                               getEnvironment());
-
-                       asynchronousCheckpointExceptionHandler = new 
AsyncCheckpointExceptionHandler(this);
+                       checkpointExceptionHandler = cpExceptionHandlerFactory
+                               
.createCheckpointExceptionHandler(getEnvironment());
 
                        stateBackend = createStateBackend();
                        checkpointStorage = 
stateBackend.createCheckpointStorage(getEnvironment().getJobID());
@@ -1062,9 +1056,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                        // We only report the exception for the 
original cause of fail and cleanup.
                                        // Otherwise this followup exception 
could race the original exception in failing the task.
-                                       
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-                                               checkpointMetaData,
-                                               checkpointException);
+                                       try {
+                                               
owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 checkpointException);
+                                       } catch (Exception unhandled) {
+                                               AsynchronousException 
asyncException = new AsynchronousException(unhandled);
+                                               
owner.handleAsyncException("Failure in asynchronous checkpoint 
materialization", asyncException);
+                                       }
 
                                        currentState = 
CheckpointingOperation.AsyncCheckpointState.DISCARDED;
                                } else {
@@ -1227,7 +1224,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        // operation, and without the failure, 
the task would go back to normal execution.
                                        throw ex;
                                } else {
-                                       
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 ex);
+                                       
owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 ex);
                                }
                        }
                }
@@ -1252,36 +1249,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       /**
-        * Wrapper for synchronous {@link CheckpointExceptionHandler}. This 
implementation catches unhandled, rethrown
-        * exceptions and reports them through {@link 
#handleAsyncException(String, Throwable)}. As this implementation
-        * always handles the exception in some way, it never rethrows.
-        */
-       static final class AsyncCheckpointExceptionHandler implements 
CheckpointExceptionHandler {
-
-               /** Owning stream task to which we report async exceptions. */
-               final StreamTask<?, ?> owner;
-
-               /** Synchronous exception handler to which we delegate. */
-               final CheckpointExceptionHandler 
synchronousCheckpointExceptionHandler;
-
-               AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
-                       this.owner = Preconditions.checkNotNull(owner);
-                       this.synchronousCheckpointExceptionHandler =
-                               
Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
-               }
-
-               @Override
-               public void tryHandleCheckpointException(CheckpointMetaData 
checkpointMetaData, Exception exception) {
-                       try {
-                               
synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
 exception);
-                       } catch (Exception unhandled) {
-                               AsynchronousException asyncException = new 
AsynchronousException(unhandled);
-                               owner.handleAsyncException("Failure in 
asynchronous checkpoint materialization", asyncException);
-                       }
-               }
-       }
-
        @VisibleForTesting
        public static <OUT> 
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
createRecordWriters(
                        StreamConfig configuration,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
index 17ab88f..33a3dc8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -18,17 +18,11 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -40,82 +34,47 @@ import org.junit.Test;
 public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
 
        @Test
-       public void testConfigurationFailOnException() throws Exception {
-               testConfigForwarding(true);
-       }
-
-       @Test
-       public void testConfigurationDeclineOnException() throws Exception {
-               testConfigForwarding(false);
+       public void testCheckpointConfigDefault() {
+               StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               CheckpointConfig checkpointConfig = 
streamExecutionEnvironment.getCheckpointConfig();
+               
Assert.assertTrue(checkpointConfig.isFailOnCheckpointingErrors());
+               Assert.assertEquals(0, 
checkpointConfig.getTolerableCheckpointFailureNumber());
        }
 
        @Test
-       public void testFailIsDefaultConfig() {
-               ExecutionConfig newExecutionConfig = new ExecutionConfig();
-               
Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError());
-       }
-
-       private void testConfigForwarding(boolean failOnException) throws 
Exception {
-
-               final boolean expectedHandlerFlag = failOnException;
-
-               final DummyEnvironment environment = new 
DummyEnvironment("test", 1, 0);
-               environment.setTaskStateManager(new TestTaskStateManager());
-               
environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
-
-               final CheckpointExceptionHandlerFactory inspectingFactory = new 
CheckpointExceptionHandlerFactory() {
-
-                       @Override
-                       public CheckpointExceptionHandler 
createCheckpointExceptionHandler(
-                               boolean failTaskOnCheckpointException,
-                               Environment environment) {
-
-                               Assert.assertEquals(expectedHandlerFlag, 
failTaskOnCheckpointException);
-                               return 
super.createCheckpointExceptionHandler(failTaskOnCheckpointException, 
environment);
-                       }
-               };
-
-               StreamTask streamTask = new StreamTask(environment, null) {
-                       @Override
-                       protected void init() throws Exception {}
-
-                       @Override
-                       protected void performDefaultAction(ActionContext 
context) throws Exception {
-                               context.allActionsCompleted();
-                       }
+       public void testSetCheckpointConfig() {
+               StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               CheckpointConfig checkpointConfig = 
streamExecutionEnvironment.getCheckpointConfig();
 
-                       @Override
-                       protected void cleanup() throws Exception {}
+               // use deprecated API to set not fail on checkpoint errors
+               checkpointConfig.setFailOnCheckpointingErrors(false);
+               
Assert.assertFalse(checkpointConfig.isFailOnCheckpointingErrors());
+               
Assert.assertEquals(CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER,
 checkpointConfig.getTolerableCheckpointFailureNumber());
 
-                       @Override
-                       protected void cancelTask() throws Exception {}
+               // use new API to set tolerable declined checkpoint number
+               checkpointConfig.setTolerableCheckpointFailureNumber(5);
+               Assert.assertEquals(5, 
checkpointConfig.getTolerableCheckpointFailureNumber());
 
-                       @Override
-                       protected CheckpointExceptionHandlerFactory 
createCheckpointExceptionHandlerFactory() {
-                               return inspectingFactory;
-                       }
-               };
-
-               streamTask.invoke();
+               // after we configure the tolerable declined checkpoint number, 
deprecated API would not take effect
+               checkpointConfig.setFailOnCheckpointingErrors(true);
+               Assert.assertEquals(5, 
checkpointConfig.getTolerableCheckpointFailureNumber());
        }
 
        @Test
-       public void testCheckpointConfigDefault() throws Exception {
-               StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
+       public void testPropagationFailFromCheckpointConfig() {
+               try {
+                       doTestPropagationFromCheckpointConfig(true);
+               } catch (IllegalArgumentException ignored) {
+                       // ignored
+               }
        }
 
        @Test
-       public void testPropagationFailFromCheckpointConfig() throws Exception {
-               doTestPropagationFromCheckpointConfig(true);
-       }
-
-       @Test
-       public void testPropagationDeclineFromCheckpointConfig() throws 
Exception {
+       public void testPropagationDeclineFromCheckpointConfig() {
                doTestPropagationFromCheckpointConfig(false);
        }
 
-       public void doTestPropagationFromCheckpointConfig(boolean 
failTaskOnCheckpointErrors) throws Exception {
+       public void doTestPropagationFromCheckpointConfig(boolean 
failTaskOnCheckpointErrors) {
                StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
                streamExecutionEnvironment.setParallelism(1);
                
streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
@@ -123,7 +82,7 @@ public class CheckpointExceptionHandlerConfigurationTest 
extends TestLogger {
                streamExecutionEnvironment.addSource(new 
SourceFunction<Integer>() {
 
                        @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       public void run(SourceContext<Integer> ctx) {
                        }
 
                        @Override
@@ -131,13 +90,5 @@ public class CheckpointExceptionHandlerConfigurationTest 
extends TestLogger {
                        }
 
                }).addSink(new DiscardingSink<>());
-
-               StreamGraph streamGraph = 
streamExecutionEnvironment.getStreamGraph();
-               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
-               SerializedValue<ExecutionConfig> serializedExecutionConfig = 
jobGraph.getSerializedExecutionConfig();
-               ExecutionConfig executionConfig =
-                       
serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
-
-               Assert.assertEquals(failTaskOnCheckpointErrors, 
executionConfig.isFailTaskOnCheckpointError());
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
index 2f58162..2632c01 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
@@ -31,30 +31,11 @@ import org.junit.Test;
 public class CheckpointExceptionHandlerTest extends TestLogger {
 
        @Test
-       public void testRethrowingHandler() {
-               DeclineDummyEnvironment environment = new 
DeclineDummyEnvironment();
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler exceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
environment);
-
-               CheckpointMetaData failedCheckpointMetaData = new 
CheckpointMetaData(42L, 4711L);
-               Exception testException = new Exception("test");
-               try {
-                       
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
-                       Assert.fail("Exception not rethrown.");
-               } catch (Exception e) {
-                       Assert.assertEquals(testException, e);
-               }
-
-               Assert.assertNull(environment.getLastDeclinedCheckpointCause());
-       }
-
-       @Test
        public void testDecliningHandler() {
                DeclineDummyEnvironment environment = new 
DeclineDummyEnvironment();
                CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
                CheckpointExceptionHandler exceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false, 
environment);
+                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(environment);
 
                CheckpointMetaData failedCheckpointMetaData = new 
CheckpointMetaData(42L, 4711L);
                Exception testException = new Exception("test");
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index efc2505..e2171b7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -310,16 +310,16 @@ public class StreamTaskTest extends TestLogger {
        }
 
        @Test
-       public void testFailingCheckpointStreamOperator() throws Exception {
+       public void testDecliningCheckpointStreamOperator() throws Exception {
                final long checkpointId = 42L;
                final long timestamp = 1L;
 
                TaskInfo mockTaskInfo = mock(TaskInfo.class);
                
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
                when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = new 
MockEnvironmentBuilder().build();
+               CheckpointExceptionHandlerTest.DeclineDummyEnvironment 
declineDummyEnvironment = new 
CheckpointExceptionHandlerTest.DeclineDummyEnvironment();
 
-               StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
+               StreamTask<?, ?> streamTask = new 
EmptyStreamTask(declineDummyEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
 
                // mock the operators
@@ -360,19 +360,11 @@ public class StreamTaskTest extends TestLogger {
 
                CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
                CheckpointExceptionHandler checkpointExceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
-               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
-                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
-               try {
-                       streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
-                       fail("Expected test exception here.");
-               } catch (Exception e) {
-                       assertEquals(testException, e.getCause());
-               }
+                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(declineDummyEnvironment);
+               Whitebox.setInternalState(streamTask, 
"checkpointExceptionHandler", checkpointExceptionHandler);
+
+               streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+               assertEquals(testException, 
declineDummyEnvironment.getLastDeclinedCheckpointCause());
 
                verify(operatorSnapshotResult1).cancel();
                verify(operatorSnapshotResult2).cancel();
@@ -432,12 +424,8 @@ public class StreamTaskTest extends TestLogger {
 
                CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
                CheckpointExceptionHandler checkpointExceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
-               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
-               StreamTask.AsyncCheckpointExceptionHandler 
asyncCheckpointExceptionHandler =
-                       new 
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
-               Whitebox.setInternalState(streamTask, 
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(mockEnvironment);
+               Whitebox.setInternalState(streamTask, 
"checkpointExceptionHandler", checkpointExceptionHandler);
 
                
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
                streamTask.triggerCheckpoint(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index c02ff99..589b64c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -99,7 +99,6 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 
@@ -122,27 +121,12 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
        }
 
        @Test
-       public void testTaskFailingOnCheckpointErrorInSyncPart() throws 
Exception {
-               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
-               assertNotNull(failureCause);
-
-               String expectedMessageStart = "Could not perform checkpoint";
-               assertEquals(expectedMessageStart, 
failureCause.getMessage().substring(0, expectedMessageStart.length()));
-       }
-
-       @Test
-       public void testTaskFailingOnCheckpointErrorInAsyncPart() throws 
Exception {
-               Throwable failureCause = 
runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
-               assertEquals(AsynchronousException.class, 
failureCause.getClass());
-       }
-
-       @Test
        public void testBlockingNonInterruptibleCheckpoint() throws Exception {
 
                StateBackend lockingStateBackend = new 
BackendForTestStream(LockingOutputStream::new);
 
                Task task =
-                       createTask(new TestOperator(), lockingStateBackend, 
mock(CheckpointResponder.class), true);
+                       createTask(new TestOperator(), lockingStateBackend, 
mock(CheckpointResponder.class));
 
                // start the task and wait until it is in "restore"
                task.startTaskThread();
@@ -162,7 +146,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                TestDeclinedCheckpointResponder checkpointResponder = new 
TestDeclinedCheckpointResponder();
 
                Task task =
-                       createTask(new FilterOperator(), backend, 
checkpointResponder, false);
+                       createTask(new FilterOperator(), backend, 
checkpointResponder);
 
                // start the task and wait until it is in "restore"
                task.startTaskThread();
@@ -175,20 +159,6 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                task.getExecutingThread().join();
        }
 
-       private Throwable 
runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws 
Exception {
-
-               Task task =
-                       createTask(new FilterOperator(), backend, 
mock(CheckpointResponder.class), true);
-
-               // start the task and wait until it is in "restore"
-               task.startTaskThread();
-
-               task.getExecutingThread().join();
-
-               assertEquals(ExecutionState.FAILED, task.getExecutionState());
-               return task.getFailureCause();
-       }
-
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -196,8 +166,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
        private static Task createTask(
                StreamOperator<?> op,
                StateBackend backend,
-               CheckpointResponder checkpointResponder,
-               boolean failOnCheckpointErrors) throws IOException {
+               CheckpointResponder checkpointResponder) throws IOException {
 
                Configuration taskConfig = new Configuration();
                StreamConfig cfg = new StreamConfig(taskConfig);
@@ -206,7 +175,6 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                cfg.setStateBackend(backend);
 
                ExecutionConfig executionConfig = new ExecutionConfig();
-               
executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
 
                JobInformation jobInformation = new JobInformation(
                                new JobID(),

Reply via email to