[ https://issues.apache.org/jira/browse/FLINK-18150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146262#comment-17146262 ]

Julius Michaelis commented on FLINK-18150:
------------------------------------------

Finally managed to build my job on release-1.11-rc2. The problem persists, so I'm guessing it's unrelated to FLINK-17327.
(I was hoping that even if it's unrelated, the update of the Kafka consumer from 2.2.0 to 2.4.2, with its better node selection behavior, would help. Nope. :/)

> A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18150
>                 URL: https://issues.apache.org/jira/browse/FLINK-18150
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1
>         Environment: It is a bit unclear to me under what circumstances this 
> can be reproduced. I created a "minimum" non-working example at 
> https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the 
> minimum number of Kafka brokers, but the issue reproduces just as well with, 
> e.g., replication factor 3 and 8 brokers.
> I run this with
> {code:bash}
> docker-compose kill && docker-compose rm -vf && docker-compose up --abort-on-container-exit --build
> {code}
> The exception should appear on the web UI after 5-6 minutes.
> To make sure that this isn't dependent on my machine, I've also checked 
> reproducibility on an m5a.2xlarge EC2 instance.
> You can verify that the Kafka cluster is running normally, e.g. with:
> {code:bash}
> kafkacat -b localhost,localhost:9093 -L
> {code}
> So far, I only know the following (a configuration sketch illustrating these 
> settings follows the list):
> * {{flink.partition-discovery.interval-millis}} must be set.
> * The broker that fails must be part of {{bootstrap.servers}}.
> * There needs to be a certain number of topics or producers, but I'm unsure 
> which is crucial.
> * Changing the values of {{metadata.request.timeout.ms}} or 
> {{flink.partition-discovery.interval-millis}} does not seem to have any 
> effect.
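> For illustration, a minimal sketch (not taken from the linked repository) of 
> how these settings are passed to the consumer, assuming the universal Kafka 
> connector; the topic name, group id, and concrete values are placeholders:
> {code:java}
> import java.util.Properties;
> 
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> 
> public class Repro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         Properties props = new Properties();
>         // The broker that later fails must be listed here.
>         props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
>         props.setProperty("group.id", "repro"); // placeholder
>         // Must be set for the issue to appear; the concrete value seems irrelevant.
>         props.setProperty("flink.partition-discovery.interval-millis", "10000");
>         // Changing this does not seem to have any effect either.
>         props.setProperty("metadata.request.timeout.ms", "60000");
> 
>         env.addSource(new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props))
>                 .print();
>         env.execute("kafka-ha-failure-repro");
>     }
> }
> {code}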
>            Reporter: Julius Michaelis
>            Priority: Major
>
> When a Kafka broker fails that is listed among the bootstrap servers and 
> partition discovery is active, the Flink job reading from that Kafka may 
> enter a failing loop.
> At first, the job seems to react normally, without failure and with only a 
> short latency spike while Kafka switches leaders.
> Then, it fails with a
> {code:none}
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>         at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821)
>         at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> It recovers, but processes fewer than the expected number of records.
> Finally, the job fails with
> {code:none}
> 2020-06-05 13:59:37
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
> {code}
> and repeats doing so without processing any records. (The exception comes 
> without a backtrace or any other useful information.)
> I have also observed this behavior with partition-discovery turned off, but 
> only when the Flink job failed (after a broker failure) and had to run 
> checkpoint recovery for some other reason.
> Please see the [Environment] description for information on how to reproduce 
> the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
