Jun Qin created FLINK-17327: ------------------------------- Summary: Kafka unavailability could cause Flink TM shutdown Key: FLINK-17327 URL: https://issues.apache.org/jira/browse/FLINK-17327 Project: Flink Issue Type: Bug Affects Versions: 1.10.0 Reporter: Jun Qin
Steps to reproduce: # Start a Flink 1.10 standalone cluster # Run a Flink job which reads from one Kafka topic and writes to another topic, with exactly-once checkpointing enabled # Stop all Kafka Brokers after a few successful checkpoints When Kafka brokers are down: # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker could not be established # Then, Flink could not complete snapshot due to {{Timeout expired while initializing transactional state in 60000ms}} # After several snapshot failures, Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}} # Eventually, Flink tried to cancel the task which did not succeed within 3 min # Then {{Fatal error occurred while executing the TaskManager. Shutting it down...}} I will attach the logs to show the details. Worth to note that if there would be no consumer but producer only in the task, the behavior is different: # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker could not be established # after {{delivery.timeout.ms}} (2min by default), producer reports: {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for output-topic-0:120001 ms has passed since batch creation}} # Flink tried to cancel the upstream tasks and created a new producer # The new producer obviously reported connectivity issue to brokers # This continues till Kafka brokers are back. # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}} # Flink cancelled the tasks and restarted them # The job continues, and new checkpoint succeeded. # TM runs all the time in this scenario I set Kafka transaction time out to 1 hour just to avoid transaction timeout. To get a producer only task, I called {{env.disableOperatorChaining();}} in the second scenario. -- This message was sent by Atlassian Jira (v8.3.4#803005)