[https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726426#comment-17726426]
Tzu-Li (Gordon) Tai commented on FLINK-32196:
-
[~sharonxr55] a few things to clarify first:
# When a KafkaSink subtask restores, there are some transactions that need to
be committed (i.e. the ones that are written in the Flink checkpoint), and
# all other transactions are considered "lingering" and should be aborted
(which is done by the loop you referenced).
# Only after the above 2 steps complete is the subtask initialization
considered complete.
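The two-step restore sequence above can be sketched with a toy model (all names here are hypothetical stand-ins, not the real KafkaWriter/TransactionAborter code; the broker's TID state is faked as a map):

```java
import java.util.List;
import java.util.Map;

public class RestoreSketch {
    // Fake broker state: TID -> epoch. A nonzero epoch means the broker
    // has open transactional state for that TID ("lingering").
    static final Map<String, Integer> brokerEpochs = Map.of("tx-2", 1, "tx-3", 2);

    static int epochOf(String tid) {
        return brokerEpochs.getOrDefault(tid, 0);
    }

    public static void main(String[] args) {
        // Step 1: commit the transactions recorded in the checkpoint.
        for (String tid : List.of("tx-0", "tx-1")) {
            System.out.println("commit " + tid);
        }
        // Step 2: probe increasing TIDs and abort each lingering one,
        // stopping at the first TID the broker has never seen (epoch 0).
        for (int id = 2; ; id++) {
            String tid = "tx-" + id;
            if (epochOf(tid) == 0) {
                break;
            }
            System.out.println("abort " + tid);
        }
        // Only now is the subtask's initialization considered complete.
        System.out.println("subtask init complete");
    }
}
```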
So:
> A healthy transaction goes from INITIALIZING to READY to
> COMMITTING_TRANSACTION to READY in the log

I believe these transactions are the ones from step 1, which is expected.
> I've found that the transaction thread never progress beyond “Transition from
> state INITIALIZING to READY”
These are the ones to be aborted in step 2. Initializing the transaction
automatically aborts it, as I mentioned in earlier comments, so I believe this
is also expected.
What is NOT expected, though, is the number of {{kafka-producer-network-thread}}
threads being spawned, one per TID, during the abort loop in step 2. Thanks for
sharing the logs btw, they were helpful in figuring out what was going on!
Kafka's producer only spawns a single {{kafka-producer-network-thread}} per
instance, and the abort loop for lingering transactions always tries to reuse
the same producer instance without creating new ones, so I would expect to see
only a single {{kafka-producer-network-thread}} throughout the whole loop. That
doesn't seem to be the case here. The naming of the threads suggests that a new
{{kafka-producer-network-thread}} is spawned for every TID the KafkaSink is
trying to abort (see the last portion of each thread name, which is strictly
incrementing; those are the TIDs of the transactions the KafkaSink is trying to
abort):
{code}
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708579"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708580"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708581"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708582"
{code}
The only way I can see this happening is if the loop is creating a new producer
instance per attempted TID, but that doesn't make sense given the code. It
could be something funny with how the KafkaSink uses Java reflection to reset
the TID on the reused producer, but I'll need to spend some time looking into
this a bit deeper.
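The expected-vs-observed behaviour can be made concrete with a toy model (again all names are hypothetical; the counter stands in for the one network thread the real producer starts per instance, and {{resetTransactionalId}} stands in for the reflective TID reset):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AbortLoopSketch {
    static final AtomicInteger networkThreads = new AtomicInteger();

    // Toy stand-in for KafkaProducer: the real client starts exactly one
    // kafka-producer-network-thread in its constructor, modeled as a counter.
    static class FakeProducer {
        String tid;

        FakeProducer() {
            networkThreads.incrementAndGet();
        }

        // Stands in for the reflection-based reset of the transactional id.
        void resetTransactionalId(String tid) {
            this.tid = tid;
        }
    }

    public static void main(String[] args) {
        // Expected: one reused producer, hence one network thread overall.
        FakeProducer reused = new FakeProducer();
        for (int id = 0; id < 1000; id++) {
            reused.resetTransactionalId("tx-" + id);
        }
        int expected = networkThreads.get();

        networkThreads.set(0);
        // Observed (per the thread dump): a fresh producer per probed TID,
        // which means one new network thread per TID.
        for (int id = 0; id < 1000; id++) {
            new FakeProducer().resetTransactionalId("tx-" + id);
        }
        System.out.println("expected threads=" + expected
                + ", observed threads=" + networkThreads.get());
    }
}
```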
> kafka sink under EO sometimes is unable to recover from a checkpoint
>
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
> Attachments: healthy_kafka_producer_thread.csv,
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using the kafka sink under EO is
> unable to recover from a checkpoint. The sink task is stuck in the
> `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a kafka
> producer thread leak.
> Here is our best *hypothesis* for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering
> transactions upon recovery
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the `TransactionAborter` doesn't abort
> those transactions
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function;
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch()
> == 0`, which is why we are seeing a producer thread leak as the recovery gets
> stuck in this for loop.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)