[ 
https://issues.apache.org/jira/browse/FLINK-17351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092055#comment-17092055
 ] 

Jiayi Liao commented on FLINK-17351:
------------------------------------

[~roman_khachatryan] Interesting issue. But what if users don't set the 
{{CheckpointFailureManager}}? The exception thrown when producer is doing 
checkpointing, will also be swallowed because the checkpoint is expired.

> CheckpointCoordinator and CheckpointFailureManager ignores checkpoint timeouts
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-17351
>                 URL: https://issues.apache.org/jira/browse/FLINK-17351
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.2, 1.10.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> As described in point 2: 
> https://issues.apache.org/jira/browse/FLINK-17327?focusedCommentId=17090576&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17090576
> (copy of description from above linked comment):
> The logic in how {{CheckpointCoordinator}} handles checkpoint timeouts is 
> broken. In your [~qinjunjerry] examples, your job should have failed after 
> first checkpoint failure, but checkpoints were time outing on 
> CheckpointCoordinator after 5 seconds, before {{FlinkKafkaProducer}} was 
> detecting Kafka failure after 2 minutes. Those timeouts were not checked 
> against {{setTolerableCheckpointFailureNumber(...)}} limit, so the job was 
> keep going with many timed out checkpoints. Now funny thing happens: 
> FlinkKafkaProducer detects Kafka failure. Funny thing is that it depends 
> where the failure was detected:
> a) on processing record? no problem, job will failover immediately once 
> failure is detected (in this example after 2 minutes)
> b) on checkpoint? heh, the failure is reported to {{CheckpointCoordinator}} 
> *and gets ignored, as PendingCheckpoint has already been discarded 2 minutes 
> ago* :) So theoretically the checkpoints can keep failing forever and the job 
> will not restart automatically, unless something else fails.
> Even more funny things can happen if we mix FLINK-17350 . or b) with 
> intermittent external system failure. Sink reports an exception, transaction 
> was lost/aborted, Sink is in failed state, but if there will be a happy 
> coincidence that it manages to accept further records, this exception can be 
> lost and all of the records in those failed checkpoints will be lost forever 
> as well. In all of the examples that [~qinjunjerry] posted it hasn't 
> happened. {{FlinkKafkaProducer}} was not able to recover after the initial 
> failure and it was keep throwing exceptions until the job finally failed (but 
> much later then it should have). And that's not guaranteed anywhere.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to