[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668546#comment-16668546 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434257204

@azagrebin OK, agree.

> Allowable number of checkpoint failures
> ----------------------------------------
>
>                 Key: FLINK-10074
>                 URL: https://issues.apache.org/jira/browse/FLINK-10074
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Thomas Weise
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to avoid restarts. If, for example, a transient S3 error prevents checkpoint completion, the next checkpoint may very well succeed. The user may wish not to incur the expense of a restart in such a scenario, and this could be expressed with a failure threshold (the number of consecutive checkpoint failures to tolerate), possibly combined with a list of exceptions to tolerate.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668547#comment-16668547 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua closed pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..b091192f0a5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1354,6 +1354,8 @@ public void runBrokerFailureTest() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.enableCheckpointing(500);
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 8c0d7665fed..23507bfbdf8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -57,6 +57,8 @@ public static void generateRandomizedIntegerSequence(
 		env.setParallelism(numPartitions);
 		env.getConfig().disableSysoutLogging();
 		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);

 		DataStream stream = env.addSource(
 			new RichParallelSourceFunction() {

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 6b7caaac6ec..719560ef860 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -883,16 +883,6 @@ public boolean isFailTaskOnCheckpointError() {
 		return failTaskOnCheckpointError;
 	}

-	/**
-	 * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
-	 * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
-	 * instead.
-	 */
-	@Internal
-	public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
-		this.failTaskOnCheckpointError = failTaskOnCheckpointError;
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {

diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 179cf9c6de8..f4714c2cce6 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -100,6 +100,8 @@ public void testJobManagerJMXMetricAccess() throws Exception {
 					50,
 					5,
 					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+					true,
+					0,
 					true),
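For orientation, here is a minimal sketch of how a user job would combine the two settings introduced by the diff above. `setFailOnCheckpointingErrors` is the pre-existing `CheckpointConfig` flag; `setTolerableFailureNumber` is the setter proposed by this PR (it never shipped under that name), so the sketch compiles only against the PR branch and is illustrative rather than authoritative.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerantCheckpointingExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(500); // checkpoint every 500 ms

		// Pre-existing flag: do not fail the task on a checkpoint error ...
		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
		// ... and (PR-proposed setter) tolerate up to 3 consecutive
		// checkpoint failures before the job is failed.
		env.getCheckpointConfig().setTolerableFailureNumber(3);

		env.setRestartStrategy(RestartStrategies.noRestart());
		env.fromElements(1, 2, 3).print();
		env.execute("tolerant-checkpointing-example");
	}
}
```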
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668543#comment-16668543 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434255898

I created FLINK-10724 to refactor failure handling in the checkpoint coordinator, where I believe we should first prepare it for better integration with the error counter we have in this PR. Once we resolve FLINK-10724, we can reopen this PR or open another one and rebase the error counter on the result of FLINK-10724, which should then be simpler.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668520#comment-16668520 ]

Andrey Zagrebin commented on FLINK-10074:

I created FLINK-10724 to revisit failure handling in the checkpoint coordinator and prepare it for better integration with the error counter of this issue.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668373#comment-16668373 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434220095

Hi @azagrebin, I am currently busy with several other PRs that I hope will be merged into Flink 1.7.0. When those PRs are finished, I will come back and rework this one. I think some of the logic in this PR is usable, but the counting logic may need to be refactored. Can we put it aside for some time?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668367#comment-16668367 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434217958

Hi @yanghua, can this PR be closed for now, so that we come back to it once we have a design in the Jira issue? We might also want to do some refactoring of failure handling in the checkpoint coordinator to prepare it for integrating the error counter from this PR.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643078#comment-16643078 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-428137039

Hi @yanghua,

In general, `executionGraph.failGlobal` looks good to me as the way to fail, but I think the `CheckpointFailureManager` should be constructed with a callback which will fail the job. It means that `JobMaster` or `ExecutionGraphBuilder` should decide how to fail. We also have to make sure that checkpointing in the Task Executor does not fail the job anymore; only the `CheckpointFailureManager` should act as the central point of failure.

Another point is that inserting the failure callback into different places in `CheckpointCoordinator`, close to each cause, looks rather invasive. Ideally, there should be two places where `CheckpointFailureManager` gets a callback:
- in `triggerCheckpoint(long timestamp, boolean isPeriodic)`, checking `CheckpointTriggerResult` (sync part)
- similar to `PendingCheckpointStats` getting a callback in `PendingCheckpoint.reportFailedCheckpoint` (async part), but with a clearly defined cause like `CheckpointDeclineReason` for the sync part; `PendingCheckpoint` does not have such clearly defined failure handling as `triggerCheckpoint` and might need some refactoring to distinguish failure cases

I suggest we firstly describe the full approach in the Jira issue, where we also include a list of all possible failures which `CheckpointFailureManager` needs to react upon and how we define them in code. It can also be a link to some design doc.
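To make the two callback sites described above concrete, here is a rough sketch; all class and method names are hypothetical illustrations, not Flink's actual API.

```java
// Sketch of the two callback sites: the failure manager hears about sync
// trigger declines, async pending-checkpoint failures, and successes.
interface FailureCallbacks {
	void handleCheckpointFailure(String reason);

	void handleCheckpointSuccess();
}

class CoordinatorSketch {
	private final FailureCallbacks failureManager;

	CoordinatorSketch(FailureCallbacks failureManager) {
		this.failureManager = failureManager;
	}

	// Sync part: the trigger itself is declined (cf. CheckpointTriggerResult
	// in triggerCheckpoint(long timestamp, boolean isPeriodic)).
	void onTriggerDeclined(String declineReason) {
		failureManager.handleCheckpointFailure("trigger declined: " + declineReason);
	}

	// Async part: a pending checkpoint fails later
	// (cf. PendingCheckpoint.reportFailedCheckpoint).
	void onPendingCheckpointFailed(long checkpointId, Throwable cause) {
		failureManager.handleCheckpointFailure("checkpoint " + checkpointId + " failed: " + cause);
	}

	// A completed checkpoint resets the consecutive-failure count.
	void onCheckpointCompleted(long checkpointId) {
		failureManager.handleCheckpointSuccess();
	}
}
```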
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642907#comment-16642907 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r223586404

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;

import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * The checkpoint failure manager to manage how to process checkpoint failure.
 */
public class CheckpointFailureManager {

	private final boolean failOnCheckpointingErrors;
	private final int tolerableCpFailureNumber;
	private final AtomicInteger continuousFailureCounter;
	private final ExecutionGraph executionGraph;
	private final Object lock = new Object();

	public CheckpointFailureManager(
			boolean failOnCheckpointingErrors,
			int tolerableCpFailureNumber,
			ExecutionGraph executionGraph) {
		this.failOnCheckpointingErrors = failOnCheckpointingErrors;
		this.tolerableCpFailureNumber = tolerableCpFailureNumber;
		this.continuousFailureCounter = new AtomicInteger(0);
		this.executionGraph = checkNotNull(executionGraph);
	}

	@VisibleForTesting
	public AtomicInteger getContinuousFailureCounter() {
		return continuousFailureCounter;
	}

	public void resetCounter() {
		continuousFailureCounter.set(0);
	}

	public void tryHandleFailure(String reason, long checkpointId) {
		synchronized (lock) {
			if (failOnCheckpointingErrors ||
					continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
				executionGraph.failGlobal(new Throwable(reason));

Review comment:

Hi @azagrebin and @tillrohrmann, can you take a general look at my PR implementation and comment on how to trigger a job failure when the failure condition is met? My current implementation seems to be a bit problematic (a cyclic dependency), which caused a lot of test failures.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625665#comment-16625665 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423942772

> Do we need to take this refactoring into account? Because this PR is actually a supplement to the checkpoint exception handler.

Yes, I think we can keep the same semantics as in the previous TM-only implementation (`setFailOnCheckpointingErrors` means `tolerableFailures == 0`), but now everything should happen in `CheckpointCoordinator`. That is why I also mentioned this point:

> Consider having only DecliningCheckpointExceptionHandler on the TaskExecutor side and letting all failure cases now be handled only in CheckpointCoordinator
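A small sketch of the equivalence stated above. As before, `setTolerableFailureNumber` is only the setter proposed by this PR, so the snippet is illustrative and compiles only against the PR branch.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EquivalenceSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(1000);

		// Old TM-side semantics: fail on the first checkpoint error ...
		env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

		// ... which, in the coordinator-side design, amounts to saying
		// "tolerate zero consecutive checkpoint failures" (PR-proposed setter):
		env.getCheckpointConfig().setTolerableFailureNumber(0);
	}
}
```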
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625650#comment-16625650 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423940208

@azagrebin thanks for your suggestion; I agree and will reconsider the details, especially considering that @tillrohrmann has said the current checkpoint exception handler should also be implemented in `CheckpointCoordinator`:

> I understand why you implemented it the way you did. I think the `setFailOnCheckpointingErrors` should actually also be handled by the `CheckpointCoordinator`/`JM` and not on the Task level (but this is a different story). This looks a little bit like a shortcut we made back in the days.

Do we need to take this refactoring into account? Because this PR is actually a supplement to the checkpoint exception handler.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623777#comment-16623777 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin edited a comment on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423573937

Thanks for the update, @yanghua. Looking at the checkpoint coordinator more deeply, I think we first have to work a bit more on the design for this kind of change. We have to take into account at least the following points (roughly):

- Introduce a separate component/class responsible for failure management and counting, e.g. `CheckpointFailureManager` or something similar.
- The job manager should:
  - construct the `CheckpointFailureManager`,
  - configure it with the max failure count and a proper action for how to fail,
  - pass it to `CheckpointCoordinator`.
- `CheckpointCoordinator`:
  - should give callbacks to `CheckpointFailureManager` about failures and successes of checkpoints,
  - needs some refactoring to better distinguish failures of `PendingCheckpoint`. Right now the `abortXXX()` methods do not provide enough information for `CheckpointFailureManager` to decide whether to count a failure or not (like we have for `triggerSavepoint` in `CheckpointDeclineReason`). In the end there should be clear places in `CheckpointCoordinator` where to give callbacks to `CheckpointFailureManager`, e.g.:
    - the `CheckpointDeclineReason.EXCEPTION` result of `triggerSavepoint`,
    - some cases of `PendingCheckpoint.abortDeclined()`, `abortError()`, and maybe `abortExpired()`.
- Consider having only `DecliningCheckpointExceptionHandler` on the `TaskExecutor` side, letting all failure cases be handled only in `CheckpointCoordinator`.

There might be more points. I suggest we step back and continue the discussion in the Jira issue. Once we have a clear design, a PR can be opened again.
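A rough sketch of the split azagrebin outlines above: the job manager decides how to fail (it supplies the callback), the failure manager decides when. All names are illustrative only, not Flink's actual classes.

```java
import java.util.function.Consumer;

// Sketch: failure manager counts, a job-manager-supplied callback fails the job.
class FailureManagerSketch {
	private final int tolerableFailures;
	private final Consumer<Throwable> failJobCallback; // supplied by the job manager
	private int consecutiveFailures;

	FailureManagerSketch(int tolerableFailures, Consumer<Throwable> failJobCallback) {
		this.tolerableFailures = tolerableFailures;
		this.failJobCallback = failJobCallback;
	}

	// Called by the checkpoint coordinator on a countable checkpoint failure.
	synchronized void handleCheckpointFailure(Throwable cause) {
		if (++consecutiveFailures > tolerableFailures) {
			failJobCallback.accept(cause); // e.g. wraps executionGraph.failGlobal(cause)
		}
	}

	// Called by the checkpoint coordinator on a completed checkpoint.
	synchronized void handleCheckpointSuccess() {
		consecutiveFailures = 0;
	}
}
```

Handing in a callback rather than the `ExecutionGraph` itself is also one way around the cyclic dependency yanghua mentions in the review thread above.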
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623253#comment-16623253 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423453821

Hi @tillrohrmann, what do you think about the latest implementation?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16619225#comment-16619225 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-422427072

Hi @tillrohrmann, I have refactored this PR and now count the failure number in the `CheckpointCoordinator`. I thought I should push the implementation so you can evaluate it. I have tested the counter's run path, but I don't know if it is the [right way](https://github.com/apache/flink/pull/6567/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R1010) of failing the `ExecutionGraph`. Also, the test case I added may have too many assertions; I will reduce them.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618793#comment-16618793 ]

ASF GitHub Bot commented on FLINK-10074:

tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-422329293

A failure of a global checkpoint should only increment the failure count by one, independent of the number of failed subtasks. Thus, I would hope that one does not need to set a different threshold for the two different cases you described, @tweise. However, it is correct that the price the user pays is that a consistent problem will only be detected after a longer delay.
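A tiny sketch of the counting rule above: many subtasks may decline the same checkpoint in parallel, but the failure count goes up by one per checkpoint, not per subtask. Names are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: deduplicate subtask declines by checkpoint id before counting.
class PerCheckpointFailureCounter {
	private final Set<Long> countedCheckpointIds = new HashSet<>();
	private int consecutiveFailures;

	synchronized void onSubtaskDeclined(long checkpointId) {
		// Only the first decline of a given checkpoint id counts.
		if (countedCheckpointIds.add(checkpointId)) {
			consecutiveFailures++;
		}
	}

	synchronized void onCheckpointCompleted(long checkpointId) {
		countedCheckpointIds.remove(checkpointId);
		consecutiveFailures = 0; // a success resets the consecutive count
	}

	synchronized int consecutiveFailures() {
		return consecutiveFailures;
	}
}
```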
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617382#comment-16617382 ]

ASF GitHub Bot commented on FLINK-10074:

tweise commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421971822

@yanghua @tillrohrmann I think a user would normally expect this count to apply globally, but please also consider the case of an intermittent failure (like an S3 rate limit, or the storage backend being unavailable for another reason). In a large job that could cause many subtasks to fail in parallel. While this could be addressed by setting a correspondingly very high threshold, it would in turn mean that a problem isolated to a single task would not hit the threshold until much, much later, leaving the job in a flip-flop state instead of failing.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617305#comment-16617305 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421948370

@tillrohrmann I originally wanted to do it in the checkpoint coordinator when I first implemented it, but when I touched `setFailOnCheckpointingErrors` I was guided toward the TM instead. I have also thought about this issue recently, and it seems more reasonable to implement it in the checkpoint coordinator. I will implement it again based on the checkpoint coordinator.

@klion26 thanks for your opinion; I will change the implementation.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617290#comment-16617290 ]

ASF GitHub Bot commented on FLINK-10074:

tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421942140

I understand why you implemented it the way you did. I think the `setFailOnCheckpointingErrors` should actually also be handled by the `CheckpointCoordinator`/`JM` and not on the Task level (but this is a different story). This looks a little bit like a shortcut we made back in the days.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617269#comment-16617269 ]

ASF GitHub Bot commented on FLINK-10074:

klion26 commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421937722

Thanks for opening this PR @yanghua. As a user, I like the idea of failing in the JM rather than in the TM.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617235#comment-16617235 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421931626

@tweise Any comments or opinions? The current count implementation is per subtask instance, not at the job level. Maybe @tillrohrmann's idea is more reasonable?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614541#comment-16614541 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421282760

Hi @tillrohrmann, currently it is applied to the subtask instance (TM) instead of the checkpoint coordinator (JM), as a complement to the `setFailOnCheckpointingErrors` method, since `setFailOnCheckpointingErrors` only provides the user with a yes-or-no choice. But it sounds reasonable to apply the count globally on the job.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608886#comment-16608886 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r216242682

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##
@@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);

+				if (owner.asynchronousCheckpointExceptionHandler != null) {

Review comment:

OK, I agree that it would need more refactoring to remove this null check.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606874#comment-16606874 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215895305

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##
@@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);

+				if (owner.asynchronousCheckpointExceptionHandler != null) {

Review comment:

I looked at the `StreamTask`; the initialization logic of the `asynchronousCheckpointExceptionHandler` is in the `invoke` method, before the call to the concrete `init` method, and the `invoke` method is marked `final`. So I personally feel that we don't have to make this too complicated just to remove a null check. What do you think?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606849#comment-16606849 ]

ASF GitHub Bot commented on FLINK-10074:

azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215887709

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##
@@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);

+				if (owner.asynchronousCheckpointExceptionHandler != null) {

Review comment:

I see. I would rather initialise it in `NoOpStreamTask` to avoid the `null`, but that would then need more refactoring.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605923#comment-16605923 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-419137130

@azagrebin thanks for your suggestion; I refactored this PR.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605920#comment-16605920 ]

ASF GitHub Bot commented on FLINK-10074:

yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215670211

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##
@@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);

+				if (owner.asynchronousCheckpointExceptionHandler != null) {

Review comment:

@azagrebin At normal runtime it will not be null. This check is mainly for the checkpoint test code, which does not perform the initialization logic for the `asynchronousCheckpointExceptionHandler`; without the check, the test code throws an NPE, so I added it in the second commit.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605727#comment-16605727 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r215586717 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java ## @@ -75,5 +101,10 @@ public void tryHandleCheckpointException( environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception); } + + @Override + public void checkpointSucceeded() { Review comment: The empty implementation could already be the default one in the interface itself.
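A minimal sketch of what this suggestion amounts to, assuming the interface keeps its existing `tryHandleCheckpointException` method:

```
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;

interface CheckpointExceptionHandler {

    // Existing method: decide whether a checkpoint failure is fatal.
    void tryHandleCheckpointException(
        CheckpointMetaData checkpointMetaData,
        Exception exception) throws Exception;

    // Default no-op, so only handlers that track consecutive failures
    // (e.g. FailingCheckpointExceptionHandler) need to override it.
    default void checkpointSucceeded() {
        // intentionally empty
    }
}
```

With a default method, the empty override in the declining handler shown in the diff above could simply be deleted.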
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605726#comment-16605726 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r215586486 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java ## @@ -48,12 +51,35 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler( */ static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler { + private final int tolerableNumber; + private AtomicInteger cpFailureCounter; + + FailingCheckpointExceptionHandler(int tolerableNumber) { + this.cpFailureCounter = new AtomicInteger(0); + this.tolerableNumber = tolerableNumber; + } + @Override public void tryHandleCheckpointException( CheckpointMetaData checkpointMetaData, Exception exception) throws Exception { - throw exception; + if (needThrowCheckpointException()) { Review comment: I think at this point the function `tryHandleCheckpointException` is too simple to justify breaking out a separate `needThrowCheckpointException`. The logic could all live here.
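Inlined, the handler body from the diff would reduce to roughly the following sketch (`tolerableNumber` and `cpFailureCounter` are the fields shown in the diffs in this thread):

```
@Override
public void tryHandleCheckpointException(
        CheckpointMetaData checkpointMetaData,
        Exception exception) throws Exception {
    // Rethrow immediately when no failures are tolerated; otherwise
    // rethrow only once the consecutive-failure count exceeds the threshold.
    if (tolerableNumber == 0 || cpFailureCounter.incrementAndGet() > tolerableNumber) {
        throw exception;
    }
}
```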
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605725#comment-16605725 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r215588426 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -875,6 +876,9 @@ public void run() { localTaskOperatorSubtaskStates, asyncDurationMillis); + if (owner.asynchronousCheckpointExceptionHandler != null) { Review comment: Do you think it can be null? It is initialised at the beginning of the Task `invoke` and is then also used in the catch-Exception block (`handleExecutionException(e)`) without any check.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605728#comment-16605728 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r215586087 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java ## @@ -48,12 +51,35 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler( */ static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler { + private final int tolerableNumber; + private AtomicInteger cpFailureCounter; Review comment: `cpFailureCounter` can also be final
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605705#comment-16605705 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#issuecomment-419073463 @tillrohrmann and @zentol if you have time, can you have a look at this PR?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602793#comment-16602793 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#issuecomment-418295546 @azagrebin thanks for your suggestion, I have refactored this PR; please review it again.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598929#comment-16598929 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r214384046 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java ## @@ -75,6 +79,19 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler( } }; + if (failOnException) { + CheckpointExceptionHandler exceptionHandler = + inspectingFactory.createCheckpointExceptionHandler(failOnException, environment); + Assert.assertTrue( + exceptionHandler instanceof CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler + ); + + CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler actuallyHandler = + (CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler) exceptionHandler; + + Assert.assertEquals(3, actuallyHandler.tolerableNumber); Review comment: I would rather make the fields of `FailingCheckpointExceptionHandler` private and check `tolerableNumber` the same way as `failTaskOnCheckpointException` is checked in the previous declaration of `CheckpointExceptionHandlerFactory`.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598930#comment-16598930 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r214384222 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java ## @@ -63,6 +63,10 @@ private void testConfigForwarding(boolean failOnException) throws Exception { environment.setTaskStateManager(new TestTaskStateManager()); environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag); + if (failOnException) { + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); Review comment: this can always be set, as it should simply be ignored if `failOnException` is `false`
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598928#comment-16598928 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r214379864 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java ## @@ -37,7 +37,7 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler( Environment environment) { if (failTaskOnCheckpointException) { - return new FailingCheckpointExceptionHandler(); + return new FailingCheckpointExceptionHandler(environment); Review comment: I think we do not really need `environment` in the constructor of `FailingCheckpointExceptionHandler`, only `tolerableNumber`, which should be passed in `createCheckpointExceptionHandler` the same way as `failTaskOnCheckpointException`.
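A sketch of the suggested factory shape, assuming the declining handler keeps its current constructor (the getter name is the one used in the PR diffs in this thread):

```
public CheckpointExceptionHandler createCheckpointExceptionHandler(
        boolean failTaskOnCheckpointException,
        Environment environment) {

    if (failTaskOnCheckpointException) {
        // Resolve the tolerable number here, so the failing handler
        // itself no longer needs to depend on Environment.
        int tolerableNumber = environment
            .getExecutionConfig()
            .getTaskTolerableCheckpointFailuresNumber();
        return new FailingCheckpointExceptionHandler(tolerableNumber);
    } else {
        return new DecliningCheckpointExceptionHandler(environment);
    }
}
```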
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598931#comment-16598931 ] ASF GitHub Bot commented on FLINK-10074: azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r214398085 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java ## @@ -48,12 +48,41 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler( */ static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler { + final Environment environment; + final int tolerableNumber; + long latestFailedCheckpointID; + int cpFailureCounter; + + FailingCheckpointExceptionHandler(Environment environment) { + this.environment = environment; + this.cpFailureCounter = 0; + this.tolerableNumber = environment.getExecutionConfig().getTaskTolerableCheckpointFailuresNumber(); + } + @Override public void tryHandleCheckpointException( CheckpointMetaData checkpointMetaData, Exception exception) throws Exception { - throw exception; + if (needThrowCheckpointException(checkpointMetaData)) { + throw exception; + } + } + + private boolean needThrowCheckpointException(CheckpointMetaData checkpointMetaData) { + if (tolerableNumber == 0) { + return true; + } + + if (checkpointMetaData.getCheckpointId() - latestFailedCheckpointID == 1) { Review comment: I think rather than relying on the sequential numbering of checkpoints, it is better to add one more signal: `CheckpointExceptionHandler.checkpointSucceeded()`, where the counter is reset. This method can be called in `AsyncCheckpointRunnable.run()`, e.g. after `reportCompletedSnapshotStates` is done: ``` owner.asynchronousCheckpointExceptionHandler.checkpointSucceeded(); // forward it to synchronousCheckpointExceptionHandler inside ``` Checkpoints finish concurrently, so I think we have to use an `AtomicInteger` for `cpFailureCounter` and call `cpFailureCounter.incrementAndGet()`.
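On the handler side, the proposal amounts to roughly this sketch (field names follow the diffs in this thread; not necessarily the final implementation; requires java.util.concurrent.atomic.AtomicInteger):

```
static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {

    private final int tolerableNumber;
    // Checkpoints finish concurrently, so the counter must be atomic.
    private final AtomicInteger cpFailureCounter = new AtomicInteger(0);

    FailingCheckpointExceptionHandler(int tolerableNumber) {
        this.tolerableNumber = tolerableNumber;
    }

    @Override
    public void tryHandleCheckpointException(
            CheckpointMetaData checkpointMetaData,
            Exception exception) throws Exception {
        if (cpFailureCounter.incrementAndGet() > tolerableNumber) {
            throw exception;
        }
    }

    @Override
    public void checkpointSucceeded() {
        // A completed checkpoint resets the consecutive-failure count.
        cpFailureCounter.set(0);
    }
}
```

Resetting on success rather than comparing checkpoint IDs avoids any assumption about sequential numbering.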
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594625#comment-16594625 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r213203249 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java ## @@ -49,6 +49,59 @@ public void testRethrowingHandler() { Assert.assertNull(environment.getLastDeclinedCheckpointCause()); } + @Test + public void testRethrowingHandlerWithTolerableNumberTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + + Assert.fail("Exception not rethrown."); + } catch (Exception e) { + Assert.assertEquals(testException, e); } + + Assert.assertNull(environment.getLastDeclinedCheckpointCause()); + } + + @Test + public void testRethrowingHandlerWithTolerableNumberNotTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(46L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + } catch (Exception e) { + Assert.assertNotEquals(testException, e); Review comment: Yes, we can just throw it.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594624#comment-16594624 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r213202843 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java ## @@ -49,6 +49,59 @@ public void testRethrowingHandler() { Assert.assertNull(environment.getLastDeclinedCheckpointCause()); } + @Test + public void testRethrowingHandlerWithTolerableNumberTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); Review comment: Here, this line throws the exception, so the flow enters the catch block, which verifies that the exception is the one we expected. If no exception is thrown by this line, Assert.fail() fires, indicating that the expected exception was not thrown, and the test fails. Of course, as you said, this verification does not prove that the exception was thrown by exactly this line. Maybe I can try checking each checkpoint independently.
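Checking each checkpoint independently could look like the following sketch (the helper method is hypothetical, not part of the PR; Assert is org.junit.Assert):

```
// Hypothetical test helper: assert that a single failed checkpoint is
// still tolerated, i.e. tryHandleCheckpointException does not rethrow.
private static void assertFailureTolerated(
        CheckpointExceptionHandler handler, long checkpointId) {
    try {
        handler.tryHandleCheckpointException(
            new CheckpointMetaData(checkpointId, 4711L),
            new Exception("test"));
    } catch (Exception e) {
        Assert.fail("Exception rethrown too early, at checkpoint " + checkpointId);
    }
}
```

With such a helper, the first three attempts can each assert tolerance individually, and only the fourth attempt is wrapped in a try/catch that expects the rethrow.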
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594502#comment-16594502 ] ASF GitHub Bot commented on FLINK-10074: tweise commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r213180551 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java ## @@ -49,6 +49,59 @@ public void testRethrowingHandler() { Assert.assertNull(environment.getLastDeclinedCheckpointCause()); } + @Test + public void testRethrowingHandlerWithTolerableNumberTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + + Assert.fail("Exception not rethrown."); + } catch (Exception e) { + Assert.assertEquals(testException, e); + } + + Assert.assertNull(environment.getLastDeclinedCheckpointCause()); + } + + @Test + public void testRethrowingHandlerWithTolerableNumberNotTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(46L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + } catch (Exception e) { + Assert.assertNotEquals(testException, e); Review comment: Shouldn't this fail since we don't expect an exception?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594497#comment-16594497 ] ASF GitHub Bot commented on FLINK-10074: tweise commented on a change in pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#discussion_r213179903 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java ## @@ -49,6 +49,59 @@ public void testRethrowingHandler() { Assert.assertNull(environment.getLastDeclinedCheckpointCause()); } + @Test + public void testRethrowingHandlerWithTolerableNumberTriggered() { + DeclineDummyEnvironment environment = new DeclineDummyEnvironment(); + environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3); + CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory(); + CheckpointExceptionHandler exceptionHandler = + checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment); + + CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L); + Exception testException = new Exception("test"); + try { + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); + failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L); + exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException); Review comment: Shouldn't it verify that this attempt generated the exception, and not a previous one?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589739#comment-16589739 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#issuecomment-415303507 @tillrohrmann When you have free time, please review this PR. Thanks.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582620#comment-16582620 ] ASF GitHub Bot commented on FLINK-10074: tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#issuecomment-413571184 Thanks for opening this PR @yanghua. I'll try to give it a review in the coming days.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582279#comment-16582279 ] ASF GitHub Bot commented on FLINK-10074: yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567#issuecomment-413486884 cc @tillrohrmann
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582251#comment-16582251 ] ASF GitHub Bot commented on FLINK-10074: yanghua opened a new pull request #6567: [FLINK-10074] Allowable number of checkpoint failures URL: https://github.com/apache/flink/pull/6567 ## What is the purpose of the change *This pull request allows a configurable number of checkpoint failures to be tolerated* ## Brief change log - *Add a new API `CheckpointConfig#setTolerableCheckpointFailuresNumber`* - *Add a new API `CheckpointConfig#getTolerableCheckpointFailuresNumber`* - *Changed `FailingCheckpointExceptionHandler#tryHandleCheckpointException`* ## Verifying this change This change added tests and can be verified as follows: - *CheckpointExceptionHandlerTest#testRethrowingHandlerWithTolerableNumberTriggered* - *CheckpointExceptionHandlerTest#testRethrowingHandlerWithTolerableNumberNotTriggered* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
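A hedged usage sketch of the configuration this PR describes (method names are taken from the PR description above; they changed across revisions of the PR, e.g. `setTolerableFailureNumber` appears in a later commit):

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailuresExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);

        CheckpointConfig config = env.getCheckpointConfig();
        // Keep failing the task on checkpoint errors...
        config.setFailOnCheckpointingErrors(true);
        // ...but only after more than three consecutive failures
        // (method name per the PR description above).
        config.setTolerableCheckpointFailuresNumber(3);
    }
}
```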
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581211#comment-16581211 ] Till Rohrmann commented on FLINK-10074: --- A best-effort strategy sounds good to me. I think you could start implementing this feature [~yanghua].
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580750#comment-16580750 ] vinoyang commented on FLINK-10074: -- [~thw] OK, that's fine; I will get ready to start working on this issue.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580699#comment-16580699 ] Thomas Weise commented on FLINK-10074: -- [~yanghua] this is what I meant as well; I updated my comment for clarity.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580687#comment-16580687 ] vinoyang commented on FLINK-10074: -- [~thw] I personally prefer that the counter be reset once the JM fails over. I don't think it's necessary to introduce too much complexity for this. If we needed to maintain a global counter across JM processes, we would have to use third-party components such as ZooKeeper. I think it is appropriate to maintain this counter for the life of a JM process. Once the JM fails over, the job is restored (re-deployed and run), and it is reasonable to reset the counter for the new runtime environment.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624 ] Thomas Weise commented on FLINK-10074: -- [~till.rohrmann] it is probably unreasonably hard to do for the JM failure case, but how about making it correct on a best-effort basis? Only when the JM fails would we not retain the count, which is a very low-probability event. The majority of failures are transient issues in TMs where subtasks just get redeployed; far fewer are TM machine failures. For all of these cases we could get more accurate behavior by retaining the count and failing right away on the next checkpoint failure.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579555#comment-16579555 ] Till Rohrmann commented on FLINK-10074: --- Since {{setFailOnCheckpointingErrors}} is public, we cannot simply change its signature. What we could do, though, is add another method {{setNumberTolerableCheckpointFailures(int)}}, which defaults to {{0}} and is only respected if {{setFailOnCheckpointingErrors}} is set to {{true}}. So if the user only calls {{setFailOnCheckpointingErrors(true)}}, then he will get the same old behaviour. Only after calling {{setNumberTolerableCheckpointFailures(10)}} will it wait for 10 checkpoint failures before failing. If {{setNumberTolerableCheckpointFailures}} is set but {{setFailOnCheckpointingErrors(false)}} is used, then checkpoint failures won't fail the job. [~thw] would you not reset the counter in case of a restart? This would be hard to do in case of a JobManager failover and would lead to different behaviours depending on the actual fault.
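Spelled out as code, the proposed semantics would be as follows, given a `StreamExecutionEnvironment` `env` as in the usage sketch above (the method name here is the one suggested in this comment, i.e. hypothetical, not necessarily what the PR finally used):

```
CheckpointConfig config = env.getCheckpointConfig();

// Old behaviour: fail on the first checkpoint failure.
config.setFailOnCheckpointingErrors(true);

// Tolerate up to 10 consecutive checkpoint failures before failing.
config.setFailOnCheckpointingErrors(true);
config.setNumberTolerableCheckpointFailures(10); // suggested name, hypothetical

// Checkpoint failures never fail the job; the threshold is ignored.
config.setFailOnCheckpointingErrors(false);
config.setNumberTolerableCheckpointFailures(10);
```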
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579327#comment-16579327 ] vinoyang commented on FLINK-10074: -- [~thw] The setFailOnCheckpointingErrors API is currently not marked as @Public, but it is still a public API; perhaps we should keep it for compatibility, though this does complicate what the current issue needs to do. The setFailOnCheckpointingErrors method provides two states, true or false. This issue provides three states, which is sufficient to cover the functionality of the setFailOnCheckpointingErrors method. How should we decide, [~till.rohrmann]? On the second question, you are right.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579269#comment-16579269 ] Thomas Weise commented on FLINK-10074: -- I think configuring the behavior as a count of allowable consecutive failures would work well. Would this replace the existing setFailOnCheckpointingErrors (i.e., will that setting become irrelevant once the user sets the count)? [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html] Regarding what happens once the job was allowed to fail and recovers only to fail again: shouldn't the counter be reset only after the next successful checkpoint rather than on restart?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578001#comment-16578001 ] Till Rohrmann commented on FLINK-10074: --- I would be ok with defining the maximum number of consecutively failed checkpoints. Would that also work for you [~thw]?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576588#comment-16576588 ] vinoyang commented on FLINK-10074: -- [~trohrm...@apache.org] Do we need more discussion? If not, I will start this work.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575038#comment-16575038 ] vinoyang commented on FLINK-10074:
[~till.rohrmann] Yes, I agree with you. If we base this on time, it becomes more complicated for users, because there are multiple time-related configurations whose details they would need to understand. If we base it on a count instead, it is more user friendly, much like a maximum number of timeouts or failures.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575001#comment-16575001 ] Till Rohrmann commented on FLINK-10074:
I like the idea of having a maximum time between successful checkpoints. However, a time constraint might be difficult for a user to configure, since the actual checkpoint interval can be larger than the configured checkpoint interval if the user also specified a minimum time between checkpoints > 0. So I'm wondering whether a number of allowed checkpoint failures would be easier to understand. What do you think?
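As a concrete example of the interaction described here, using the existing CheckpointConfig setters: when the minimum pause exceeds the configured interval, the effective interval is dominated by the pause, so a wall-clock failure threshold chosen against the configured interval would fire even though every checkpoint succeeds.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IntervalVsMinPauseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configured interval: trigger a checkpoint every 60 seconds...
        env.enableCheckpointing(60_000);
        // ...but require at least 5 minutes of pause after each checkpoint completes.
        // The effective interval is therefore >= ~5 minutes, so a user-chosen
        // "max time between successful checkpoints" of, say, 2 minutes would
        // trip even with zero failures.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(300_000);
    }
}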
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571890#comment-16571890 ] vinoyang commented on FLINK-10074:
Hi [~thw] and [~till.rohrmann]: I think it depends on our purpose. I believe we mainly want to prevent a job from going without a completed checkpoint for a long time, because in that case, even if we let the job keep running, any failure for other reasons means recovery rolls the job back a long way. Here "long time" really means consecutiveness, so I think we can let the user set a threshold for the number of consecutive checkpoint failures, after which we fail the job so it can recover. As long as the threshold has not been reached, we clear the counter as soon as a checkpoint completes successfully. One more thing worth thinking about: what should we do if the job still cannot checkpoint even after we fail and restart it? The scenario assumed here is HDFS as the state backend failing and remaining unavailable for a while. Of course, we could also use the longest time between two successful checkpoints. Since the checkpoint period is usually fixed, and a checkpoint that keeps failing for the same reason fails at roughly equal intervals, this is approximately another way of expressing how many consecutive checkpoints have failed, and it reflects the same consecutiveness. From an implementation perspective, however, the two options lead to different implementations.
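To make the proposed counter semantics concrete, a minimal illustrative sketch; the class and method names are hypothetical, not Flink's actual CheckpointCoordinator code. The counter grows with each consecutive failure, is cleared by any success, and trips once the threshold is exceeded:

// Illustrative only: hypothetical helper, not part of Flink.
public class ConsecutiveCheckpointFailureGate {

    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    public ConsecutiveCheckpointFailureGate(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** A checkpoint completed; consecutiveness is broken, so clear the counter. */
    public void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /** A checkpoint failed; returns true if the job should now be failed. */
    public boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }
}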
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571634#comment-16571634 ] Thomas Weise commented on FLINK-10074:
It could potentially also be specified as a maximum time between successful checkpoints.
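A rough illustrative sketch of that time-based variant (hypothetical names, not an existing Flink API): track the wall-clock time of the last successful checkpoint and trip once the configured maximum is exceeded.

// Illustrative only: a hypothetical time-based check, not a Flink API.
public class CheckpointStalenessGate {

    private final long maxMillisBetweenSuccess;
    private long lastSuccessMillis = System.currentTimeMillis();

    public CheckpointStalenessGate(long maxMillisBetweenSuccess) {
        this.maxMillisBetweenSuccess = maxMillisBetweenSuccess;
    }

    /** A checkpoint completed; record the time of the success. */
    public void onCheckpointSuccess() {
        lastSuccessMillis = System.currentTimeMillis();
    }

    /** Returns true if too much time has passed since the last successful checkpoint. */
    public boolean isStale() {
        return System.currentTimeMillis() - lastSuccessMillis > maxMillisBetweenSuccess;
    }
}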
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571618#comment-16571618 ] Till Rohrmann commented on FLINK-10074:
[~yanghua] what is the scope of the initial implementation and how will you implement it? Will it be a fixed number of failures or a failure rate which triggers a job failure?
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16570445#comment-16570445 ] Thomas Weise commented on FLINK-10074:
[https://lists.apache.org/thread.html/0f887aee4b6e15965a540969d0720dd3c88cb94848f379270f8fb467@%3Cdev.flink.apache.org%3E]