[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic
[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726405#comment-17726405 ] Sharon Xie commented on FLINK-32196:

Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not being aborted in Kafka? Or was that a speculation based on not seeing an abortTransaction() in the code?{quote}
This is a speculation, so it may not be the root cause of the issue I'm seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do they get timed out by Kafka after transaction.timeout.ms? Or are they lingering beyond the timeout threshold?{quote}
What I've observed is that the subtask gets stuck in the INITIALIZING state, the number of kafka-producer-network-thread threads keeps growing, and the job eventually runs out of memory. In the debug log, I've found that the transaction thread never progresses beyond "Transition from state INITIALIZING to READY" and eventually times out. An example thread log is [^kafka_producer_network_thread_log.csv]. A healthy transaction goes from INITIALIZING to READY to COMMITTING_TRANSACTION to READY in the log, and its thread doesn't exit - see [^healthy_kafka_producer_thread.csv] for an example. I've also queried Kafka's __transaction_state topic for the problematic transaction; [here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the messages in that topic. I'd appreciate any pointers or potential explanations for this situation.
> KafkaWriter recovery doesn't abort lingering transactions under the EO semantic
> ---
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.4, 1.15.4
> Reporter: Sharon Xie
> Priority: Major
> Attachments: healthy_kafka_producer_thread.csv, kafka_producer_network_thread_log.csv
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is unable to recover from a checkpoint. The sink task is stuck in the `INITIALIZING` state and eventually runs out of memory. The cause of the OOM is a Kafka producer thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering transactions upon recovery: [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in `TransactionAborter` doesn't abort those transactions: [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function. Instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() == 0`, which is why we are seeing a producer thread leak as the recovery gets stuck in this loop.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
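For reference, the recovery loop described above has roughly the following shape. This is a self-contained sketch, not Flink's actual code: {{EpochProbe}} and the {{"<prefix>-<subtask>-<checkpoint>"}} id format are illustrative stand-ins for Flink's internal producer and transactional-id scheme. The point it models is that {{initTransactions()}} is the abort mechanism (Kafka aborts any open transaction for that {{transactional.id}} broker-side and bumps the epoch), and the loop only terminates once a probed id reports epoch 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kafka producer: initTransactions() on a
// transactional.id with a lingering transaction aborts it broker-side and
// returns a non-zero epoch; an id never used before reports epoch 0.
interface EpochProbe {
    short initTransactionsAndGetEpoch(String transactionalId);
}

public class AbortLoopSketch {
    // Models the shape of the TransactionAborter loop: probe successive
    // "<prefix>-<subtask>-<checkpoint>" ids until one comes back with
    // epoch 0, i.e. no lingering transaction was found for it.
    static List<String> abortedIds(EpochProbe probe, String prefix, int subtask) {
        List<String> aborted = new ArrayList<>();
        for (int checkpoint = 0; ; checkpoint++) {
            String id = prefix + "-" + subtask + "-" + checkpoint;
            short epoch = probe.initTransactionsAndGetEpoch(id);
            if (epoch == 0) {
                break; // no transaction ever used this id; stop scanning
            }
            aborted.add(id); // initTransactions() already aborted it broker-side
        }
        return aborted;
    }

    public static void main(String[] args) {
        // Pretend checkpoints 0..2 left lingering transactions behind.
        EpochProbe fake = id ->
                (id.endsWith("-0") || id.endsWith("-1") || id.endsWith("-2"))
                        ? (short) 5 : (short) 0;
        System.out.println(abortedIds(fake, "sink", 0));
    }
}
```

If the epoch never comes back as 0 for some id range, this loop never exits, which is consistent with the stuck-in-INITIALIZING symptom reported above.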
[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726390#comment-17726390 ] Tzu-Li (Gordon) Tai commented on FLINK-32196:

In terms of the lingering transactions you are observing, a few questions:
# Are you actually observing lingering transactions that are not being aborted in Kafka? Or was that a speculation based on not seeing an {{abortTransaction()}} in the code?
# If there are actually lingering transactions in Kafka after restore, do they get timed out by Kafka after {{transaction.timeout.ms}}? Or are they lingering beyond the timeout threshold?
[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726385#comment-17726385 ] Tzu-Li (Gordon) Tai commented on FLINK-32196:

Hi [~sharonxr55], the {{abortTransactionOfSubtask}} method you posted aborts transactions by relying on the fact that when you call {{initTransactions()}}, Kafka automatically aborts any old ongoing transactions under the same {{transactional.id}}.

Could you elaborate on the producer leak? As far as I can tell, the loop reuses the same producer instance; on every loop iteration, that instance is reset with a new {{transactional.id}} and {{initTransactions()}} is called to abort the transaction.
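The reuse being described can be modeled with a toy sketch. {{ReusableProducer}} below is a hypothetical stand-in, not Flink's internal producer class; it only illustrates the claim that one producer object is constructed and then cycled through many transactional ids, with {{initTransactions()}} doing the aborting each time, so the loop itself should not create new producer instances (or their network threads).

```java
// Toy model of the reuse pattern: a single producer object has its
// transactional.id swapped per iteration, and initTransactions() is the
// call that aborts any lingering transaction for that id. Names are
// illustrative, not Flink's actual classes.
class ReusableProducer {
    static int instancesCreated = 0;
    private String transactionalId;

    ReusableProducer() { instancesCreated++; }

    void setTransactionalId(String id) { this.transactionalId = id; }

    void initTransactions() { /* broker would abort any open txn for transactionalId */ }

    String currentId() { return transactionalId; }
}

public class ProducerReuseSketch {
    public static void main(String[] args) {
        ReusableProducer producer = new ReusableProducer(); // created once
        for (int checkpoint = 0; checkpoint < 3; checkpoint++) {
            producer.setTransactionalId("sink-0-" + checkpoint); // reset the id
            producer.initTransactions(); // abort lingering txn for that id
        }
        System.out.println(ReusableProducer.instancesCreated); // one instance, reused
    }
}
```

Under this model, a growing number of kafka-producer-network-thread threads would point at producers being created (and not closed) somewhere outside this loop, rather than at the loop's id-cycling itself.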