This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b760d55 [FLINK-11662] Disable task to fail on checkpoint errors b760d55 is described below commit b760d556ca3757f3d0d1b8c81e5182b1bcc3dba3 Author: Yun Tang <myas...@live.com> AuthorDate: Thu Jun 27 19:40:59 2019 +0800 [FLINK-11662] Disable task to fail on checkpoint errors This closes #8745. --- .../apache/flink/api/common/ExecutionConfig.java | 19 ++-- .../tests/DataStreamAllroundTestJobFactory.java | 18 ++-- .../checkpoint/CheckpointFailureManager.java | 2 +- .../tasks/CheckpointCoordinatorConfiguration.java | 4 +- .../api/environment/CheckpointConfig.java | 67 +++++++++++-- .../api/graph/StreamingJobGraphGenerator.java | 7 +- .../api/operators/AbstractStreamOperator.java | 4 +- .../tasks/CheckpointExceptionHandlerFactory.java | 26 +---- .../flink/streaming/runtime/tasks/StreamTask.java | 53 ++-------- ...heckpointExceptionHandlerConfigurationTest.java | 107 ++++++--------------- .../tasks/CheckpointExceptionHandlerTest.java | 21 +--- .../streaming/runtime/tasks/StreamTaskTest.java | 32 ++---- .../tasks/TaskCheckpointingBehaviourTest.java | 38 +------- 13 files changed, 143 insertions(+), 255 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index b6475d5..fd3b358 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -154,7 +154,10 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut /** This flag defines if we use compression for the state snapshot data or not. Default: false */ private boolean useSnapshotCompression = false; - /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */ + /** + * @deprecated Should no longer be used because we would not support to let task directly fail on checkpoint error. + */ + @Deprecated private boolean failTaskOnCheckpointError = true; /** The default input dependency constraint to schedule tasks. */ @@ -948,20 +951,22 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } /** - * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to - * the task. This should not be called by the user, please use CheckpointConfig.isFailTaskOnCheckpointError() - * instead. + * @deprecated This method takes no effect since we would not forward the configuration from the checkpoint config + * to the task, and we have not supported task to fail on checkpoint error. + * Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to know the behavior on checkpoint errors. */ + @Deprecated @Internal public boolean isFailTaskOnCheckpointError() { return failTaskOnCheckpointError; } /** - * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to - * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...) - * instead. + * @deprecated This method takes no effect since we would not forward the configuration from the checkpoint config + * to the task, and we have not supported task to fail on checkpoint error. + * Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behavior on checkpoint errors. */ + @Deprecated @Internal public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) { this.failTaskOnCheckpointError = failTaskOnCheckpointError; diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 913d030..31dcfb8 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -81,7 +81,7 @@ import static org.apache.flink.streaming.tests.TestOperatorEnum.RESULT_TYPE_QUER * <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li> * <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li> * <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li> - * <li>environment.fail_on_checkpointing_errors (String, default - true): Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.</li> + * <li>environment.tolerable_checkpoint_failure_number (int, default - 0): Sets the expected behaviour for the job manager in case that it received declined checkpoints from tasks.</li> * <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li> * <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li> * <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li> @@ -150,9 +150,9 @@ public class DataStreamAllroundTestJobFactory { .key("environment.externalize_checkpoint.cleanup") .defaultValue("retain"); - private static final ConfigOption<Boolean> ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS = ConfigOptions - .key("environment.fail_on_checkpointing_errors") - .defaultValue(true); + private static final ConfigOption<Integer> ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER = ConfigOptions + .key("environment.tolerable_declined_checkpoint_number ") + .defaultValue(0); private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions .key("environment.parallelism") @@ -272,12 +272,12 @@ public class DataStreamAllroundTestJobFactory { throw new IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig); } env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode); - } - final boolean failOnCheckpointingErrors = pt.getBoolean( - ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.key(), - ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.defaultValue()); - env.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors); + final int tolerableDeclinedCheckpointNumber = pt.getInt( + ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.key(), + ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.defaultValue()); + env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber); + } } private static void setupParallelism(final StreamExecutionEnvironment env, final ParameterTool pt) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 4a95cdd..568e836 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CheckpointFailureManager { - private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; private final int tolerableCpFailureNumber; private final FailJobCallback failureCallback; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index cff5777..74fcdf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -65,7 +65,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { int maxConcurrentCheckpoints, CheckpointRetentionPolicy checkpointRetentionPolicy, boolean isExactlyOnce, - boolean isPerfetCheckpointForRecovery, + boolean isPreferCheckpointForRecovery, int tolerableCpFailureNumber) { // sanity checks @@ -81,7 +81,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy); this.isExactlyOnce = isExactlyOnce; - this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery; + this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery; this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index c2c3536..033f55a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -23,7 +23,11 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.streaming.api.CheckpointingMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -34,6 +38,8 @@ public class CheckpointConfig implements java.io.Serializable { private static final long serialVersionUID = -750378776078908147L; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class); + /** The default checkpoint mode: exactly once. */ public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE; @@ -46,6 +52,8 @@ public class CheckpointConfig implements java.io.Serializable { /** The default limit of concurrently happening checkpoints: one. */ public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1; + public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1; + // ------------------------------------------------------------------------ /** Checkpointing mode (exactly-once vs. at-least-once). */ @@ -69,14 +77,24 @@ public class CheckpointConfig implements java.io.Serializable { /** Cleanup behaviour for persistent checkpoints. */ private ExternalizedCheckpointCleanup externalizedCheckpointCleanup; - /** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */ + /** + * Task would not fail if there is an error in their checkpointing. + * + * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if they have conflicts. + * + * @deprecated Use {@link #tolerableCheckpointFailureNumber}. + */ + @Deprecated private boolean failOnCheckpointingErrors = true; /** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/ private boolean preferCheckpointForRecovery = false; - /** Determines the threshold that we tolerance checkpoint failure number. */ - private int tolerableCheckpointFailureNumber = 0; + /** + * Determines the threshold that we tolerance declined checkpoint failure number. + * The default value is -1 meaning undetermined and not set via {@link #setTolerableCheckpointFailureNumber(int)}. + * */ + private int tolerableCheckpointFailureNumber = UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER; // ------------------------------------------------------------------------ @@ -239,27 +257,57 @@ public class CheckpointConfig implements java.io.Serializable { } /** - * This determines the behaviour of tasks if there is an error in their local checkpointing. If this returns true, - * tasks will fail as a reaction. If this returns false, task will only decline the failed checkpoint. + * This determines the behaviour when meeting checkpoint errors. + * If this returns true, which is equivalent to get tolerableCheckpointFailureNumber as zero, job manager would + * fail the whole job once it received a decline checkpoint message. + * If this returns false, which is equivalent to get tolerableCheckpointFailureNumber as the maximum of integer (means unlimited), + * job manager would not fail the whole job no matter how many declined checkpoints it received. + * + * @deprecated Use {@link #getTolerableCheckpointFailureNumber()}. */ + @Deprecated public boolean isFailOnCheckpointingErrors() { return failOnCheckpointingErrors; } /** - * Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure. - * If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only - * decline a the checkpoint and continue running. The default is true. + * Sets the expected behaviour for tasks in case that they encounter an error when checkpointing. + * If this is set as true, which is equivalent to set tolerableCheckpointFailureNumber as zero, job manager would + * fail the whole job once it received a decline checkpoint message. + * If this is set as false, which is equivalent to set tolerableCheckpointFailureNumber as the maximum of integer (means unlimited), + * job manager would not fail the whole job no matter how many declined checkpoints it received. + * + * <p>{@link #setTolerableCheckpointFailureNumber(int)} would always overrule this deprecated method if they have conflicts. + * + * @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}. */ + @Deprecated public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) { + if (tolerableCheckpointFailureNumber != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) { + LOG.warn("Since tolerableCheckpointFailureNumber has been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) " + + "method would not take any effect and please use #setTolerableCheckpointFailureNumber(int) method to " + + "determine your expected behaviour when checkpoint errors on task side.", tolerableCheckpointFailureNumber); + return; + } this.failOnCheckpointingErrors = failOnCheckpointingErrors; + if (failOnCheckpointingErrors) { + this.tolerableCheckpointFailureNumber = 0; + } else { + this.tolerableCheckpointFailureNumber = UNLIMITED_TOLERABLE_FAILURE_NUMBER; + } } /** * Get the tolerable checkpoint failure number which used by the checkpoint failure manager * to determine when we need to fail the job. + * + * <p>If the {@link #tolerableCheckpointFailureNumber} has not been configured, this method would return 0 + * which means the checkpoint failure manager would not tolerate any declined checkpoint failure. */ public int getTolerableCheckpointFailureNumber() { + if (tolerableCheckpointFailureNumber == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) { + return 0; + } return tolerableCheckpointFailureNumber; } @@ -268,6 +316,9 @@ public class CheckpointConfig implements java.io.Serializable { * we do not tolerance any checkpoint failure. */ public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) { + if (tolerableCheckpointFailureNumber < 0) { + throw new IllegalArgumentException("The tolerable failure checkpoint number must be non-negative."); + } this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index e191dea..4c11fa3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; @@ -606,11 +605,7 @@ public class StreamingJobGraphGenerator { CheckpointConfig cfg = streamGraph.getCheckpointConfig(); long interval = cfg.getCheckpointInterval(); - if (interval >= 10) { - ExecutionConfig executionConfig = streamGraph.getExecutionConfig(); - // propagate the expected behaviour for checkpoint errors to task. - executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors()); - } else { + if (interval < 10) { // interval of max value means disable periodic checkpoint interval = Long.MAX_VALUE; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 09d8bca..7d2eda5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -32,6 +32,8 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -419,7 +421,7 @@ public abstract class AbstractStreamOperator<OUT> if (!getContainingTask().isCanceled()) { LOG.info(snapshotFailMessage, snapshotException); } - throw new Exception(snapshotFailMessage, snapshotException); + throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException); } return snapshotInProgress; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java index 430f43e..64ab71c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java @@ -29,32 +29,12 @@ import org.apache.flink.util.Preconditions; public class CheckpointExceptionHandlerFactory { /** - * Returns a {@link CheckpointExceptionHandler} that either causes a task to fail completely or to just declines - * checkpoint on exception, depending on the parameter flag. + * Returns a {@link CheckpointExceptionHandler} that just declines checkpoint on exception. */ public CheckpointExceptionHandler createCheckpointExceptionHandler( - boolean failTaskOnCheckpointException, Environment environment) { - if (failTaskOnCheckpointException) { - return new FailingCheckpointExceptionHandler(); - } else { - return new DecliningCheckpointExceptionHandler(environment); - } - } - - /** - * This handler makes the task fail by rethrowing a reported exception. - */ - static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler { - - @Override - public void tryHandleCheckpointException( - CheckpointMetaData checkpointMetaData, - Exception exception) throws Exception { - - throw exception; - } + return new DecliningCheckpointExceptionHandler(environment); } /** @@ -71,7 +51,7 @@ public class CheckpointExceptionHandlerFactory { @Override public void tryHandleCheckpointException( CheckpointMetaData checkpointMetaData, - Exception exception) throws Exception { + Exception exception) { environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 3927e46..e5f56b2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -183,10 +183,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private ExecutorService asyncOperationsThreadPool; /** Handler for exceptions during checkpointing in the stream task. Used in synchronous part of the checkpoint. */ - private CheckpointExceptionHandler synchronousCheckpointExceptionHandler; - - /** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */ - private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler; + private CheckpointExceptionHandler checkpointExceptionHandler; private final List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters; @@ -323,11 +320,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory(); - synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler( - getExecutionConfig().isFailTaskOnCheckpointError(), - getEnvironment()); - - asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this); + checkpointExceptionHandler = cpExceptionHandlerFactory + .createCheckpointExceptionHandler(getEnvironment()); stateBackend = createStateBackend(); checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID()); @@ -1062,9 +1056,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // We only report the exception for the original cause of fail and cleanup. // Otherwise this followup exception could race the original exception in failing the task. - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + try { + owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, checkpointException); + } catch (Exception unhandled) { + AsynchronousException asyncException = new AsynchronousException(unhandled); + owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException); + } currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED; } else { @@ -1227,7 +1224,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // operation, and without the failure, the task would go back to normal execution. throw ex; } else { - owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex); + owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex); } } } @@ -1252,36 +1249,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - /** - * Wrapper for synchronous {@link CheckpointExceptionHandler}. This implementation catches unhandled, rethrown - * exceptions and reports them through {@link #handleAsyncException(String, Throwable)}. As this implementation - * always handles the exception in some way, it never rethrows. - */ - static final class AsyncCheckpointExceptionHandler implements CheckpointExceptionHandler { - - /** Owning stream task to which we report async exceptions. */ - final StreamTask<?, ?> owner; - - /** Synchronous exception handler to which we delegate. */ - final CheckpointExceptionHandler synchronousCheckpointExceptionHandler; - - AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) { - this.owner = Preconditions.checkNotNull(owner); - this.synchronousCheckpointExceptionHandler = - Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler); - } - - @Override - public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) { - try { - synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, exception); - } catch (Exception unhandled) { - AsynchronousException asyncException = new AsynchronousException(unhandled); - owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException); - } - } - } - @VisibleForTesting public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters( StreamConfig configuration, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java index 17ab88f..33a3dc8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java @@ -18,17 +18,11 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; -import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -40,82 +34,47 @@ import org.junit.Test; public class CheckpointExceptionHandlerConfigurationTest extends TestLogger { @Test - public void testConfigurationFailOnException() throws Exception { - testConfigForwarding(true); - } - - @Test - public void testConfigurationDeclineOnException() throws Exception { - testConfigForwarding(false); + public void testCheckpointConfigDefault() { + StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + CheckpointConfig checkpointConfig = streamExecutionEnvironment.getCheckpointConfig(); + Assert.assertTrue(checkpointConfig.isFailOnCheckpointingErrors()); + Assert.assertEquals(0, checkpointConfig.getTolerableCheckpointFailureNumber()); } @Test - public void testFailIsDefaultConfig() { - ExecutionConfig newExecutionConfig = new ExecutionConfig(); - Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError()); - } - - private void testConfigForwarding(boolean failOnException) throws Exception { - - final boolean expectedHandlerFlag = failOnException; - - final DummyEnvironment environment = new DummyEnvironment("test", 1, 0); - environment.setTaskStateManager(new TestTaskStateManager()); - environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag); - - final CheckpointExceptionHandlerFactory inspectingFactory = new CheckpointExceptionHandlerFactory() { - - @Override - public CheckpointExceptionHandler createCheckpointExceptionHandler( - boolean failTaskOnCheckpointException, - Environment environment) { - - Assert.assertEquals(expectedHandlerFlag, failTaskOnCheckpointException); - return super.createCheckpointExceptionHandler(failTaskOnCheckpointException, environment); - } - }; - - StreamTask streamTask = new StreamTask(environment, null) { - @Override - protected void init() throws Exception {} - - @Override - protected void performDefaultAction(ActionContext context) throws Exception { - context.allActionsCompleted(); - } + public void testSetCheckpointConfig() { + StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + CheckpointConfig checkpointConfig = streamExecutionEnvironment.getCheckpointConfig(); - @Override - protected void cleanup() throws Exception {} + // use deprecated API to set not fail on checkpoint errors + checkpointConfig.setFailOnCheckpointingErrors(false); + Assert.assertFalse(checkpointConfig.isFailOnCheckpointingErrors()); + Assert.assertEquals(CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER, checkpointConfig.getTolerableCheckpointFailureNumber()); - @Override - protected void cancelTask() throws Exception {} + // use new API to set tolerable declined checkpoint number + checkpointConfig.setTolerableCheckpointFailureNumber(5); + Assert.assertEquals(5, checkpointConfig.getTolerableCheckpointFailureNumber()); - @Override - protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() { - return inspectingFactory; - } - }; - - streamTask.invoke(); + // after we configure the tolerable declined checkpoint number, deprecated API would not take effect + checkpointConfig.setFailOnCheckpointingErrors(true); + Assert.assertEquals(5, checkpointConfig.getTolerableCheckpointFailureNumber()); } @Test - public void testCheckpointConfigDefault() throws Exception { - StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors()); + public void testPropagationFailFromCheckpointConfig() { + try { + doTestPropagationFromCheckpointConfig(true); + } catch (IllegalArgumentException ignored) { + // ignored + } } @Test - public void testPropagationFailFromCheckpointConfig() throws Exception { - doTestPropagationFromCheckpointConfig(true); - } - - @Test - public void testPropagationDeclineFromCheckpointConfig() throws Exception { + public void testPropagationDeclineFromCheckpointConfig() { doTestPropagationFromCheckpointConfig(false); } - public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) throws Exception { + public void doTestPropagationFromCheckpointConfig(boolean failTaskOnCheckpointErrors) { StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecutionEnvironment.setParallelism(1); streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000); @@ -123,7 +82,7 @@ public class CheckpointExceptionHandlerConfigurationTest extends TestLogger { streamExecutionEnvironment.addSource(new SourceFunction<Integer>() { @Override - public void run(SourceContext<Integer> ctx) throws Exception { + public void run(SourceContext<Integer> ctx) { } @Override @@ -131,13 +90,5 @@ public class CheckpointExceptionHandlerConfigurationTest extends TestLogger { } }).addSink(new DiscardingSink<>()); - - StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(); - JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); - SerializedValue<ExecutionConfig> serializedExecutionConfig = jobGraph.getSerializedExecutionConfig(); - ExecutionConfig executionConfig = - serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader()); - - Assert.assertEquals(failTaskOnCheckpointErrors, executionConfig.isFailTaskOnCheckpointError()); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java index 2f58162..2632c01 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java @@ -31,30 +31,11 @@ import org.junit.Test; public class CheckpointExceptionHandlerTest extends TestLogger { @Test - public void testRethrowingHandler() { - DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); - CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); - CheckpointExceptionHandler exceptionHandler = - checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); - - CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); - Exception testException = new Exception("test"); - try { - exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); - Assert.fail("Exception not rethrown."); - } catch (Exception e) { - Assert.assertEquals(testException, e); - } - - Assert.assertNull(environment.getLastDeclinedCheckpointCause()); - } - - @Test public void testDecliningHandler() { DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); CheckpointExceptionHandler exceptionHandler = - checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false, environment); + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(environment); CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); Exception testException = new Exception("test"); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index efc2505..e2171b7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -310,16 +310,16 @@ public class StreamTaskTest extends TestLogger { } @Test - public void testFailingCheckpointStreamOperator() throws Exception { + public void testDecliningCheckpointStreamOperator() throws Exception { final long checkpointId = 42L; final long timestamp = 1L; TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = new MockEnvironmentBuilder().build(); + CheckpointExceptionHandlerTest.DeclineDummyEnvironment declineDummyEnvironment = new CheckpointExceptionHandlerTest.DeclineDummyEnvironment(); - StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment); + StreamTask<?, ?> streamTask = new EmptyStreamTask(declineDummyEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); // mock the operators @@ -360,19 +360,11 @@ public class StreamTaskTest extends TestLogger { CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); CheckpointExceptionHandler checkpointExceptionHandler = - checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment); - Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler); - - StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler = - new StreamTask.AsyncCheckpointExceptionHandler(streamTask); - Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler); - - try { - streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false); - fail("Expected test exception here."); - } catch (Exception e) { - assertEquals(testException, e.getCause()); - } + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(declineDummyEnvironment); + Whitebox.setInternalState(streamTask, "checkpointExceptionHandler", checkpointExceptionHandler); + + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false); + assertEquals(testException, declineDummyEnvironment.getLastDeclinedCheckpointCause()); verify(operatorSnapshotResult1).cancel(); verify(operatorSnapshotResult2).cancel(); @@ -432,12 +424,8 @@ public class StreamTaskTest extends TestLogger { CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); CheckpointExceptionHandler checkpointExceptionHandler = - checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, mockEnvironment); - Whitebox.setInternalState(streamTask, "synchronousCheckpointExceptionHandler", checkpointExceptionHandler); - - StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler = - new StreamTask.AsyncCheckpointExceptionHandler(streamTask); - Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler); + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(mockEnvironment); + Whitebox.setInternalState(streamTask, "checkpointExceptionHandler", checkpointExceptionHandler); mockEnvironment.setExpectedExternalFailureCause(Throwable.class); streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index c02ff99..589b64c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -99,7 +99,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; @@ -122,27 +121,12 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { } @Test - public void testTaskFailingOnCheckpointErrorInSyncPart() throws Exception { - Throwable failureCause = runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend()); - assertNotNull(failureCause); - - String expectedMessageStart = "Could not perform checkpoint"; - assertEquals(expectedMessageStart, failureCause.getMessage().substring(0, expectedMessageStart.length())); - } - - @Test - public void testTaskFailingOnCheckpointErrorInAsyncPart() throws Exception { - Throwable failureCause = runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend()); - assertEquals(AsynchronousException.class, failureCause.getClass()); - } - - @Test public void testBlockingNonInterruptibleCheckpoint() throws Exception { StateBackend lockingStateBackend = new BackendForTestStream(LockingOutputStream::new); Task task = - createTask(new TestOperator(), lockingStateBackend, mock(CheckpointResponder.class), true); + createTask(new TestOperator(), lockingStateBackend, mock(CheckpointResponder.class)); // start the task and wait until it is in "restore" task.startTaskThread(); @@ -162,7 +146,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { TestDeclinedCheckpointResponder checkpointResponder = new TestDeclinedCheckpointResponder(); Task task = - createTask(new FilterOperator(), backend, checkpointResponder, false); + createTask(new FilterOperator(), backend, checkpointResponder); // start the task and wait until it is in "restore" task.startTaskThread(); @@ -175,20 +159,6 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { task.getExecutingThread().join(); } - private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws Exception { - - Task task = - createTask(new FilterOperator(), backend, mock(CheckpointResponder.class), true); - - // start the task and wait until it is in "restore" - task.startTaskThread(); - - task.getExecutingThread().join(); - - assertEquals(ExecutionState.FAILED, task.getExecutionState()); - return task.getFailureCause(); - } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -196,8 +166,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { private static Task createTask( StreamOperator<?> op, StateBackend backend, - CheckpointResponder checkpointResponder, - boolean failOnCheckpointErrors) throws IOException { + CheckpointResponder checkpointResponder) throws IOException { Configuration taskConfig = new Configuration(); StreamConfig cfg = new StreamConfig(taskConfig); @@ -206,7 +175,6 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { cfg.setStateBackend(backend); ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors); JobInformation jobInformation = new JobInformation( new JobID(),