[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726405#comment-17726405
 ] 

Sharon Xie edited comment on FLINK-32196 at 5/25/23 10:04 PM:
--------------------------------------------------------------

Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not 
being aborted in Kafka? Or was that a speculation based on not seeing a 
abortTransaction() in the code?
{quote}
This is a speculation. So this may not be the root cause of the issue I'm 
seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do 
they get timeout by Kafka after transaction.timeout.ms? Or are they lingering 
beyond the timeout threshold?
{quote}
What I've observed is that the subtask gets stuck in the initializing state and 
there is a growing number of kafka-producer-network-thread and the job 
eventually runs OOM -  [^kafka_sink_oom_logs.csv] . In the debug log, I've 
found that the transaction thread never progress beyond “Transition from state 
INITIALIZING to READY” and eventually times out. An example thread log is 
[^kafka_producer_network_thread_log.csv] . A healthy transaction goes from 
INITIALIZING to READY- to COMMITTING_TRANSACTION to READY in the log and the 
thread doesn't exit - example [^healthy_kafka_producer_thread.csv].

I've also queried the kafka's _transaction_state topic for the problematic 
transaction and 
[here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the 
messages in the topic.  

I'd appreciate any pointers or potential ways to explain the situation. 



was (Author: sharonxr55):
Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not 
being aborted in Kafka? Or was that a speculation based on not seeing a 
abortTransaction() in the code?
{quote}
This is a speculation. So this may not be the root cause of the issue I'm 
seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do 
they get timeout by Kafka after transaction.timeout.ms? Or are they lingering 
beyond the timeout threshold?
{quote}
What I've observed is that the subtask gets stuck in the initializing state and 
there is a growing number of kafka-producer-network-thread and the job 
eventually runs OOM. In the debug log, I've found that the transaction thread 
never progress beyond “Transition from state INITIALIZING to READY” and 
eventually times out. An example thread log is 
[^kafka_producer_network_thread_log.csv] . A healthy transaction goes from 
INITIALIZING to READY- to COMMITTING_TRANSACTION to READY in the log and the 
thread doesn't exit - example [^healthy_kafka_producer_thread.csv].

I've also queried the kafka's _transaction_state topic for the problematic 
transaction and 
[here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the 
messages in the topic.  

I'd appreciate any pointers or potential ways to explain the situation. 


> kafka sink under EO sometimes is unable to recover from a checkpoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-32196
>                 URL: https://issues.apache.org/jira/browse/FLINK-32196
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.15.4
>            Reporter: Sharon Xie
>            Priority: Major
>         Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using kafka sink under EO is unable 
> to recover from a checkpoint. The sink task stuck at `INITIALIZING` state and 
> eventually runs OOM. The cause for OOM is that there is a kafka producer 
> thread leak.
> Here is our best *hypothesis* for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering 
> transactions upon recovery 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation to abort those transactions in the 
> `TransactionAborter` doesn't abort those transactions 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically `producer.abortTransaction()` is never called in that function. 
> Instead it calls `producer.flush()`.
> Also The function is in for loop that only breaks when `producer.getEpoch() 
> == 0` which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this for loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to