[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668546#comment-16668546
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434257204
 
 
   @azagrebin OK, agree.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario; this could be expressed 
> with a failure threshold (number of consecutive checkpoint failures), possibly 
> combined with a list of exceptions to tolerate.
> 
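
A minimal usage sketch of the configuration this issue asks for, based on the setters proposed in PR #6567 (the diff appears further down this thread). The PR was later closed, so setTolerableFailureNumber is a proposal rather than a released API; treat the block as illustrative only.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailuresExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(500);

		// Proposed in PR #6567 (never merged in this form): do not fail the task on a
		// checkpoint error, and tolerate up to 3 consecutive checkpoint failures before
		// failing the job, so a transient S3 hiccup does not force a restart.
		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
		env.getCheckpointConfig().setTolerableFailureNumber(3);

		// ... define sources, transformations, and sinks, then run the job.
		env.execute("tolerable-checkpoint-failures-example");
	}
}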



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668547#comment-16668547
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua closed pull request #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..b091192f0a5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1354,6 +1354,8 @@ public void runBrokerFailureTest() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.enableCheckpointing(500);
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 8c0d7665fed..23507bfbdf8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -57,6 +57,8 @@ public static void generateRandomizedIntegerSequence(
 		env.setParallelism(numPartitions);
 		env.getConfig().disableSysoutLogging();
 		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
+		env.getCheckpointConfig().setTolerableFailureNumber(Integer.MAX_VALUE);
 
 		DataStream stream = env.addSource(
 				new RichParallelSourceFunction() {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 6b7caaac6ec..719560ef860 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -883,16 +883,6 @@ public boolean isFailTaskOnCheckpointError() {
 		return failTaskOnCheckpointError;
 	}
 
-	/**
-	 * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
-	 * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
-	 * instead.
-	 */
-	@Internal
-	public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
-		this.failTaskOnCheckpointError = failTaskOnCheckpointError;
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 179cf9c6de8..f4714c2cce6 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -100,6 +100,8 @@ public void testJobManagerJMXMetricAccess() throws Exception {
 				50,
 				5,
 				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true,
+				0,
 				true),

[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668543#comment-16668543
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434255898
 
 
   I created FLINK-10724 to refactor failure handling in the checkpoint coordinator, where I believe we should first prepare it for better integration with the error counter we have in this PR. Once FLINK-10724 is resolved, we can reopen this PR or open another one and rebase the error counter on its result, which should be simpler then.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668520#comment-16668520
 ] 

Andrey Zagrebin commented on FLINK-10074:
-

I created FLINK-10724 to revisit failure handling in the checkpoint coordinator and to prepare it for better integration with the error counter of this issue.



[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668373#comment-16668373
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434220095
 
 
   Hi @azagrebin, I am currently busy with several other PRs that I hope will be merged into Flink 1.7.0. Once those PRs are done, I will come back and reorganize this one. I think some of the logic in this PR is still usable, but the counting logic may need to be refactored. Can we put it aside for some time?




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668367#comment-16668367
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-434217958
 
 
   Hi @yanghua,

   can this PR be closed for now, so that we come back to it once we have a design in the Jira issue?
   
   We might also want to do some refactoring of failure handling in the checkpoint coordinator to prepare it for integrating the error counter from this PR.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643078#comment-16643078
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-428137039
 
 
   Hi @yanghua,
   
   In general, `executionGraph.failGlobal` looks good to me for failing the job, but I think the `CheckpointFailureManager` should be constructed with a callback that fails the job (sketched below). That means `JobMaster` or `ExecutionGraphBuilder` should decide how to fail. We also have to make sure that checkpointing on the Task Executor side does not fail the job anymore; only the `CheckpointFailureManager` acts as the central point of failure handling.
   
   Another point is that inserting the failure callback into different places in `CheckpointCoordinator`, close to each cause, looks rather invasive. Ideally, there should be two places where the `CheckpointFailureManager` gets a callback:
   - in `triggerCheckpoint(long timestamp, boolean isPeriodic)`, checking the `CheckpointTriggerResult` (sync part)
   - similar to `PendingCheckpointStats`, getting a callback in `PendingCheckpoint.reportFailedCheckpoint` (async part), but with a clearly defined cause like `CheckpointDeclineReason` for the sync part.
   
   `PendingCheckpoint` does not have such clearly defined failure handling as `triggerCheckpoint` and might need some refactoring to distinguish failure cases.
   
   I suggest we first describe the full approach in the Jira issue, including a list of all possible failures which the `CheckpointFailureManager` needs to react upon and how we define them in code. It can also be a link to a design doc.
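
A rough sketch of the callback-based construction suggested above. The names below (FailJobCallback, handleCheckpointFailure, handleCheckpointSuccess) are hypothetical and only illustrate how the CheckpointFailureManager could avoid depending on the ExecutionGraph directly; this is not the PR's actual code.

/** Hypothetical callback supplied by the JobMaster / ExecutionGraphBuilder, which decides how to fail. */
interface FailJobCallback {
	void failJob(Throwable cause);
}

class CheckpointFailureManager {

	private final int tolerableCpFailureNumber;
	private final FailJobCallback failJobCallback;
	private int continuousFailureCounter;

	CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failJobCallback) {
		this.tolerableCpFailureNumber = tolerableCpFailureNumber;
		this.failJobCallback = failJobCallback;
	}

	/** Invoked by the CheckpointCoordinator at its (few) failure hook points. */
	synchronized void handleCheckpointFailure(String reason) {
		if (++continuousFailureCounter > tolerableCpFailureNumber) {
			// The manager only decides *when* the threshold is exceeded; *how* the job fails
			// stays with the callback, which avoids the cyclic dependency on the ExecutionGraph.
			failJobCallback.failJob(new Throwable(reason));
		}
	}

	/** Invoked when a checkpoint completes; the consecutive-failure count is reset. */
	synchronized void handleCheckpointSuccess() {
		continuousFailureCounter = 0;
	}
}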




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642907#comment-16642907
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r223586404
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager to manage how to process checkpoint failure.
+ */
+public class CheckpointFailureManager {
+
+   private final boolean failOnCheckpointingErrors;
+   private final int tolerableCpFailureNumber;
+   private final AtomicInteger continuousFailureCounter;
+   private final ExecutionGraph executionGraph;
+   private final Object lock = new Object();
+
+   public CheckpointFailureManager(
+   boolean failOnCheckpointingErrors,
+   int tolerableCpFailureNumber,
+   ExecutionGraph executionGraph) {
+   this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.executionGraph = checkNotNull(executionGraph);
+   }
+
+   @VisibleForTesting
+   public AtomicInteger getContinuousFailureCounter() {
+   return continuousFailureCounter;
+   }
+
+   public void resetCounter() {
+   continuousFailureCounter.set(0);
+   }
+
+	public void tryHandleFailure(String reason, long checkpointId) {
+		synchronized (lock) {
+			if (failOnCheckpointingErrors ||
+					continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
+				executionGraph.failGlobal(new Throwable(reason));
 
 Review comment:
   Hi @azagrebin and @tillrohrmann, can you take a general look at my PR implementation and comment on how to trigger a job failure when the failure condition is met? My current implementation seems to be a bit problematic (cyclic dependency), which caused a lot of test failures.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625665#comment-16625665
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423942772
 
 
   > Do we need to take this refactoring into account? Because this PR is actually a supplement to the checkpoint exception handler.
   
   Yes, I think we can have the same semantics as in the previous TM-only implementation (`setFailOnCheckpointingErrors` means `tolerableFailures == 0`), but now everything should happen in the `CheckpointCoordinator`. That is why I also mentioned this point:
   
   > Consider having only `DecliningCheckpointExceptionHandler` on the `TaskExecutor` side and handling all failure cases only in the `CheckpointCoordinator`
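
A tiny sketch of the equivalence stated above, using a hypothetical helper name: failing on any checkpoint error corresponds to tolerating zero consecutive failures, while not failing corresponds to an effectively unbounded tolerance (the PR's tests pass Integer.MAX_VALUE for this case).

final class CheckpointFailureSemantics {

	private CheckpointFailureSemantics() {}

	// Hypothetical mapping of the legacy flag onto the proposed threshold.
	static int toTolerableFailureNumber(boolean failOnCheckpointingErrors) {
		// true  -> any single checkpoint error fails the job (0 tolerated failures)
		// false -> checkpoint errors alone never fail the job (unbounded tolerance)
		return failOnCheckpointingErrors ? 0 : Integer.MAX_VALUE;
	}
}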
   




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625650#comment-16625650
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423940208
 
 
   @azagrebin thanks for your suggestion, I agree and will reconsider the details.
   
   @tillrohrmann has said that the current checkpoint exception handler should also be implemented in the `CheckpointCoordinator`:
   
   > I understand why you implemented it the way you did. I think the `setFailOnCheckpointingErrors` should actually also be handled by the `CheckpointCoordinator`/`JM` and not on the Task level (but this is a different story). This looks a little bit like a shortcut we made back in the day.
   
   Do we need to take this refactoring into account? This PR is actually a supplement to the checkpoint exception handler.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623777#comment-16623777
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin edited a comment on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423573937
 
 
   Thanks for the update, @yanghua. Looking at the checkpoint coordinator more deeply, I think we first have to work a bit more on the design for this kind of change. We have to take into account at least the following points (roughly; see the sketch after this list):
   - Introduce a separate component/class responsible for failure management and counting, e.g. `CheckpointFailureManager` or similar.
   - The job manager should
     - construct the `CheckpointFailureManager`,
     - configure it with the max failure count and a proper action for how to fail,
     - pass it to the `CheckpointCoordinator`.
   - The `CheckpointCoordinator`
     - should give callbacks to the `CheckpointFailureManager` about failures and successes of checkpoints,
     - needs some refactoring to better distinguish failures of `PendingCheckpoint`. Currently the `abortXXX()` methods do not provide enough information to decide whether the `CheckpointFailureManager` should count them as failures or not (like we have for `triggerSavepoint` in `CheckpointDeclineReason`). In the end there should be clear places in the `CheckpointCoordinator` where to give callbacks to the `CheckpointFailureManager`, e.g.:
       - `CheckpointDeclineReason.EXCEPTION` as a result of `triggerSavepoint`,
       - some cases of `PendingCheckpoint.abortDeclined()`, `abortError()`, maybe `abortExpired()`.
   - Consider having only `DecliningCheckpointExceptionHandler` on the `TaskExecutor` side and handling all failure cases only in the `CheckpointCoordinator`.
   
   There might be more points. I suggest we step back and continue the discussion in the Jira issue. Once we have a clear design, a PR can be opened again.
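
To make the listed hook points concrete, here is a compressed sketch with purely illustrative names; the real CheckpointCoordinator and PendingCheckpoint code paths are more involved, so this only maps the bullet points above to code.

/** Hypothetical interface for the failure-management component described above. */
interface CheckpointFailureHooks {
	void onCheckpointFailure(String cause);   // counted towards the tolerable-failure threshold
	void onCheckpointSuccess();               // resets the consecutive-failure count
}

/** Sketch of where the CheckpointCoordinator could invoke the hooks (not actual Flink code). */
class CoordinatorHookSketch {

	private final CheckpointFailureHooks hooks;

	CoordinatorHookSketch(CheckpointFailureHooks hooks) {
		this.hooks = hooks;
	}

	/** Sync part: the trigger itself was declined, e.g. CheckpointDeclineReason.EXCEPTION. */
	void onTriggerDeclined(String reason) {
		hooks.onCheckpointFailure("trigger declined: " + reason);
	}

	/** Async part: a pending checkpoint was aborted with a clearly defined cause. */
	void onPendingCheckpointAborted(String cause, boolean countsAsFailure) {
		if (countsAsFailure) {
			hooks.onCheckpointFailure(cause);
		}
	}

	/** A completed checkpoint resets the failure count. */
	void onCheckpointCompleted() {
		hooks.onCheckpointSuccess();
	}
}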
   






[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623253#comment-16623253
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423453821
 
 
   Hi @tillrohrmann, what do you think about the latest implementation?




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619225#comment-16619225
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-422427072
 
 
   Hi @tillrohrmann, I have refactored this PR so that the failure number is counted in the `CheckpointCoordinator`. I think I should push the implementation to let you evaluate it. I have tested the counter's run path, but I don't know if it is the [right way](https://github.com/apache/flink/pull/6567/files#diff-a38ea0fa799bdaa0b354d80cd8368c60R1010) of failing the `ExecutionGraph`. Also, the test case I added may have too many assertions; I will reduce them.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618793#comment-16618793
 ] 

ASF GitHub Bot commented on FLINK-10074:


tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-422329293
 
 
   A failure of a global checkpoint should only increment the failure count by one, independent of the number of failed subtasks. Thus, I would hope that one does not need to set a different threshold for the two different cases you described, @tweise. However, it is correct that the price the user pays is that a consistent problem will only be detected after a longer delay.
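
A minimal illustration of the counting semantics described above, with hypothetical names (this is not the PR's code): failure reports from many subtasks for the same checkpoint increment the count at most once, and a completed checkpoint resets it.

import java.util.HashSet;
import java.util.Set;

/** Illustrative only: counts failures per global checkpoint, not per failed subtask. */
class PerCheckpointFailureCounter {

	private final Set<Long> failedCheckpointIds = new HashSet<>();
	private int consecutiveFailures = 0;

	/** Records a subtask failure for a checkpoint and returns the consecutive-failure count. */
	synchronized int onSubtaskFailure(long checkpointId) {
		// Many subtasks may report a failure for the same checkpoint (e.g. an S3 outage),
		// but the global count goes up by at most one per checkpoint.
		if (failedCheckpointIds.add(checkpointId)) {
			consecutiveFailures++;
		}
		return consecutiveFailures;
	}

	/** A completed checkpoint resets the consecutive-failure count. */
	synchronized void onCheckpointComplete(long checkpointId) {
		consecutiveFailures = 0;
		failedCheckpointIds.remove(checkpointId);
		// A real implementation would also prune entries for older checkpoints.
	}
}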




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617382#comment-16617382
 ] 

ASF GitHub Bot commented on FLINK-10074:


tweise commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421971822
 
 
   @yanghua @tillrohrmann I think a user would normally expect this count to apply globally, but please also consider the case of an intermittent failure (like an S3 rate limit, or the storage backend being unavailable for some other reason). In a large job that could cause many subtasks to fail in parallel. While this could be addressed by setting a correspondingly high threshold, that would in turn mean a problem isolated to a single task would not hit the threshold until much later, leaving the job flip-flopping instead of failing.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617305#comment-16617305
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421948370
 
 
   @tillrohrmann I originally wanted to do it in the checkpoint coordinator when I first implemented it, but when I touched setFailOnCheckpointingErrors I was guided towards the TM instead. I have also thought about this recently, and it seems more reasonable to implement it in the checkpoint coordinator, so I will implement it again based on the checkpoint coordinator.
   
   @klion26 thanks for your opinion; I will change the implementation.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617290#comment-16617290
 ] 

ASF GitHub Bot commented on FLINK-10074:


tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421942140
 
 
   I understand why you implemented it the way you did. I think the `setFailOnCheckpointingErrors` should actually also be handled by the `CheckpointCoordinator`/`JM` and not on the Task level (but this is a different story). This looks a little bit like a shortcut we made back in the day.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617269#comment-16617269
 ] 

ASF GitHub Bot commented on FLINK-10074:


klion26 commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421937722
 
 
   Thanks for opening this PR @yanghua. As a user, I like the idea of failing in the JM rather than in the TM.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617235#comment-16617235
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421931626
 
 
   @tweise any comments or opinions? The current count implementation is for a single subtask instance, not at the job level. Maybe @tillrohrmann's idea is more reasonable?




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614541#comment-16614541
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-421282760
 
 
   Hi @tillrohrmann, currently it is applied to the subtask instance (TM) instead of the checkpoint coordinator (JM), as a complement to the setFailOnCheckpointingErrors method, since setFailOnCheckpointingErrors only gives the user a yes-or-no choice.
   
   But it sounds reasonable to apply the count globally at the job level.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608886#comment-16608886
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r216242682
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);
 
+				if (owner.asynchronousCheckpointExceptionHandler != null) {
 
 Review comment:
   ok, I agree that it would need more refactoring to remove this null check.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606874#comment-16606874
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215895305
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);
 
+				if (owner.asynchronousCheckpointExceptionHandler != null) {
 
 Review comment:
   I looked at the `StreamTask`: the initialization logic for the `asynchronousCheckpointExceptionHandler` is in the `invoke` method, before the call to the concrete `init` method, and `invoke` is marked `final`. So I personally feel that we don't have to make this too complicated just to remove one null check. What do you think?






[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606849#comment-16606849
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215887709
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -875,6 +876,9 @@ public void run() {
 					localTaskOperatorSubtaskStates,
 					asyncDurationMillis);
 
+				if (owner.asynchronousCheckpointExceptionHandler != null) {
 
 Review comment:
   I see. I would rather initialise it in `NoOpStreamTask` to avoid the `null`, but that would then need more refactoring.
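
A possible shape of that refactoring, as a hedged sketch with illustrative names (not Flink's actual classes): give the handler a no-op default so the null check in the async checkpointing path becomes unnecessary.

/** Hypothetical null-object default for the checkpoint exception handler. */
interface CheckpointExceptionHandlerSketch {

	void tryHandleCheckpointException(long checkpointId, Exception exception) throws Exception;

	/** Used where no real handler has been configured, e.g. in test harnesses such as NoOpStreamTask. */
	CheckpointExceptionHandlerSketch NO_OP = (checkpointId, exception) -> {
		// swallow the exception: the job is neither failed nor restarted
	};
}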




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605923#comment-16605923
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-419137130
 
 
   @azagrebin thanks for your suggestion, I have refactored this PR.




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605920#comment-16605920
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215670211
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -875,6 +876,9 @@ public void run() {
localTaskOperatorSubtaskStates,
asyncDurationMillis);
 
+   if (owner.asynchronousCheckpointExceptionHandler != null) {
 
 Review comment:
   @azagrebin At normal runtime it will not be null. This check is mainly for the 
checkpoint test code, which does not perform the initialization logic for the 
asynchronousCheckpointExceptionHandler; without the check the test code throws an 
NPE, so I added it in the second commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605727#comment-16605727
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215586717
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 ##
 @@ -75,5 +101,10 @@ public void tryHandleCheckpointException(
 

environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
}
+
+   @Override
+   public void checkpointSucceeded() {
 
 Review comment:
   the empty implementation could already be the default one in the interface 
itself.
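   For illustration, a minimal sketch of this suggestion, assuming the interface 
keeps its current `tryHandleCheckpointException` signature and gains the proposed 
`checkpointSucceeded()` hook (names are taken from this discussion, not verified 
against the merged code):
   ```java
   import org.apache.flink.runtime.checkpoint.CheckpointMetaData;

   // Sketch: the success hook has a default no-op body in the interface itself,
   // so only handlers that track consecutive failures need to override it.
   interface CheckpointExceptionHandler {

       void tryHandleCheckpointException(
           CheckpointMetaData checkpointMetaData,
           Exception exception) throws Exception;

       default void checkpointSucceeded() {
           // no-op by default
       }
   }
   ```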


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605726#comment-16605726
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215586486
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 ##
 @@ -48,12 +51,35 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler(
 */
	static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {
 
+   private final int tolerableNumber;
+   private AtomicInteger cpFailureCounter;
+
+   FailingCheckpointExceptionHandler(int tolerableNumber) {
+   this.cpFailureCounter = new AtomicInteger(0);
+   this.tolerableNumber = tolerableNumber;
+   }
+
@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
Exception exception) throws Exception {
 
-   throw exception;
+   if (needThrowCheckpointException()) {
 
 Review comment:
   I think at this point the function `tryHandleCheckpointException` is too simple 
to be worth breaking down into a separate `needThrowCheckpointException`. The logic 
could all live here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605725#comment-16605725
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215588426
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -875,6 +876,9 @@ public void run() {
localTaskOperatorSubtaskStates,
asyncDurationMillis);
 
+   if (owner.asynchronousCheckpointExceptionHandler != null) {
 
 Review comment:
   do you think it can be null? It is initialised at the beginning of the Task 
`invoke` and then also used in the catch-Exception path (`handleExecutionException(e)`) 
without any check.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605728#comment-16605728
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r215586087
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 ##
 @@ -48,12 +51,35 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler(
 */
	static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {
 
+   private final int tolerableNumber;
+   private AtomicInteger cpFailureCounter;
 
 Review comment:
   `cpFailureCounter` can also be final


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605705#comment-16605705
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-419073463
 
 
   @tillrohrmann and @zentol if you have time, can you have a look at this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602793#comment-16602793
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-418295546
 
 
   @azagrebin thanks for your suggestion, I have refactored this PR. Please review 
it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598929#comment-16598929
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r214384046
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 ##
 @@ -75,6 +79,19 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler(
			}
		};
 
+		if (failOnException) {
+			CheckpointExceptionHandler exceptionHandler =
+				inspectingFactory.createCheckpointExceptionHandler(failOnException, environment);
+			Assert.assertTrue(
+				exceptionHandler instanceof CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler
+			);
+
+			CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler actuallyHandler =
+				(CheckpointExceptionHandlerFactory.FailingCheckpointExceptionHandler) exceptionHandler;
+
+			Assert.assertEquals(3, actuallyHandler.tolerableNumber);
 
 Review comment:
   I would rather make the fields of `FailingCheckpointExceptionHandler` private 
and check `tolerableNumber` the same way `failTaskOnCheckpointException` was checked 
in the previous declaration of `CheckpointExceptionHandlerFactory`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598930#comment-16598930
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r214384222
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
 ##
 @@ -63,6 +63,10 @@ private void testConfigForwarding(boolean failOnException) throws Exception {
		environment.setTaskStateManager(new TestTaskStateManager());
		environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
 
+		if (failOnException) {
+			environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
 
 Review comment:
   this can always be set, as it should just be ignored if `failOnException` is 
`false`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598928#comment-16598928
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r214379864
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 ##
 @@ -37,7 +37,7 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler(
			Environment environment) {
 
		if (failTaskOnCheckpointException) {
-			return new FailingCheckpointExceptionHandler();
+			return new FailingCheckpointExceptionHandler(environment);
 
 Review comment:
   I think we do not really need `environment` in the constructor of 
`FailingCheckpointExceptionHandler`, but only `tolerableNumber`, which should be 
passed into `createCheckpointExceptionHandler` the same way as 
`failTaskOnCheckpointException`.
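   As a rough sketch of this suggestion (a fragment only; the parameter order and 
the declining branch are assumptions, not the PR's final code):
   ```java
   // Hypothetical factory shape: the tolerable count is resolved by the caller and
   // passed in explicitly, so FailingCheckpointExceptionHandler needs no Environment.
   public CheckpointExceptionHandler createCheckpointExceptionHandler(
           boolean failTaskOnCheckpointException,
           int tolerableCheckpointFailures,
           Environment environment) {

       if (failTaskOnCheckpointException) {
           return new FailingCheckpointExceptionHandler(tolerableCheckpointFailures);
       } else {
           // The declining handler still reports failures through the environment.
           return new DecliningCheckpointExceptionHandler(environment);
       }
   }
   ```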


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598931#comment-16598931
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r214398085
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
 ##
 @@ -48,12 +48,41 @@ public CheckpointExceptionHandler createCheckpointExceptionHandler(
 */
	static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {
 
+		final Environment environment;
+		final int tolerableNumber;
+		long latestFailedCheckpointID;
+		int cpFailureCounter;
+
+		FailingCheckpointExceptionHandler(Environment environment) {
+			this.environment = environment;
+			this.cpFailureCounter = 0;
+			this.tolerableNumber = environment.getExecutionConfig().getTaskTolerableCheckpointFailuresNumber();
+		}
+
		@Override
		public void tryHandleCheckpointException(
				CheckpointMetaData checkpointMetaData,
				Exception exception) throws Exception {
 
-			throw exception;
+			if (needThrowCheckpointException(checkpointMetaData)) {
+				throw exception;
+			}
+		}
+
+		private boolean needThrowCheckpointException(CheckpointMetaData checkpointMetaData) {
+			if (tolerableNumber == 0) {
+				return true;
+			}
+
+			if (checkpointMetaData.getCheckpointId() - latestFailedCheckpointID == 1) {
 
 Review comment:
   I think rather than relying on sequential numbering of checkpoints, 
   it is better to add one more signal: 
`CheckpointExceptionHandler.checkpointSucceeded()`, where the counter is reset. 
   
   This method can be called in `AsyncCheckpointRunnable.run()`, e.g. after 
`reportCompletedSnapshotStates` is done:
   ```
   owner.asynchronousCheckpointExceptionHandler.checkpointSucceeded(); // forward it to synchronousCheckpointExceptionHandler inside
   ```
   
   The checkpoints finish concurrently, so I think we have to use an 
`AtomicInteger` for the `cpFailureCounter` and `cpFailureCounter.incrementAndGet()`.
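   Putting these suggestions together, a simplified standalone sketch of such a 
counter-based handler (class and method names here are illustrative only; the real 
handler lives in `CheckpointExceptionHandlerFactory` and works with Flink's 
`CheckpointMetaData`):
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Sketch: count consecutive checkpoint failures, reset the counter on success,
   // and rethrow once the tolerable number is exceeded.
   final class TolerantCheckpointFailureHandler {

       private final int tolerableNumber;
       private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

       TolerantCheckpointFailureHandler(int tolerableNumber) {
           this.tolerableNumber = tolerableNumber;
       }

       // Called when an (asynchronous) checkpoint fails.
       void onCheckpointFailure(Exception exception) throws Exception {
           if (consecutiveFailures.incrementAndGet() > tolerableNumber) {
               throw exception; // too many failures in a row: fail the task
           }
           // otherwise swallow the failure and rely on the next checkpoint
       }

       // Called when a checkpoint completes, e.g. from AsyncCheckpointRunnable.run().
       void checkpointSucceeded() {
           consecutiveFailures.set(0);
       }
   }
   ```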


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594625#comment-16594625
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213203249
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
Assert.assertNull(environment.getLastDeclinedCheckpointCause());
}
 
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+
+			Assert.fail("Exception not rethrown.");
+		} catch (Exception e) {
+			Assert.assertEquals(testException, e);
+		}
+
+		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
+	}
+
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberNotTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(46L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+		} catch (Exception e) {
+			Assert.assertNotEquals(testException, e);
 
 Review comment:
   Yes, we can just throw it. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid 

[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594624#comment-16594624
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213202843
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
Assert.assertNull(environment.getLastDeclinedCheckpointCause());
}
 
+   @Test
+   public void testRethrowingHandlerWithTolerableNumberTriggered() {
+   DeclineDummyEnvironment environment = new 
DeclineDummyEnvironment();
+   
environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+   CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+   CheckpointExceptionHandler exceptionHandler =
+   
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
environment);
+
+   CheckpointMetaData failedCheckpointMetaData = new 
CheckpointMetaData(42L, 4711L);
+   Exception testException = new Exception("test");
+   try {
+   
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
+   failedCheckpointMetaData = new CheckpointMetaData(43L, 
4711L);
+   
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
+   failedCheckpointMetaData = new CheckpointMetaData(44L, 
4711L);
+   
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
+   failedCheckpointMetaData = new CheckpointMetaData(45L, 
4711L);
+   
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, 
testException);
 
 Review comment:
   Here, this line will throw an exception, and the test then enters the catch 
block and verifies that the exception is the one we expected. If no exception is 
thrown by this line, `Assert.fail()` fires, indicating that the expected exception 
was not thrown, so the test fails. Of course, as you said, this does not prove that 
the exception was thrown by this particular line. Maybe I can check each checkpoint 
independently, as sketched below.
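   For example, a sketch of such an independent check, reusing the 
`exceptionHandler`, `testException`, and tolerable number of 3 from the diff above 
(illustrative only, not the final test code):
   ```java
   // Each of the first three failures should be swallowed by the handler.
   for (long checkpointId = 42L; checkpointId <= 44L; checkpointId++) {
       try {
           exceptionHandler.tryHandleCheckpointException(
               new CheckpointMetaData(checkpointId, 4711L), testException);
       } catch (Exception e) {
           Assert.fail("Checkpoint " + checkpointId + " should have been tolerated.");
       }
   }

   // Only the fourth consecutive failure should rethrow the original exception.
   try {
       exceptionHandler.tryHandleCheckpointException(
           new CheckpointMetaData(45L, 4711L), testException);
       Assert.fail("Exception not rethrown on the fourth failure.");
   } catch (Exception e) {
       Assert.assertEquals(testException, e);
   }
   ```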


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594502#comment-16594502
 ] 

ASF GitHub Bot commented on FLINK-10074:


tweise commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213180551
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
Assert.assertNull(environment.getLastDeclinedCheckpointCause());
}
 
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+
+			Assert.fail("Exception not rethrown.");
+		} catch (Exception e) {
+			Assert.assertEquals(testException, e);
+		}
+
+		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
+	}
+
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberNotTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(46L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+		} catch (Exception e) {
+			Assert.assertNotEquals(testException, e);
 
 Review comment:
   Shouldn't this fail since we don't expect an exception?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a 

[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594497#comment-16594497
 ] 

ASF GitHub Bot commented on FLINK-10074:


tweise commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213179903
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
Assert.assertNull(environment.getLastDeclinedCheckpointCause());
}
 
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
 
 Review comment:
   Shouldn't it verify that this attempt generated the exception, and not a 
previous one?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589739#comment-16589739
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-415303507
 
 
   @tillrohrmann When you have free time, please review this PR. thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582620#comment-16582620
 ] 

ASF GitHub Bot commented on FLINK-10074:


tillrohrmann commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-413571184
 
 
   Thanks for opening this PR @yanghua. I'll try to give it a review in the next 
few days.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582279#comment-16582279
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-413486884
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582251#comment-16582251
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua opened a new pull request #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567
 
 
   ## What is the purpose of the change
   
   *This pull request allows configuring a tolerable number of checkpoint failures 
before a task is failed (see the usage sketch after the change log below).*
   
   ## Brief change log
   
- *Add a new API `CheckpointConfig#setTolerableCheckpointFailuresNumber`*
- *Add a new API `CheckpointConfig#getTolerableCheckpointFailuresNumber`*
   - *Changed `FailingCheckpointExceptionHandler#tryHandleCheckpointException`*
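   
   For context, a rough usage sketch of the new configuration (method names as 
listed in the change log above; they may change in later revisions of this PR):
   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class TolerableCheckpointFailuresExample {

       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(10_000L);

           // Keep failing the task on checkpoint errors, but only after the tolerable
           // number of consecutive checkpoint failures has been exceeded.
           env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
           env.getCheckpointConfig().setTolerableCheckpointFailuresNumber(3);
       }
   }
   ```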
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - 
*CheckpointExceptionHandlerTest#testRethrowingHandlerWithTolerableNumberTriggered*
 - 
*CheckpointExceptionHandlerTest#testRethrowingHandlerWithTolerableNumberNotTriggered*

   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-15 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581211#comment-16581211
 ] 

Till Rohrmann commented on FLINK-10074:
---

A best-effort strategy sounds good to me. I think you could start implementing 
this feature [~yanghua].

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-15 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580750#comment-16580750
 ] 

vinoyang commented on FLINK-10074:
--

[~thw] OK, that's fine. Then I am ready to start working on this issue.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580699#comment-16580699
 ] 

Thomas Weise commented on FLINK-10074:
--

[~yanghua] this is what I meant as well; I updated my comment for clarity.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580687#comment-16580687
 ] 

vinoyang commented on FLINK-10074:
--

[~thw] I personally prefer that the counter is reset once the JM fails over. I 
don't think it's necessary to introduce too much complexity for this: maintaining a 
global counter across JM processes would require a third-party component such as 
ZooKeeper. Keeping the counter for the lifetime of a JM process seems appropriate. 
Once the JM fails over, the job is restored (re-deployed and re-run), so it is 
reasonable to reset the counter for the new runtime environment.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624
 ] 

Thomas Weise commented on FLINK-10074:
--

[~till.rohrmann] it is probably unreasonably hard to do for the JM failure case, 
but how about making it correct on a best-effort basis? Only when the JM fails 
would we not retain the count, which is a low-probability case. The majority of 
failures are transient issues in TMs where subtasks just get redeployed; TM machine 
failures are much rarer. For all of these cases we could get more accurate behavior 
by retaining the count and failing right away on the next checkpoint failure.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579555#comment-16579555
 ] 

Till Rohrmann commented on FLINK-10074:
---

Since {{setFailOnCheckpointingErrors}} is public, we cannot simply change its 
signature. What we could do, though, is add another method 
{{setNumberTolerableCheckpointFailures(int)}} which defaults to {{0}} and is only 
respected if {{setFailOnCheckpointingErrors}} is set to {{true}}. So if the user 
only calls {{setFailOnCheckpointingErrors(true)}}, they will get the same old 
behaviour. Only after calling {{setNumberTolerableCheckpointFailures(10)}} will it 
tolerate 10 checkpoint failures before failing. If 
{{setNumberTolerableCheckpointFailures}} is set but 
{{setFailOnCheckpointingErrors(false)}}, then checkpoint failures won't fail 
the job.

[~thw] would you not reset the counter in case of a restart? This would be hard 
to do in case of a JobManager failover and lead to different behaviours 
depending on the actual fault.
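
For illustration, the semantics described above could be expressed roughly as 
follows (a sketch only; the names are taken from this comment, not from the 
final API):
{code:java}
// Sketch: the tolerable count is only respected when failing on checkpointing
// errors is enabled; the default of 0 preserves the old fail-on-first-failure
// behaviour.
static boolean shouldFailTask(
        boolean failOnCheckpointingErrors,
        int numberTolerableCheckpointFailures,
        int consecutiveCheckpointFailures) {

    if (!failOnCheckpointingErrors) {
        return false; // checkpoint failures never fail the job
    }
    return consecutiveCheckpointFailures > numberTolerableCheckpointFailures;
}
{code}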

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579327#comment-16579327
 ] 

vinoyang commented on FLINK-10074:
--

[~thw] The setFailOnCheckpointingErrors API is currently not marked as @Public, 
but it is still effectively a public API, so perhaps we should keep it for 
compatibility. However, that does make it less clear what this issue should do. 
The setFailOnCheckpointingErrors method offers two states, true or false, while 
this issue introduces three states that are enough to cover the functionality 
of setFailOnCheckpointingErrors. How should we decide? [~till.rohrmann]

Regarding the second question, you are right.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-13 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579269#comment-16579269
 ] 

Thomas Weise commented on FLINK-10074:
--

I think configuring the behavior as a count of allowable consecutive failures 
would work well. Would this replace the existing setFailOnCheckpointingErrors 
(will that setting become irrelevant when the user already sets the count)?

[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html]

Regarding what happens once the job was allowed to fail and recovers only to 
fail again: Shouldn't the counter only be reset after the next successful 
checkpoint vs. on restart? 

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-13 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578001#comment-16578001
 ] 

Till Rohrmann commented on FLINK-10074:
---

I would be ok with defining the maximum number of consecutively failed 
checkpoints. Would that also work for you [~thw]?

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576588#comment-16576588
 ] 

vinoyang commented on FLINK-10074:
--

[~trohrm...@apache.org] do we need more discussion? If not, I will start this work.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-09 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575038#comment-16575038
 ] 

vinoyang commented on FLINK-10074:
--

[~till.rohrmann] yes, I agree with you. If we focus on time, it becomes more 
complicated for users, because there are multiple time-related configurations 
whose interplay they would need to understand. If we focus on a number of 
failures instead, it is more user friendly, similar to configuring a maximum 
number of timeouts or failures elsewhere.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-09 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575001#comment-16575001
 ] 

Till Rohrmann commented on FLINK-10074:
---

I like the idea of having a maximum time between successful checkpoints. 
However, a time constraint might be difficult for a user to configure, since 
the actual checkpoint interval could be larger than the configured checkpoint 
interval if the user has also specified a minimum time between checkpoints > 0. 
So I'm wondering whether a number of allowed checkpoint failures would be 
easier to understand. What do you think?
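
A short sketch of the interaction described above, using the existing 
{{CheckpointConfig}} settings with illustrative values: whenever a checkpoint 
takes longer than the configured interval minus the minimum pause, the 
effective gap between checkpoints grows beyond the configured interval, which 
is what makes a purely time-based failure threshold hard to pick.

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimingExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // configured checkpoint interval: 60s

        CheckpointConfig config = env.getCheckpointConfig();
        // At least 30s must pass between the end of one checkpoint and the
        // start of the next, so the real interval exceeds 60s whenever a
        // checkpoint takes more than 30s to complete.
        config.setMinPauseBetweenCheckpoints(30_000L);
    }
}
{code}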

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-07 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571890#comment-16571890
 ] 

vinoyang commented on FLINK-10074:
--

Hi [~thw] and [~till.rohrmann]:

I think it depends on our purpose. I believe we mainly want to prevent a job 
from going a long time without completing a checkpoint, because even if we let 
such a job keep running, once it fails for some other reason it will have to 
roll back a long way after recovery. Here "a long time" should express a kind 
of continuity, so I think we can let the user set a threshold for how many 
consecutive failures are tolerated before we fail the job and let it recover. 
As long as the threshold has not been reached, we can clear the counter as 
soon as a checkpoint completes successfully. One more thing is worth thinking 
about, though: if we fail and restart the job but it still cannot complete a 
checkpoint, what should we do then? The scenario assumed here is that HDFS was 
chosen as the state backend, but it has failed and cannot be used again for a 
short while.

Of course, we could also use the longest time between two successful 
checkpoints. Since the checkpoint period is usually fixed, and the intervals 
between failures are nearly equal when checkpoints keep failing for the same 
reason, this can be seen as an approximate expression of how many checkpoints 
fail consecutively. It also reflects the same "continuity".

From an implementation perspective, however, the two options lead to two 
different implementations.
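
For illustration, the two checks could look roughly like the sketch below. The 
class and method names are hypothetical and this is not actual Flink code; it 
only contrasts a count-based threshold with a time-based one.

{code:java}
// Illustrative only: two possible shapes for the threshold check.
public class FailureThresholdChecks {

    /** Count-based: fail after more than N consecutive checkpoint failures. */
    static boolean shouldFailByCount(int consecutiveFailures, int tolerableFailures) {
        return consecutiveFailures > tolerableFailures;
    }

    /** Time-based: fail once too much time has passed since the last successful checkpoint. */
    static boolean shouldFailByTime(
            long lastSuccessTimestampMillis,
            long nowMillis,
            long maxMillisBetweenSuccessfulCheckpoints) {
        return nowMillis - lastSuccessTimestampMillis > maxMillisBetweenSuccessfulCheckpoints;
    }
}
{code}

With a fixed 10-second checkpoint interval, for example, tolerating 5 
consecutive failures is roughly the same as allowing about 60 seconds between 
successful checkpoints, which is the approximation described above.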

 

 

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-07 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571634#comment-16571634
 ] 

Thomas Weise commented on FLINK-10074:
--

It could potentially also be specified as a maximum time between successful 
checkpoints.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-07 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571618#comment-16571618
 ] 

Till Rohrmann commented on FLINK-10074:
---

[~yanghua] what is the scope of the initial implementation and how will you 
implement it? Will it be a fixed number of failures or a failure rate which 
triggers a job failure?
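
For illustration, a failure-rate variant could look roughly like the sketch 
below (hypothetical, only sketching the failure-rate option mentioned here, 
not actual Flink code): failures are counted inside a sliding time window and 
the job is failed once the rate crosses a threshold.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a failure-rate trigger; not actual Flink code.
public class CheckpointFailureRateTracker {

    private final int maxFailuresPerWindow;
    private final long windowMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    public CheckpointFailureRateTracker(int maxFailuresPerWindow, long windowMillis) {
        this.maxFailuresPerWindow = maxFailuresPerWindow;
        this.windowMillis = windowMillis;
    }

    /** Records a failure at the given time and returns true if the job should fail. */
    public boolean onCheckpointFailure(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
        // Drop failures that have fallen out of the sliding window.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > windowMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() > maxFailuresPerWindow;
    }
}
{code}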

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-06 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16570445#comment-16570445
 ] 

Thomas Weise commented on FLINK-10074:
--

[https://lists.apache.org/thread.html/0f887aee4b6e15965a540969d0720dd3c88cb94848f379270f8fb467@%3Cdev.flink.apache.org%3E]

 

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)