[ 
https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099625#comment-17099625
 ] 

Piotr Nowojski commented on FLINK-17327:
----------------------------------------

[~qinjunjerry] the bug that [~aljoscha] is talking about, that blocked the 
{{FlinkKafkaProducer}} is on the Kafka client's side, not on the brokers, so to 
fix it, we would have to upgrade Kafka dependencies in our connector to 2.5.0 
(currently it's 2.2.x), which in turn is blocked by FLINK-15362. 

> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
>                 Key: FLINK-17327
>                 URL: https://issues.apache.org/jira/browse/FLINK-17327
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Jun Qin
>            Priority: Major
>         Attachments: Standalonesession.log, TM.log, TM_produer_only_task.log
>
>
> Steps to reproduce:
>  # Start a Flink 1.10 standalone cluster
>  # Run a Flink job which reads from one Kafka topic and writes to another 
> topic, with exactly-once checkpointing enabled
>  # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # Then, Flink could not complete snapshot due to {{Timeout expired while 
> initializing transactional state in 60000ms}}
>  # After several snapshot failures, Flink reported {{Too many ongoing 
> snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints.}}
>  # Eventually, Flink tried to cancel the task which did not succeed within 3 
> min. According to logs, consumer was cancelled, but producer is still running
>  # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
> down...}}
> I will attach the logs to show the details.  Worth to note that if there 
> would be no consumer but producer only in the task, the behavior is different:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # after {{delivery.timeout.ms}} (2min by default), producer reports: 
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
> output-topic-0:120001 ms has passed since batch creation}}
>  # Flink tried to cancel the upstream tasks and created a new producer
>  # The new producer obviously reported connectivity issue to brokers
>  # This continues till Kafka brokers are back. 
>  # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
> size or decrease number of concurrent checkpoints.}}
>  # Flink cancelled the tasks and restarted them
>  # The job continues, and new checkpoint succeeded. 
>  # TM runs all the time in this scenario
> I set Kafka transaction time out to 1 hour just to avoid transaction timeout 
> during the test.
> To get a producer only task, I called {{env.disableOperatorChaining();}} in 
> the second scenario. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to