Sharon Xie created FLINK-32196:
----------------------------------

             Summary: KafkaWriter recovery doesn't abort lingering transactions 
under EO semantic
                 Key: FLINK-32196
                 URL: https://issues.apache.org/jira/browse/FLINK-32196
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.15.4, 1.6.4
            Reporter: Sharon Xie


We are seeing an issue where a Flink job using the Kafka sink under EO 
semantics is unable to recover from a checkpoint. The sink task is stuck in 
the `INITIALIZING` state and eventually runs out of memory; the cause of the 
OOM is a Kafka producer thread leak.

Here is our best hypothesis for the issue.
Under the EO semantic, `KafkaWriter` intends to abort lingering transactions 
upon recovery:
[https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]

However, the actual abort logic in `TransactionAborter` doesn't abort those 
transactions:
[https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]

Specifically, `producer.abortTransaction()` is never called in that function; 
instead it only calls `producer.flush()`.

Also, the function runs in a for loop that only breaks when 
`producer.getEpoch() == 0`, which is why we are seeing a producer thread leak: 
recovery gets stuck in this loop, holding on to a new producer per iteration.
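For illustration, here is a minimal, self-contained sketch of the loop shape we believe causes the leak. `MockProducer`, `abortWithPrefix`, and the epoch values are hypothetical stand-ins for the actual Flink/Kafka classes; the point is only that producers accumulate until an epoch of 0 is seen, and that `flush()` alone never marks a transaction aborted:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kafka producer; not the real client class.
class MockProducer {
    final short epoch;
    boolean aborted = false;
    MockProducer(short epoch) { this.epoch = epoch; }
    short getEpoch() { return epoch; }
    void flush() { /* flushing buffered records does not abort the transaction */ }
    void abortTransaction() { aborted = true; }
}

public class AbortLoopSketch {
    // Simplified shape of the suspected TransactionAborter loop: a producer is
    // obtained on every iteration and the loop only exits once getEpoch() == 0,
    // so producers (and their threads) pile up until that condition holds.
    static List<MockProducer> abortWithPrefix(short[] epochs) {
        List<MockProducer> created = new ArrayList<>();
        for (int i = 0; ; i++) {
            // If no epoch in the sequence is ever 0, this loop never terminates.
            MockProducer p = new MockProducer(epochs[Math.min(i, epochs.length - 1)]);
            created.add(p);
            p.flush();               // what the current code does
            // p.abortTransaction(); // what the report argues is missing
            if (p.getEpoch() == 0) break;
        }
        return created;
    }

    public static void main(String[] args) {
        List<MockProducer> producers = abortWithPrefix(new short[]{3, 2, 0});
        System.out.println(producers.size());       // producers accumulated before exit
        System.out.println(producers.get(0).aborted); // transaction was never aborted
    }
}
```

With the commented-out `abortTransaction()` call enabled, each lingering transaction would actually be aborted before the loop moves on, which is the behavior the recovery path appears to intend.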



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
