[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Sharon Xie (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726405#comment-17726405 ]

Sharon Xie commented on FLINK-32196:


Thank you [~tzulitai] for the quick response and information.
{quote}Are you actually observing that there are lingering transactions not 
being aborted in Kafka? Or was that speculation based on not seeing an 
abortTransaction() in the code?
{quote}
This was speculation, so it may not be the root cause of the issue I'm seeing.
{quote}If there are actually lingering transactions in Kafka after restore, do 
they get timed out by Kafka after transaction.timeout.ms? Or are they lingering 
beyond the timeout threshold?
{quote}
What I've observed is that the subtask gets stuck in the initializing state, the 
number of kafka-producer-network-thread instances keeps growing, and the job 
eventually runs out of memory (OOM). In the debug log, I've found that the 
transaction thread never progresses beyond “Transition from state INITIALIZING to 
READY” and eventually times out. An example thread log is 
[^kafka_producer_network_thread_log.csv]. A healthy transaction goes from 
INITIALIZING to READY to COMMITTING_TRANSACTION to READY in the log, and the 
thread doesn't exit - example: [^healthy_kafka_producer_thread.csv].

I've also queried Kafka's __transaction_state topic for the problematic 
transaction, and 
[here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the 
messages in the topic.
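
For reference, the same transaction state can also be inspected through the Kafka Admin API instead of reading the internal topic directly. A minimal sketch, assuming Kafka clients 3.x where KIP-664 added {{Admin#listTransactions()}}; the broker address is a placeholder:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TransactionListing;

public class ListLingeringTransactions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Ask the transaction coordinators for every known transactional.id and its
            // current state; lingering transactions show up as Ongoing entries here.
            for (TransactionListing listing : admin.listTransactions().all().get()) {
                System.out.printf("%s producerId=%d state=%s%n",
                        listing.transactionalId(), listing.producerId(), listing.state());
            }
        }
    }
}
{code}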

I'd appreciate any pointers or possible explanations for this behavior. 


> KafkaWriter recovery doesn't abort lingering transactions under the EO 
> semantic
> ---
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
> Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv
>
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is unable 
> to recover from a checkpoint. The sink task gets stuck in the `INITIALIZING` state 
> and eventually runs out of memory (OOM). The cause of the OOM is a Kafka producer 
> thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, the writer intends to abort lingering 
> transactions upon recovery: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in `TransactionAborter` doesn't abort those 
> transactions: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() == 0`, 
> which is why we are seeing a producer thread leak as the recovery gets stuck in 
> this loop.





[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726390#comment-17726390 ]

Tzu-Li (Gordon) Tai commented on FLINK-32196:

In terms of the lingering transactions you are observing, a few questions:
 # Are you actually observing that there are lingering transactions not being 
aborted in Kafka? Or was that speculation based on not seeing an 
{{abortTransaction()}} in the code?
 # If there are actually lingering transactions in Kafka after restore, do they 
get timed out by Kafka after {{transaction.timeout.ms}}? Or are they lingering 
beyond the timeout threshold? (See the configuration sketch after this list for 
where that timeout is set.)
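
For reference on question 2, a minimal sketch of where {{transaction.timeout.ms}} is set on a Flink 1.15 {{KafkaSink}} under {{DeliveryGuarantee.EXACTLY_ONCE}}. This is not taken from the reporter's job; the brokers, transactional-id prefix, and topic are placeholders:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkConfig {
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")                      // placeholder brokers
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-sink")                     // placeholder prefix
                // The broker aborts any transaction left open longer than this timeout,
                // so a lingering transaction should not outlive it.
                .setProperty("transaction.timeout.ms", "900000")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")                // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}
{code}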



[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726385#comment-17726385 ]

Tzu-Li (Gordon) Tai commented on FLINK-32196:

Hi [~sharonxr55], the {{abortTransactionOfSubtask}} method you linked aborts 
transactions by relying on the fact that when {{initTransactions()}} is called, 
Kafka automatically aborts any old ongoing transaction under the same 
{{transactional.id}}.

Could you elaborate on the producer leak? As far as I can tell, the loop reuses 
the same producer instance; on every iteration, that producer is reset with a new 
{{transactional.id}} and {{initTransactions()}} is called to abort the 
transaction.
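
To make the mechanism concrete, a simplified sketch of that abort-on-restore pattern using the vanilla Kafka client. It is not the actual {{TransactionAborter}} code: Flink's internal producer can additionally swap the {{transactional.id}} on a live instance and expose the producer epoch, which the vanilla client cannot. The broker address and id layout are placeholders:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortLingeringTransactions {

    // Aborts whatever transaction is currently open under the given transactional.id:
    // initTransactions() bumps the producer epoch on the coordinator, which fences the
    // previous producer and aborts its ongoing transaction, so no explicit
    // abortTransaction() call is needed.
    static void abortLingering(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Unlike Flink's internal producer, the vanilla client cannot reuse one instance
        // across transactional ids, so this sketch opens a fresh producer per id.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
        }
    }

    public static void main(String[] args) {
        // Hypothetical id layout "prefix-subtaskId-checkpointOffset". Flink's aborter keeps
        // incrementing the offset and stops at the first id whose epoch is 0 (never used);
        // this sketch simply uses a fixed bound instead of the epoch check.
        for (int checkpointOffset = 1; checkpointOffset <= 3; checkpointOffset++) {
            abortLingering("broker:9092", "my-sink-0-" + checkpointOffset);
        }
    }
}
{code}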

--
This message was sent by Atlassian Jira
(v8.20.10#820010)