[ 
https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090576#comment-17090576
 ] 

Piotr Nowojski commented on FLINK-17327:
----------------------------------------

Thanks [~qinjunjerry] for submitting the issue. There are multiple issues that 
together contributed to the symptoms that you have reported. I will list them 
in descending priority order:

1. *CRITICAL bug*: {{setTolerableCheckpointFailureNumber(...)}} and its 
deprecated predecessor {{setFailTaskOnCheckpointError(...)}} have been implemented 
incorrectly since https://issues.apache.org/jira/browse/FLINK-4809 and can leave 
operators (and especially sinks with external state) in an inconsistent state. 
That's also true even if they are not used, because...
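
For reference, these settings are configured from the DataStream API roughly like 
this (just a minimal sketch; the checkpoint interval and the tolerated-failure 
count are illustration values only):
{code}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once checkpoints every 5 seconds (illustration values only).
        env.enableCheckpointing(5_000L, CheckpointingMode.EXACTLY_ONCE);

        // Tolerate zero checkpoint failures, i.e. the job is supposed to fail on
        // the first failed checkpoint. Because of the bug above, a failure reported
        // during snapshotting can still be swallowed instead of failing the task.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        // ... build the Kafka source -> Kafka sink pipeline and call env.execute() ...
    }
}
{code}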

2. *CRITICAL bug*: The logic with which {{CheckpointCoordinator}} handles 
checkpoint timeouts is broken. In your examples [~qinjunjerry], the job should 
have failed after the first checkpoint failure, but the checkpoints were timing 
out on the {{CheckpointCoordinator}} side after 5 seconds, before 
{{FlinkKafkaProducer}} detected the Kafka failure after 2 minutes. Those timeouts 
were not checked against the {{setTolerableCheckpointFailureNumber(...)}} limit, 
so the job kept going with many timed-out checkpoints. Now the funny part: 
{{FlinkKafkaProducer}} eventually detects the Kafka failure, and what happens 
next depends on where the failure was detected:

a) while processing a record? No problem, the job fails over immediately once the 
failure is detected (in this example after 2 minutes).
b) during a checkpoint? Heh, the failure is reported to the 
{{CheckpointCoordinator}} *and gets ignored, as the {{PendingCheckpoint}} was 
already discarded 2 minutes ago* :) So theoretically the checkpoints can keep 
failing forever and the job will not restart automatically, unless something else 
fails (a simplified sketch of this race follows below).
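
To make 2b) easier to follow, here is a heavily simplified, self-contained sketch 
of that race. The class and method names below are made up for illustration and 
are not the actual {{CheckpointCoordinator}} code; they only show the sequence of 
events:
{code}
import java.util.HashMap;
import java.util.Map;

// Toy model of the race from point 2b); not real Flink classes.
class ToyCheckpointCoordinator {
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private int failureCounter = 0;

    void triggerCheckpoint(long checkpointId) {
        pendingCheckpoints.put(checkpointId, "PENDING");
    }

    // Fired by the coordinator-side checkpoint timeout (here: after 5 seconds).
    void onCheckpointTimeout(long checkpointId) {
        // The pending checkpoint is simply discarded; in the buggy behaviour this
        // expiration is not counted against the tolerable-failure limit.
        pendingCheckpoints.remove(checkpointId);
    }

    // Called when a task (e.g. the Kafka sink) reports a snapshot failure,
    // possibly minutes later, once the Kafka client timeouts expire.
    void declineCheckpoint(long checkpointId, Throwable cause) {
        if (pendingCheckpoints.remove(checkpointId) == null) {
            // The checkpoint already timed out and was discarded: the late failure
            // report is silently ignored and the job keeps running.
            return;
        }
        failureCounter++;
        // ... compare failureCounter against the tolerable limit, fail the job ...
    }
}
{code}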

Even more funny things can happen if we mix 1. or 2b) with an intermittent 
external system failure: the sink reports an exception, the transaction is 
lost/aborted and the sink is in a failed state, but if by happy coincidence it 
still manages to accept further records, that exception can be lost and all of 
the records from those failed checkpoints will be lost forever as well. In the 
examples that [~qinjunjerry] posted this did not happen: {{FlinkKafkaProducer}} 
was not able to recover after the initial failure and kept throwing exceptions 
until the job finally failed (though much later than it should have). But that 
is not guaranteed anywhere.

3. *non-critical bug*: {{FlinkKafkaConsumer}} does not close gracefully:
{code}
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to cancel task Source: Custom Source -> Sink: Unnamed 
(1/1) (8928563344a077ee98377721a2f22790).
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Sink: Unnamed (1/1) 
(8928563344a077ee98377721a2f22790) switched from RUNNING to CANCELING.
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Triggering cancellation of task code Source: Custom Source -> 
Sink: Unnamed (1/1) (8928563344a077ee98377721a2f22790).
2020-04-21 17:17:50,614 WARN  
org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while 
canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
        at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
        at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
        at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
        at java.lang.Thread.run(Thread.java:748)
{code}
It would be worth analysing why this happens, as this is what brought the cluster 
down (it caused the TaskExecutor to restart).
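
If the goal is simply a graceful cancellation, the general pattern would be for 
the cancel path to treat an already-closed handover as expected instead of letting 
the exception bubble up. A purely hypothetical sketch of that pattern (these are 
not the real connector classes):
{code}
// Hypothetical illustration; not the actual Handover/KafkaFetcher code.
class ToyHandover {
    static class ClosedException extends RuntimeException {}

    private volatile boolean closed;

    void close() {
        if (closed) {
            // Closing twice signals the "already closed" condition.
            throw new ClosedException();
        }
        closed = true;
    }
}

class ToyFetcher {
    private final ToyHandover handover = new ToyHandover();

    void cancel() {
        try {
            handover.close();
        } catch (ToyHandover.ClosedException e) {
            // Already shut down by the consumer thread; nothing left to cancel,
            // so the cancellation stays quiet instead of logging an error.
        }
    }
}
{code}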

4. Your configuration doesn't make much sense. You are timing out checkpoints 
after 5-10s, with a similar checkpoint interval, while the Kafka timeouts are 
probably still at their default values of 60 or 120 seconds:
{code}
2020-04-21 17:15:50,585 INFO  
org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could not 
complete snapshot 7 for operator Source: Custom Source -> Sink: Unnamed (1/1).
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 60000ms.
{code}
That is not a big issue on its own; it is just one of the things that triggered 
this whole complicated crash (the {{CheckpointCoordinator}} timing out the 
checkpoint, which in turn triggered the other bugs).
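
For illustration, a more consistent setup would give a checkpoint at least as much 
time as the Kafka client needs to notice a broker failure. A sketch (the concrete 
values and broker address are made up; the listed properties are standard Kafka 
producer settings, nothing Flink-specific):
{code}
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeoutAlignmentExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute instead of every few seconds (illustration only).
        env.enableCheckpointing(60_000L);

        // Checkpoint timeout well above the Kafka client timeouts, so the
        // CheckpointCoordinator does not expire checkpoints long before the
        // producer even notices that the brokers are gone.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        // Standard Kafka producer timeouts (defaults: 60s, 120s, 60s respectively).
        kafkaProps.setProperty("max.block.ms", "60000");
        kafkaProps.setProperty("delivery.timeout.ms", "120000");
        // Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
        kafkaProps.setProperty("transaction.timeout.ms", "900000");

        // ... pass kafkaProps to the FlinkKafkaConsumer / FlinkKafkaProducer ...
    }
}
{code}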

5. The reason why splitting the job into multiple tasks helped is probably just 
pure luck. In this case {{FlinkKafkaProducer}} was failing, triggering 
cancellation of the other tasks, which actually completed successfully.

6.
{code}
Too many ongoing snapshots. Increase kafka producers pool size or decrease 
number of concurrent checkpoints.
{code}
is just an aftershock of {{FlinkKafkaProducer}} ending up in a broken/inconsistent 
state because of the previously ignored errors (points 1. and 2.). The task should 
have failed much sooner, after the first snapshotting failure, but it kept going 
until it ran out of producers in the pool, and that is pure coincidence. After the 
first failure there was no way to keep going without data loss, and we were 
actually lucky that it did not recover and that there was some terminal failure 
after all.
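
For context, the pool that this message refers to is the pool of transactional 
producers inside {{FlinkKafkaProducer}}. Roughly, this is where its size and the 
number of concurrent checkpoints are configured (topic, schema and broker address 
are placeholders, and if I remember correctly the last constructor argument is 
the producer pool size):
{code}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerPoolExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // With a single concurrent checkpoint, the default pool of 5 transactional
        // producers should only run out if snapshot failures keep being swallowed,
        // as described in points 1. and 2.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        // The last argument is the transactional producer pool size mentioned in
        // the "Too many ongoing snapshots" message.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                10);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("producer-pool-example");
    }
}
{code}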


As there are 3 different bugs, I will create new tickets for those issues.


> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
>                 Key: FLINK-17327
>                 URL: https://issues.apache.org/jira/browse/FLINK-17327
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Jun Qin
>            Priority: Major
>         Attachments: Standalonesession.log, TM.log, TM_produer_only_task.log
>
>
> Steps to reproduce:
>  # Start a Flink 1.10 standalone cluster
>  # Run a Flink job which reads from one Kafka topic and writes to another 
> topic, with exactly-once checkpointing enabled
>  # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # Then, Flink could not complete snapshot due to {{Timeout expired while 
> initializing transactional state in 60000ms}}
>  # After several snapshot failures, Flink reported {{Too many ongoing 
> snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints.}}
>  # Eventually, Flink tried to cancel the task, which did not succeed within 3 
> minutes. According to the logs, the consumer was cancelled, but the producer 
> was still running
>  # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
> down...}}
> I will attach the logs to show the details. Worth noting that if there were no 
> consumer but only a producer in the task, the behavior would be different:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # after {{delivery.timeout.ms}} (2min by default), producer reports: 
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
> output-topic-0:120001 ms has passed since batch creation}}
>  # Flink tried to cancel the upstream tasks and created a new producer
>  # The new producer obviously reported connectivity issue to brokers
>  # This continues till Kafka brokers are back. 
>  # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
> size or decrease number of concurrent checkpoints.}}
>  # Flink cancelled the tasks and restarted them
>  # The job continues, and new checkpoint succeeded. 
>  # TM runs all the time in this scenario
> I set the Kafka transaction timeout to 1 hour just to avoid transaction 
> timeouts during the test.
> To get a producer only task, I called {{env.disableOperatorChaining();}} in 
> the second scenario. 


