Sharon Xie created FLINK-32196:
----------------------------------

             Summary: KafkaWriter recovery doesn't abort lingering transactions under EO semantic
                 Key: FLINK-32196
                 URL: https://issues.apache.org/jira/browse/FLINK-32196
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.15.4, 1.6.4
            Reporter: Sharon Xie
We are seeing an issue where a Flink job using the Kafka sink under exactly-once (EO) semantics is unable to recover from a checkpoint. The sink task is stuck in the `INITIALIZING` state and eventually runs out of memory. The cause of the OOM is a Kafka producer thread leak.

Here is our best hypothesis for the issue. In `KafkaWriter` under the EO semantic, recovery intends to abort lingering transactions:

[https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]

However, the actual implementation in `TransactionAborter` doesn't abort those transactions:

[https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]

Specifically, `producer.abortTransaction()` is never called in that function; it calls `producer.flush()` instead. Also, the function runs in a for loop that only breaks when `producer.getEpoch() == 0`, which is why we are seeing a producer thread leak: recovery gets stuck in this loop.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
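To illustrate the hypothesized leak, here is a minimal, self-contained sketch of the loop shape described above. This is *not* the actual Flink code: `StubProducer`, its fixed epoch, and `abortTransactionsUpTo` are hypothetical stand-ins, condensed to show why a loop that exits only on `getEpoch() == 0`, while calling `flush()` instead of `abortTransaction()`, accumulates one producer per iteration.

```java
import java.util.ArrayList;
import java.util.List;

public class AbortLoopSketch {
    // Hypothetical stand-in for a Kafka producer whose epoch would only
    // reset to 0 once the broker-side transaction is actually aborted.
    static class StubProducer {
        private final short epoch;
        StubProducer(short epoch) { this.epoch = epoch; }
        short getEpoch() { return epoch; }
        void flush() { /* flushing does NOT abort the transaction */ }
        void abortTransaction() { /* aborting would reset the epoch */ }
    }

    // Condensed model of the suspect loop: a producer is created each
    // iteration, and the loop exits only when the epoch reaches 0. Because
    // flush() never aborts the transaction, the epoch stays non-zero and
    // producers accumulate until the iteration cap (here, an artificial
    // bound; the real loop has none).
    static int abortTransactionsUpTo(int maxIterations) {
        List<StubProducer> leaked = new ArrayList<>();
        for (int i = 0; i < maxIterations; i++) {
            StubProducer producer = new StubProducer((short) 5); // never reset
            producer.flush(); // bug analogue: flush() instead of abortTransaction()
            leaked.add(producer);
            if (producer.getEpoch() == 0) {
                break; // never reached while the epoch stays non-zero
            }
        }
        return leaked.size();
    }

    public static void main(String[] args) {
        // Every iteration leaks one producer; with a cap of 10, all 10 leak.
        System.out.println(abortTransactionsUpTo(10));
    }
}
```

In the real writer there is no iteration cap, so under this hypothesis the producer count grows until the task runs out of memory.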