[jira] [Commented] (FLINK-32196) kafka sink under EO sometimes is unable to recover from a checkpoint

2023-05-25 Thread Sharon Xie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726479#comment-17726479
 ] 

Sharon Xie commented on FLINK-32196:


[~tzulitai] Thanks for analyzing the logs. As additional context, this happened 
after a security patch on the broker side. Though most of the jobs 
auto-recovered, we've found a couple that got stuck in the recovery step. So 
there is a chance that this is caused by an issue on the broker side, e.g. some 
broker-side transaction state was lost or a partition is in a bad state. Any 
possible explanation here?

A couple of other questions:

> These are the ones to abort in step 2. Initializing the transaction 
> automatically aborts the transaction, as I mentioned in earlier comments. So 
> I believe this is also expected.

In this case, does the transaction state 
[messages|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] in 
the Kafka broker look right to you? It seems nothing changes in those messages 
except the epoch and txnLastUpdateTimestamp. I guess the idea is to call the 
transaction init with the old txnId and just let it time out. But is there some 
heartbeat that keeps updating the transaction? Also, can you please explain a 
bit why abortTransaction is not used?

> What is NOT expected, though, is the bunch of kafka-producer-network-thread 
> threads being spawned per TID to abort in step 2.
Is it common to have so many lingering transactions that need to abort? The job 
is not a high throughput one. About 3 records/sec at the checkpointing interval 
= 10sec. It takes ~30min to run oom and I feel it's weird that the kafka sink 
would need so long to recover.

> kafka sink under EO sometimes is unable to recover from a checkpoint
> 
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
> Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs out of memory (OOM). The cause of 
> the OOM is a Kafka producer thread leak.
> Here is our best *hypothesis* for the issue.
> Under the EO semantic, `KafkaWriter` intends to abort lingering transactions 
> upon recovery 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in `TransactionAborter` doesn't abort 
> those transactions 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when 
> `producer.getEpoch() == 0`, which is why we are seeing a producer thread leak 
> as the recovery gets stuck in this loop.
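
(For readers following the links: the loop in question has roughly the 
following shape. This is a simplified paraphrase, not the actual Flink source; 
{{InternalProducer}} and {{buildTransactionalId}} are illustrative stand-ins.)
{code:java}
// Simplified paraphrase of the abort loop described above; not the actual
// Flink source (see the TransactionAborter link). InternalProducer stands in
// for the sink's reusable producer wrapper, and buildTransactionalId for
// however the sink derives "<prefix>-<subtaskId>-<checkpointId>" style TIDs.
static void abortLingeringTransactions(
        InternalProducer producer, String prefix, int subtaskId, long startCheckpointId) {
    for (long checkpointId = startCheckpointId; ; checkpointId++) {
        String txnId = buildTransactionalId(prefix, subtaskId, checkpointId);
        producer.setTransactionalId(txnId); // reuse one producer, new TID
        producer.flush();                   // the re-initialization that happens
                                            // under the hood aborts the old txn;
                                            // abortTransaction() is never called
        if (producer.getEpoch() == 0) {
            // Epoch 0 means the broker had no prior state for this TID, so
            // there are no more lingering transactions to probe.
            break;
        }
    }
}
{code}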



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32196) kafka sink under EO sometimes is unable to recover from a checkpoint

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726426#comment-17726426
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32196:

[~sharonxr55] a few things to clarify first:
 # When a KafkaSink subtask restores, there are some transactions that need to 
be committed (i.e. the ones that were written into the Flink checkpoint), and
 # all other transactions are considered "lingering" and should be aborted 
(which is done by the loop you referenced; see the sketch after this list).
 # Only after the above two steps complete is the subtask initialization 
considered complete.
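
In rough (hypothetical) code form, the restore sequence above looks like the 
following; the method names are illustrative, not the actual 
KafkaWriter/KafkaCommitter API:
{code:java}
// Illustrative sketch of the restore sequence above; method names are
// hypothetical, not the actual KafkaWriter/KafkaCommitter API.
void restoreSubtask(List<KafkaCommittable> committablesFromCheckpoint) {
    // Step 1: commit the transactions that were recorded in the checkpoint.
    for (KafkaCommittable committable : committablesFromCheckpoint) {
        resumeTransaction(committable).commitTransaction();
    }
    // Step 2: abort all other ("lingering") transactions this subtask may
    // have left open before the failure (the loop referenced above).
    abortLingeringTransactions();
    // Step 3: only now is subtask initialization considered complete.
}
{code}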

So:

> A healthy transaction goes from INITIALIZING to READY to 
> COMMITTING_TRANSACTION to READY in the log

I believe these transactions are the ones from step 1, which is expected.

> I've found that the transaction thread never progresses beyond “Transition 
> from state INITIALIZING to READY”

These are the ones to abort in step 2. Initializing the transaction 
automatically aborts the transaction, as I mentioned in earlier comments. So I 
believe this is also expected.
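
To make the "initializing automatically aborts" behaviour concrete, here is a 
minimal sketch using the plain Apache Kafka producer client (the 
transactional.id and broker address below are hypothetical): calling 
{{initTransactions()}} under an existing transactional.id bumps the producer 
epoch on the coordinator and aborts whatever transaction the previous producer 
incarnation left open. This is also why {{abortTransaction()}} can't be used 
here; it only applies to a transaction begun by the current producer instance.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitAbortsLingeringTxn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Hypothetical TID; in the sink this would be a lingering transaction's id.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-sink-0-42");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Registers with the transaction coordinator, bumps the epoch for
            // this transactional.id, and aborts any transaction left open by a
            // previous producer with the same id. abortTransaction() cannot be
            // used for this: it only aborts a transaction begun by *this*
            // producer instance and would throw IllegalStateException here.
            producer.initTransactions();
        }
    }
}
{code}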

 

What is NOT expected, though, is the bunch of {{kafka-producer-network-thread}} 
threads being spawned per TID to abort in step 2. Thanks for sharing the logs, 
by the way; they were helpful in figuring out what was going on!


Kafka's producer only spawns a single {{kafka-producer-network-thread}} per 
instance, and the abort loop for lingering transactions always tries to reuse 
the same producer instance without creating new ones, so I would expect to see 
only a single {{kafka-producer-network-thread}} throughout the whole loop. That 
doesn't seem to be the case: from the naming of these threads, it looks like a 
new {{kafka-producer-network-thread}} is spawned for every TID that the 
KafkaSink is trying to abort.

This is hinted at by the naming of the threads (see the last portion of each 
thread name, which is strictly incrementing; those are the TIDs of the 
transactions the KafkaSink is trying to abort):
{code}
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708579"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708580"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708581"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708582"
{code}
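
For reference, the one-thread-per-instance behaviour can be checked with a 
small standalone snippet using the plain Apache Kafka client (the broker 
address below is a placeholder; the producer constructor spawns its network 
thread without needing a live broker):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class NetworkThreadCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // Each KafkaProducer instance owns exactly one network (sender) thread.
        try (KafkaProducer<byte[], byte[]> p1 = new KafkaProducer<>(props);
             KafkaProducer<byte[], byte[]> p2 = new KafkaProducer<>(props)) {
            Thread.getAllStackTraces().keySet().stream()
                    .map(Thread::getName)
                    .filter(n -> n.startsWith("kafka-producer-network-thread"))
                    .forEach(System.out::println); // prints two names, one per instance
        }
    }
}
{code}
So N distinct {{kafka-producer-network-thread}} names in the logs implies N 
producer instances were created.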

The only way I can see this happening is if the loop is creating a new producer 
instance per attempted TID, but that doesn't make sense given the code. It 
could be something funny with how the KafkaSink uses Java reflection to reset 
the TID on the reused producer, but I'll need to spend some time looking into 
this a bit deeper.
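
For context, the reflection trick referred to here looks roughly like the 
sketch below; the field names ({{transactionManager}}, {{transactionalId}}) are 
assumptions about the Kafka client internals and may vary across client 
versions. This is not the actual FlinkKafkaInternalProducer code.
{code:java}
import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;

public final class TxnIdReflection {
    // Resets the transactional.id on a *reused* producer by mutating private
    // fields of the client's internal TransactionManager. Field names are
    // assumptions and version-dependent; not the actual Flink connector code.
    static void setTransactionalId(KafkaProducer<?, ?> producer, String txnId)
            throws ReflectiveOperationException {
        Field tmField = KafkaProducer.class.getDeclaredField("transactionManager");
        tmField.setAccessible(true);
        Object transactionManager = tmField.get(producer);

        Field idField = transactionManager.getClass().getDeclaredField("transactionalId");
        idField.setAccessible(true);
        idField.set(transactionManager, txnId);
    }
}
{code}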




--
This message was sent by Atlassian Jira
(v8.20.10#820010)