[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726405#comment-17726405
 ] 

Sharon Xie edited comment on FLINK-32196 at 5/25/23 10:04 PM:
--------------------------------------------------------------

Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not 
being aborted in Kafka? Or was that a speculation based on not seeing a 
abortTransaction() in the code?
{quote}
This is a speculation. So this may not be the root cause of the issue I'm 
seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do 
they get timeout by Kafka after transaction.timeout.ms? Or are they lingering 
beyond the timeout threshold?
{quote}
What I've observed is that the subtask gets stuck in the initializing state and 
there is a growing number of kafka-producer-network-thread and the job 
eventually runs OOM -  In the [^kafka_sink_oom_logs.csv], you can see lots of 
producers get closed in the end . In the debug log, I've found that the 
transaction thread never progress beyond “Transition from state INITIALIZING to 
READY” and eventually times out. An example thread log is 
[^kafka_producer_network_thread_log.csv] . A healthy transaction goes from 
INITIALIZING to READY- to COMMITTING_TRANSACTION to READY in the log and the 
thread doesn't exit - example [^healthy_kafka_producer_thread.csv].

I've also queried the kafka's _transaction_state topic for the problematic 
transaction and 
[here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the 
messages in the topic.  

I'd appreciate any pointers or potential ways to explain the situation. 



was (Author: sharonxr55):
Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not 
being aborted in Kafka? Or was that a speculation based on not seeing a 
abortTransaction() in the code?
{quote}
This is a speculation. So this may not be the root cause of the issue I'm 
seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do 
they get timeout by Kafka after transaction.timeout.ms? Or are they lingering 
beyond the timeout threshold?
{quote}
What I've observed is that the subtask gets stuck in the initializing state and 
there is a growing number of kafka-producer-network-thread and the job 
eventually runs OOM -  [^kafka_sink_oom_logs.csv] . In the debug log, I've 
found that the transaction thread never progress beyond “Transition from state 
INITIALIZING to READY” and eventually times out. An example thread log is 
[^kafka_producer_network_thread_log.csv] . A healthy transaction goes from 
INITIALIZING to READY- to COMMITTING_TRANSACTION to READY in the log and the 
thread doesn't exit - example [^healthy_kafka_producer_thread.csv].

I've also queried the kafka's _transaction_state topic for the problematic 
transaction and 
[here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the 
messages in the topic.  

I'd appreciate any pointers or potential ways to explain the situation. 


> kafka sink under EO sometimes is unable to recover from a checkpoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-32196
>                 URL: https://issues.apache.org/jira/browse/FLINK-32196
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.15.4
>            Reporter: Sharon Xie
>            Priority: Major
>         Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using kafka sink under EO is unable 
> to recover from a checkpoint. The sink task stuck at `INITIALIZING` state and 
> eventually runs OOM. The cause for OOM is that there is a kafka producer 
> thread leak.
> Here is our best *hypothesis* for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering 
> transactions upon recovery 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation to abort those transactions in the 
> `TransactionAborter` doesn't abort those transactions 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically `producer.abortTransaction()` is never called in that function. 
> Instead it calls `producer.flush()`.
> Also The function is in for loop that only breaks when `producer.getEpoch() 
> == 0` which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this for loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to