[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-06-10 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130745#comment-17130745 ]

Aljoscha Krettek commented on FLINK-17327:
--

For posterity, here's a summary of the issue:
* our code calls {{close()}} on the {{KafkaProducer}} (Kafka code), which is 
equivalent to calling {{close}} with a timeout of {{Long.MAX_VALUE}}
* this means threads will leak when a failure happens, for example because of 
broker downtime
* the Flink Task Watchdog will kill the Task Manager because of these threads 
after a timeout

The fix is to always call {{close()}} with a reasonable timeout.

The fix also requires a Kafka version bump because of KAFKA-6635/KAFKA-7763, 
which cause resources to leak even when closing with a timeout. Additionally, 
we need to close with a timeout of exactly zero, because otherwise in-flight 
transactions will be aborted.
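
To illustrate, here's a minimal sketch of the two {{close()}} variants 
(kafka-clients 2.0+ API; this is not the actual Flink connector code):

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.producer.KafkaProducer;

// Minimal sketch of the close() semantics described above; not the actual
// Flink connector code.
public class ProducerShutdownSketch {

    static void leakyShutdown(KafkaProducer<?, ?> producer) {
        // Equivalent to closing with a timeout of Long.MAX_VALUE: blocks
        // forever while brokers are unreachable and leaks the producer's
        // network thread, which the Task Watchdog then trips over.
        producer.close();
    }

    static void cleanShutdown(KafkaProducer<?, ?> producer) {
        // Returns immediately. With the patched Kafka client this also skips
        // aborting in-flight transactions, so a job restored from a
        // checkpoint can still commit them (exactly-once).
        producer.close(Duration.ZERO);
    }
}
{code}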

> Kafka unavailability could cause Flink TM shutdown
> --
>
> Key: FLINK-17327
> URL: https://issues.apache.org/jira/browse/FLINK-17327
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Jun Qin
>Priority: Critical
> Fix For: 1.12.0
>
> Attachments: 0001-Change-version-to-2.4.2-ALJOSCHA.patch, 
> 0002-Don-t-abort-in-flight-transactions.patch, Standalonesession.log, TM.log, 
> TM_produer_only_task.log
>
>
> Steps to reproduce:
>  # Start a Flink 1.10 standalone cluster
>  # Run a Flink job which reads from one Kafka topic and writes to another 
> topic, with exactly-once checkpointing enabled
>  # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
>  # {{org.apache.kafka.clients.NetworkClient}} reported that the connection 
> to the broker could not be established
>  # Then, Flink could not complete the snapshot due to {{Timeout expired 
> while initializing transactional state in 6ms}}
>  # After several snapshot failures, Flink reported {{Too many ongoing 
> snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints.}}
>  # Eventually, Flink tried to cancel the task, which did not succeed within 
> 3 min. According to the logs, the consumer was cancelled, but the producer 
> was still running
>  # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
> down...}}
> I will attach the logs to show the details. It is worth noting that if there 
> is no consumer but only a producer in the task, the behavior is different:
>  # {{org.apache.kafka.clients.NetworkClient}} reported that the connection 
> to the broker could not be established
>  # After {{delivery.timeout.ms}} (2 min by default), the producer reports: 
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
> output-topic-0:120001 ms has passed since batch creation}}
>  # Flink tried to cancel the upstream tasks and created a new producer
>  # The new producer obviously reported connectivity issues to the brokers
>  # This continues till the Kafka brokers are back
>  # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
> size or decrease number of concurrent checkpoints.}}
>  # Flink cancelled the tasks and restarted them
>  # The job continues, and a new checkpoint succeeded
>  # The TM runs all the time in this scenario
> I set the Kafka transaction timeout to 1 hour just to avoid transaction 
> timeouts during the test.
> To get a producer-only task, I called {{env.disableOperatorChaining();}} in 
> the second scenario.





[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-06-09 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129590#comment-17129590 ]

Aljoscha Krettek commented on FLINK-17327:
--

I was mixing up issues before: KAFKA-6635 has a fix but also introduces the 
"feature" that transactions are aborted on shutdown.



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-06-08 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128272#comment-17128272 ]

Aljoscha Krettek commented on FLINK-17327:
--

I managed to find a fix for this:

 - change our code to always use {{close()}} with a timeout on the Kafka 
Producer; otherwise we might leave lingering threads
 - this alone does not work because of KAFKA-7763, i.e. on shutdown, requests 
are not properly cancelled, which leaves lingering threads
 - the fix for KAFKA-7763 also introduces code that aborts outstanding 
transactions when cancelling; this doesn't work together with our exactly-once 
Kafka Producer
 - you need a patched Kafka that includes the fix part of KAFKA-7763 but 
without the code that aborts transactions; I'm attaching a patch for that 
against the Kafka 2.4 branch

The changes needed in Flink are here: 
https://github.com/aljoscha/flink/tree/flink-17327-kafka-clean-shutdown-2.4. 
The patch for Kafka is attached. I don't think the Kafka project will like 
that patch, though, because aborting outstanding transactions is valid for 
Kafka Streams/KSQL, where pending transactions that are not cancelled will 
block downstream consumption. 



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-11 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104242#comment-17104242 ]

Aljoscha Krettek commented on FLINK-17327:
--

Btw, this can be reproduced even more simply: a job with just a synthetic 
source and a {{FlinkKafkaProducer}} will also get stuck in cancelling and 
eventually kill the TM. So it has nothing to do with FLINK-16482.
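
For reference, a rough sketch of such a repro job (broker address, topic name 
and timings are placeholders; this assumes the universal Kafka connector from 
Flink 1.10):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Flink17327Repro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000); // checkpoint every 5s

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("transaction.timeout.ms", "3600000");   // 1h, as in the report

        // Synthetic source: emits a counter until cancelled.
        SourceFunction<String> syntheticSource = new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long i = 0;
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect("event-" + i++);
                    }
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        };

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
            new ProducerRecord<>("output-topic",
                element.getBytes(StandardCharsets.UTF_8));

        env.addSource(syntheticSource)
           .addSink(new FlinkKafkaProducer<>(
               "output-topic", schema, props,
               FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("FLINK-17327 repro");
    }
}
{code}

Run this against a local broker, let a few checkpoints complete, then stop 
the broker and cancel the job.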



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Jun Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099782#comment-17099782 ]

Jun Qin commented on FLINK-17327:
-

What do you mean by "job fails successfully"? With Flink 1.10, I believe you 
will get the following exception (after 1 min), which will then trigger the 
job cancellation:
{code:java}
2020-05-04 16:34:36,262 INFO  
org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could not 
complete snapshot 2 for operator Source: Custom Source -> Sink: Unnamed (1/1).
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 6ms.
{code}



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099777#comment-17099777 ]

Aljoscha Krettek commented on FLINK-17327:
--

I think the Kafka code doesn't like being interrupted, which is what Flink 
does to the task thread when cancelling.
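
As an illustration of that interaction, here's a generic sketch of 
interrupt-plus-watchdog cancellation (illustrative only; these are not 
Flink's actual {{TaskCanceler}} classes):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of interrupt-based cancellation with a watchdog,
// similar in spirit to Flink's task cancellation; not Flink's actual code.
public class CancellationSketch {
    public static void main(String[] args) throws Exception {
        Thread task = new Thread(() -> {
            try {
                // Stand-in for a Kafka call that blocks indefinitely.
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                // A library that "doesn't like being interrupted" may swallow
                // this and keep blocking; then the watchdog below fires.
                System.out.println("task interrupted, exiting");
            }
        }, "task-thread");
        task.start();

        task.interrupt();                          // cancellation attempt
        task.join(TimeUnit.SECONDS.toMillis(3));   // grace period
        if (task.isAlive()) {
            // Flink treats a task that won't die as a fatal error and
            // shuts the TaskManager down.
            System.err.println("task did not terminate -> fatal error");
        }
    }
}
{code}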



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099775#comment-17099775 ]

Aljoscha Krettek commented on FLINK-17327:
--

For testing I tried this against Kafka 2.5.0 (which we can't really use, see 
FLINK-15362). Here the job fails successfully, because we don't wait 
indefinitely for the result. However, we are now left with a Kafka 
{{NetworkClient}} which logs indefinitely:
{code}
2020-05-05 12:59:03,465 WARN  org.apache.kafka.clients.NetworkClient
   [] - [Producer clientId=producer-Map -> Sink: 
Unnamed-c09dc291fad93d575e015871097bfc60-4, transactionalId=Map -> Sink: 
Unnamed-c09dc291fad93d575e015871097bfc60-4] Connection to node -1 
(localhost/127.0.0.1:9092) could not be established. Broker may not be 
available.
{code}

This is also not good, because your {{TaskManagers}} will eventually fill up 
with leftover Kafka threads.



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099751#comment-17099751 ]

Aljoscha Krettek commented on FLINK-17327:
--

This is another fix we need: https://issues.apache.org/jira/browse/KAFKA-7763. 
It's only available from 2.3.x onwards.



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099625#comment-17099625 ]

Piotr Nowojski commented on FLINK-17327:


[~qinjunjerry] the bug that [~aljoscha] is talking about, which blocked the 
{{FlinkKafkaProducer}}, is on the Kafka client's side, not on the brokers', so 
to fix it we would have to upgrade the Kafka dependency in our connector to 
2.5.0 (currently it's 2.2.x), which in turn is blocked by FLINK-15362. 



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Jun Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099611#comment-17099611 ]

Jun Qin commented on FLINK-17327:
-

The above scenario was tested with kafka_2.12-2.5.0



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-05 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099608#comment-17099608 ]

Aljoscha Krettek commented on FLINK-17327:
--

The fix I mentioned is only available in Kafka 2.5.x, so we should open Kafka 
issues to get it fixed for earlier versions as well.



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-05-04 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099045#comment-17099045 ]

Aljoscha Krettek commented on FLINK-17327:
--

I believe that {{TransactionalRequestResult.await()}} is the culprit for the 
indefinite blocking: the latch is not counted down in the failure case, see 
https://github.com/apache/kafka/blob/2.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java#L38.

I also believe that this bug in Kafka was fixed here, as an unrelated change: 
https://github.com/apache/kafka/commit/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710#diff-8a2c4f47dcec247ce2ecebf082b3d0b1R42.
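
Paraphrasing, the problematic pattern looks roughly like this (not the 
verbatim Kafka source, just the shape of the bug):

{code:java}
import java.util.concurrent.CountDownLatch;

// Paraphrase of the pre-fix TransactionalRequestResult pattern: the latch is
// only counted down on the success path, so await() blocks forever when the
// request fails. The fix linked above counts it down on failure as well.
class TransactionalRequestResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    void complete() {
        latch.countDown(); // success path releases waiters
    }

    void fail(RuntimeException e) {
        error = e;
        // BUG (pre-fix): no latch.countDown() here -> await() never returns.
    }

    void await() throws InterruptedException {
        latch.await(); // blocks indefinitely if only fail() was called
        if (error != null) {
            throw error;
        }
    }
}
{code}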



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-23 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090576#comment-17090576 ]

Piotr Nowojski commented on FLINK-17327:


Thanks [~qinjunjerry] for submitting the issue. There are multiple issues that 
contributed together to the symptoms that you have reported. I will list them 
in descending priority order:

1. *CRITICAL bug*: {{setTolerableCheckpointFailureNumber(...)}} and its 
deprecated {{setFailTaskOnCheckpointError(...)}} predecessor have been 
implemented incorrectly since https://issues.apache.org/jira/browse/FLINK-4809 
and can leave operators (and especially sinks with external state) in an 
inconsistent state. That's true even if they are not used, because...

2. *CRITICAL bug*: The logic in how {{CheckpointCoordinator}} handles 
checkpoint timeouts is broken. In your examples, [~qinjunjerry], the job 
should have failed after the first checkpoint failure, but checkpoints were 
timing out on the {{CheckpointCoordinator}} after 5 seconds, before 
{{FlinkKafkaProducer}} detected the Kafka failure after 2 minutes. Those 
timeouts were not checked against the 
{{setTolerableCheckpointFailureNumber(...)}} limit, so the job kept going with 
many timed-out checkpoints. Now the funny thing happens: 
{{FlinkKafkaProducer}} detects the Kafka failure, and what happens next 
depends on where the failure was detected:

a) while processing a record? No problem, the job fails over immediately once 
the failure is detected (in this example after 2 minutes)
b) during a checkpoint? Heh, the failure is reported to the 
{{CheckpointCoordinator}} *and gets ignored, as the {{PendingCheckpoint}} was 
already discarded 2 minutes earlier* :) So theoretically the checkpoints can 
keep failing forever and the job will not restart automatically, unless 
something else fails.

Even funnier things can happen if we mix 1. or 2b) with an intermittent 
external system failure. The sink reports an exception, the transaction is 
lost/aborted, and the sink is in a failed state, but if by happy coincidence 
it manages to accept further records, this exception can be lost and all of 
the records in those failed checkpoints will be lost forever as well. In the 
examples that [~qinjunjerry] posted this hasn't happened: 
{{FlinkKafkaProducer}} was not able to recover after the initial failure and 
kept throwing exceptions until the job finally failed (but much later than it 
should have). But that's not guaranteed anywhere.

3. *non-critical bug*: {{FlinkKafkaConsumer}} is not closing gracefully:
{code}
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source -> Sink: Unnamed 
(1/1) (8928563344a077ee98377721a2f22790).
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Sink: Unnamed (1/1) 
(8928563344a077ee98377721a2f22790) switched from RUNNING to CANCELING.
2020-04-21 17:17:50,612 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source -> 
Sink: Unnamed (1/1) (8928563344a077ee98377721a2f22790).
2020-04-21 17:17:50,614 WARN  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error while 
canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
at java.lang.Thread.run(Thread.java:748)
{code}
It's worth analysing why this is happening, as this is what brought the 
cluster down (it caused the TaskExecutor to restart).

4. Your configuration doesn't make much sense. You are timing out checkpoints 
after 5-10s, with a similar checkpoint interval, but the timeouts in Kafka are 
probably still at their default values, like 120 or 60 seconds (see the 
configuration sketch at the end of this comment):
{code}
2020-04-21 17:15:50,585 INFO  
org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could not 
complete snapshot 7 for operator Source: Custom Source -> Sink: Unnamed (1/1).
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 6ms.
{code}
That is not a big issue on its own, it's just one of the things that triggered 
this whole complicated crash ({{CheckpointCoordinator}} 
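
Regarding 2. and 4., here is a sketch of a configuration that at least aligns 
the timeouts and fails fast on checkpoint failures (values are illustrative, 
and failing fast is of course subject to the bugs in 1. and 2. being fixed):

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint interval: 1 min

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Keep the checkpoint timeout above Kafka's client timeouts
        // (max.block.ms / delivery.timeout.ms, 60s/120s by default), so the
        // CheckpointCoordinator does not discard the PendingCheckpoint before
        // FlinkKafkaProducer even notices that Kafka is gone.
        checkpointConfig.setCheckpointTimeout(180_000);
        // Fail the job on the first checkpoint failure instead of keeping
        // going with timed-out checkpoints.
        checkpointConfig.setTolerableCheckpointFailureNumber(0);
    }
}
{code}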

[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-23 Thread Piotr Nowojski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090402#comment-17090402 ]

Piotr Nowojski commented on FLINK-17327:


I'm analysing this, [~aljoscha], and there are at least a couple of issues 
here that we need to fix. Once I'm done I will post my findings.



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-23 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090378#comment-17090378 ]

Aljoscha Krettek commented on FLINK-17327:
--

Isn't this the expected behaviour? If Flink cannot write to Kafka then the job 
will fail. Is that what you're observing?



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-22 Thread Jun Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090018#comment-17090018 ]

Jun Qin commented on FLINK-17327:
-

I also added the Standalonesession log, which covers the time frame of both 
tests. My local standalone cluster is basically one TM with one slot. 



[jira] [Commented] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-22 Thread Jun Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089755#comment-17089755 ]

Jun Qin commented on FLINK-17327:
-

{{TM.log}} is for a job where the Kafka source and the Kafka sink run in a 
single task thread.

{{TM_producer_only_task.log}} is for a job with three task threads: Kafka 
source, map, Kafka sink.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)