[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130745#comment-17130745 ] Aljoscha Krettek commented on FLINK-17327:
--

For posterity, here's a summary of the issue:
* our code calls {{close()}} on the {{KafkaProducer}} (Kafka code), which is equivalent to calling {{close}} with a timeout of {{Long.MAX_VALUE}}
* this means threads will leak when a failure happens, for example because of broker downtime
* the Flink task watchdog will then kill the TaskManager because of these lingering threads after a timeout

The fix is to always call {{close()}} with a reasonable timeout. The fix also requires a Kafka version bump because of KAFKA-6635/KAFKA-7763, which mean that resources still leak even when closing with a timeout. Additionally, we need to close with a timeout of exactly zero, because otherwise in-flight transactions will be aborted.

> Kafka unavailability could cause Flink TM shutdown
> --
>
> Key: FLINK-17327
> URL: https://issues.apache.org/jira/browse/FLINK-17327
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Jun Qin
> Priority: Critical
> Fix For: 1.12.0
>
> Attachments: 0001-Change-version-to-2.4.2-ALJOSCHA.patch, 0002-Don-t-abort-in-flight-transactions.patch, Standalonesession.log, TM.log, TM_produer_only_task.log
>
> Steps to reproduce:
> # Start a Flink 1.10 standalone cluster
> # Run a Flink job which reads from one Kafka topic and writes to another topic, with exactly-once checkpointing enabled
> # Stop all Kafka brokers after a few successful checkpoints
>
> When Kafka brokers are down:
> # {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
> # Then, Flink could not complete the snapshot due to {{Timeout expired while initializing transactional state in 6ms}}
> # After several snapshot failures, Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
> # Eventually, Flink tried to cancel the task, which did not succeed within 3 min. According to the logs, the consumer was cancelled, but the producer was still running
> # Then {{Fatal error occurred while executing the TaskManager. Shutting it down...}}
>
> I will attach the logs to show the details. It is worth noting that if the task contained only a producer and no consumer, the behavior is different:
> # {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
> # After {{delivery.timeout.ms}} (2 min by default), the producer reports: {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for output-topic-0:120001 ms has passed since batch creation}}
> # Flink tried to cancel the upstream tasks and created a new producer
> # The new producer, unsurprisingly, also reported connectivity issues to the brokers
> # This continues until the Kafka brokers are back
> # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
> # Flink cancelled the tasks and restarted them
> # The job continued, and a new checkpoint succeeded
> # The TM kept running the whole time in this scenario
>
> I set the Kafka transaction timeout to 1 hour just to avoid transaction timeouts during the test. To get a producer-only task, I called {{env.disableOperatorChaining();}} in the second scenario.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
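The close-with-timeout distinction in the summary above can be sketched with plain JDK primitives. This is a hypothetical stand-in (no Kafka dependency; {{StuckProducer}} is not the real {{KafkaProducer}}) showing why an unbounded close hangs on a stuck sender thread while a bounded close returns:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in (plain JDK, no Kafka dependency) for a producer
// whose sender thread is stuck because the brokers are unreachable.
class StuckProducer {
    private final CountDownLatch brokerResponse = new CountDownLatch(1); // never counted down
    private final Thread sender = new Thread(() -> {
        try {
            brokerResponse.await(); // blocks for as long as brokers are down
        } catch (InterruptedException ignored) {
            // a real sender may swallow this and keep waiting
        }
    });

    StuckProducer() {
        sender.setDaemon(true);
        sender.start();
    }

    // Unbounded close, like KafkaProducer.close() with no argument
    // (equivalent to a Long.MAX_VALUE timeout): would hang here.
    void close() {
        close(Duration.ofMillis(Long.MAX_VALUE));
    }

    // Bounded close: wait for the sender up to the deadline, then give up.
    // A zero timeout returns immediately, leaving in-flight transactions
    // untouched instead of aborting them.
    void close(Duration timeout) {
        long millis = timeout.toMillis();
        try {
            if (millis > 0) {
                sender.join(millis); // note: Thread.join(0) would wait forever
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model, {{close(Duration.ZERO)}} returns immediately without touching in-flight work, while the no-argument {{close()}} would block for as long as the sender thread is stuck.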
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17129590#comment-17129590 ] Aljoscha Krettek commented on FLINK-17327:
--

I was mixing up issues before: KAFKA-6635 has a fix, but it also introduces the "feature" that transactions are aborted on shutdown.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17128272#comment-17128272 ] Aljoscha Krettek commented on FLINK-17327:
--

I managed to find a fix for this:
- change our code to always use {{close()}} with a timeout on the Kafka producer; otherwise we might leave lingering threads
- this alone does not work because of KAFKA-7763, i.e. on shutdown, requests are not properly cancelled, which leaves lingering threads
- the fix for KAFKA-7763 also introduces code that aborts outstanding transactions when cancelling; this does not work together with our exactly-once Kafka producer
- you need a patched Kafka that includes the fix part of KAFKA-7763 without the code that aborts transactions; I'm attaching a patch for that against the Kafka 2.4 branch

The changes needed in Flink are here: https://github.com/aljoscha/flink/tree/flink-17327-kafka-clean-shutdown-2.4. The patch for Kafka is attached. I don't think the Kafka project will like that patch, though, because aborting outstanding transactions is valid for Kafka Streams/KSQL, where pending transactions that are not cancelled will block downstream consumption.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104242#comment-17104242 ] Aljoscha Krettek commented on FLINK-17327:
--

By the way, this can be reproduced even more simply: a job with just a synthetic source and a {{FlinkKafkaProducer}} will also get stuck in cancelling and eventually kill the TM. So it has nothing to do with FLINK-16482.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099782#comment-17099782 ] Jun Qin commented on FLINK-17327:
--

What do you mean by "job fails successfully"? With Flink 1.10, I believe you will get the following exception (after 1 min), which will then trigger the job cancellation:
{code:java}
2020-05-04 16:34:36,262 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could not complete snapshot 2 for operator Source: Custom Source -> Sink: Unnamed (1/1).
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 6ms.
{code}
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099777#comment-17099777 ] Aljoscha Krettek commented on FLINK-17327:
--

I think the Kafka code doesn't like being interrupted, which Flink does when cancelling.
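The interrupt problem can be demonstrated with a small JDK-only sketch (hypothetical code, not the actual Kafka client): a thread that swallows the interrupt and re-enters its blocking wait cannot be stopped by the {{Thread.interrupt()}} that Flink issues during task cancellation.

```java
// Hypothetical illustration (plain JDK, not actual Kafka code): a worker
// that swallows the interrupt and keeps blocking survives cancellation.
class InterruptDemo {
    // Starts such a worker, interrupts it, and reports whether it is
    // still alive shortly afterwards.
    static boolean survivesInterrupt() {
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(50); // stands in for a blocking broker call
                } catch (InterruptedException e) {
                    // interrupt swallowed: the loop just continues
                }
            }
        });
        worker.setDaemon(true); // so the demo process itself can still exit
        worker.start();
        worker.interrupt();
        try {
            Thread.sleep(200);
        } catch (InterruptedException ignored) {
        }
        return worker.isAlive();
    }
}
```

Because the worker catches {{InterruptedException}} and loops again, the interrupt that a cancelling task delivers has no lasting effect, which matches the "producer is still running" observation in the logs.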
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099775#comment-17099775 ] Aljoscha Krettek commented on FLINK-17327:
--

For testing, I tried this against Kafka 2.5.0 (which we can't really ship, see FLINK-15362). Here the job fails "successfully", because we don't wait indefinitely for the result. However, we are now left with a Kafka {{NetworkClient}} that logs indefinitely:
{code}
2020-05-05 12:59:03,465 WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-Map -> Sink: Unnamed-c09dc291fad93d575e015871097bfc60-4, transactionalId=Map -> Sink: Unnamed-c09dc291fad93d575e015871097bfc60-4] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
{code}
This is also not good, because your TaskManagers will eventually fill up with leftover Kafka threads.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099751#comment-17099751 ] Aljoscha Krettek commented on FLINK-17327:
--

This is another fix we need: https://issues.apache.org/jira/browse/KAFKA-7763. It's only available from 2.3.x onwards.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099625#comment-17099625 ] Piotr Nowojski commented on FLINK-17327:
--

[~qinjunjerry] the bug that [~aljoscha] is talking about, which blocked the {{FlinkKafkaProducer}}, is on the Kafka client's side, not on the brokers. To fix it, we would have to upgrade the Kafka dependency in our connector to 2.5.0 (currently it's 2.2.x), which in turn is blocked by FLINK-15362.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099611#comment-17099611 ] Jun Qin commented on FLINK-17327:
--

The above scenario was tested with kafka_2.12-2.5.0.
[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099608#comment-17099608 ] Aljoscha Krettek commented on FLINK-17327:
--

The fix I mentioned is only available in Kafka 2.5.x, so we should open Kafka issues to get it fixed for earlier versions as well.
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17099045#comment-17099045 ] Aljoscha Krettek commented on FLINK-17327: -- I believe that {{TransactionalRequestResult.await()}} is the culprit for the indefinite blocking: the latch is not counted down in the failure case: https://github.com/apache/kafka/blob/2.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java#L38. I also believe that this bug in Kafka was fixed here as an unrelated change: https://github.com/apache/kafka/commit/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710#diff-8a2c4f47dcec247ce2ecebf082b3d0b1R42.
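The hang pattern described above can be sketched in plain Java. {{BuggyResult}} below is a hypothetical model of the shape of {{TransactionalRequestResult}}, not Kafka's actual code: the error path records the exception but never counts the latch down, so an untimed {{await()}} blocks forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the TransactionalRequestResult pattern (not Kafka's code).
class BuggyResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    // Success path: releases any thread blocked in await().
    public void done() {
        latch.countDown();
    }

    // Failure path as in the buggy version: the error is recorded,
    // but latch.countDown() is never called, so waiters stay blocked.
    public void fail(RuntimeException e) {
        this.error = e;
    }

    // Returns true if the result completed within the timeout.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

With this model, calling {{fail(...)}} and then an untimed {{await()}} never returns (a timed {{await}} simply elapses and returns {{false}}). The fix in the linked Kafka commit amounts to counting the latch down on the error path as well.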
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090576#comment-17090576 ] Piotr Nowojski commented on FLINK-17327: Thanks [~qinjunjerry] for submitting the issue. There are multiple issues that together contributed to the symptoms you reported. I will list them in descending priority order: 1. *CRITICAL bug*: {{setTolerableCheckpointFailureNumber(...)}} and its deprecated predecessor {{setFailTaskOnCheckpointError(...)}} have been implemented incorrectly since https://issues.apache.org/jira/browse/FLINK-4809 and can leave operators (especially sinks with external state) in an inconsistent state. That is true even if they are not used, because... 2. *CRITICAL bug*: The logic in how {{CheckpointCoordinator}} handles checkpoint timeouts is broken. In your examples, [~qinjunjerry], the job should have failed after the first checkpoint failure, but checkpoints were timing out on the CheckpointCoordinator after 5 seconds, before {{FlinkKafkaProducer}} detected the Kafka failure after 2 minutes. Those timeouts were not checked against the {{setTolerableCheckpointFailureNumber(...)}} limit, so the job kept going with many timed-out checkpoints. Now a funny thing happens when FlinkKafkaProducer detects the Kafka failure, and it depends on where the failure was detected: a) while processing a record? No problem, the job will fail over immediately once the failure is detected (in this example after 2 minutes). b) during a checkpoint? Heh, the failure is reported to the {{CheckpointCoordinator}} *and gets ignored, as the PendingCheckpoint has already been discarded 2 minutes ago* :) So theoretically the checkpoints can keep failing forever and the job will not restart automatically, unless something else fails. Even more funny things can happen if we mix 1. or 2b) with an intermittent external system failure.
The sink reports an exception, the transaction is lost/aborted, and the sink is in a failed state; but if by some happy coincidence it manages to accept further records, this exception can be lost, and all of the records in those failed checkpoints will be lost forever as well. In all of the examples that [~qinjunjerry] posted this hasn't happened: {{FlinkKafkaProducer}} was not able to recover after the initial failure and kept throwing exceptions until the job finally failed (but much later than it should have). And that's not guaranteed anywhere. 3. *non-critical bug*: {{FlinkKafkaConsumer}} is not closing gracefully: {code} 2020-04-21 17:17:50,612 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/1) (8928563344a077ee98377721a2f22790). 2020-04-21 17:17:50,612 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (8928563344a077ee98377721a2f22790) switched from RUNNING to CANCELING. 2020-04-21 17:17:50,612 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/1) (8928563344a077ee98377721a2f22790). 2020-04-21 17:17:50,614 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Error while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818) at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136) at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602) at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355) at java.lang.Thread.run(Thread.java:748) {code} Why this happens should be analysed, as it is what brought the cluster down (caused the TaskExecutor to restart). 4. Your configuration doesn't make much sense. You are timing out checkpoints after 5-10 s, with a similar checkpoint interval, but the timeouts in Kafka are probably still at their default values, like 120 or 60 seconds: {code} 2020-04-21 17:15:50,585 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could not complete snapshot 7 for operator Source: Custom Source -> Sink: Unnamed (1/1). org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 6ms. {code} That is not a big issue on its own; it's just one of the things that triggered this whole complicated crash ({{CheckpointCoordinator}}
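The configuration mismatch described in point 4 can be sketched as follows. This is a minimal, hypothetical sketch with illustrative values, not a recommended production setup; also note that per point 1 above, {{setTolerableCheckpointFailureNumber(...)}} is itself buggy on the affected versions.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class CheckpointTimeoutConfig {
    static Properties configure(StreamExecutionEnvironment env) {
        // The checkpoint timeout should exceed the Kafka client timeouts a
        // checkpoint may have to wait on (delivery.timeout.ms defaults to
        // 120 s; transaction-coordinator operations to 60 s).
        env.enableCheckpointing(60_000);                          // 60 s interval
        env.getCheckpointConfig().setCheckpointTimeout(180_000);  // > 120 s

        // Fail the job on the first checkpoint failure instead of silently
        // accumulating timed-out checkpoints.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // transaction.timeout.ms must comfortably exceed the checkpoint
        // interval, or transactions opened per checkpoint will be aborted
        // by the broker before they can be committed.
        props.setProperty("transaction.timeout.ms", "900000");    // 15 min
        return props;
    }
}
```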
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090402#comment-17090402 ] Piotr Nowojski commented on FLINK-17327: I'm analysing this, [~aljoscha], and there are at least a couple of issues here that we need to fix. Once I'm done I will post my findings.
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090378#comment-17090378 ] Aljoscha Krettek commented on FLINK-17327: -- Isn't this the expected behaviour? If Flink cannot write to Kafka then the job will fail. Is that what you're observing?
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090018#comment-17090018 ] Jun Qin commented on FLINK-17327: - I have also added the Standalonesession logs, which cover the time frame of both tests. My local standalone cluster is basically one TM with one slot.
[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089755#comment-17089755 ] Jun Qin commented on FLINK-17327: - {{TM.log}} is for a job where the Kafka Source and Kafka Sink are in a single task thread; {{TM_producer_only_task.log}} is for a job with three task threads: Kafka Source, map, Kafka Sink.