[ https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824769#comment-17824769 ]

Hilmi Al Fatih edited comment on FLINK-34554 at 3/8/24 12:21 PM:
-----------------------------------------------------------------

I see. Let me get back to you by reproducing the behavior with the latest Kafka 
version.

I think I can reproduce it by overwhelming the broker with transactions that 
use distinct transactionalIds.
If I see a heap increase that cannot be cleaned up within the 
[transaction.max.timeout.ms|https://kafka.apache.org/documentation/#brokerconfigs_transaction.max.timeout.ms]
 interval, that should be an indication that the KafkaSink behavior can cause 
issues even with the latest Kafka.
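
Roughly what I have in mind is something like the following; a minimal sketch, 
assuming a local test broker at localhost:9092 and a pre-created topic named 
test-topic (both made up for illustration). It mimics KafkaSink by committing 
one transaction under a brand-new transactionalId per subtask per "checkpoint":
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalIdFlood {
    public static void main(String[] args) {
        for (long checkpoint = 0; ; checkpoint++) {
            for (int subtask = 0; subtask < 350; subtask++) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // assumption: local test broker
                props.put("key.serializer",
                        "org.apache.kafka.common.serialization.ByteArraySerializer");
                props.put("value.serializer",
                        "org.apache.kafka.common.serialization.ByteArraySerializer");
                // A fresh transactionalId on every iteration, similar to how
                // KafkaSink derives a new TID per subtask per checkpoint.
                props.put("transactional.id", "flood-" + subtask + "-" + checkpoint);
                try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                    producer.initTransactions();
                    producer.beginTransaction();
                    producer.send(new ProducerRecord<>("test-topic", new byte[0]));
                    producer.commitTransaction();
                }
                // Each committed TID leaves a ProducerStateEntry on the broker
                // that is only cleaned up after transactional.id.expiration.ms.
            }
        }
    }
}
{code}
If broker heap keeps growing while this runs and does not recover within the 
timeout, that would confirm the pattern.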

 
{quote}I do know some other rather large Flink-Kafka setups where the new 
KafkaSink runs without any problems, both for Flink and for Kafka
{quote}
Excuse me, but I am just wondering how those large setups handle similar 
conditions, that is:
 * running EXACTLY_ONCE
 * having many sink subtasks
 * running with a short checkpoint interval 

I think the problem won't appear if any of the conditions above is absent.

 


was (Author: JIRAUSER283529):
I see. Let me get back to you by reproducing the behavior with the latest Kafka 
version.

I think I can reproduce it by overwhelming the broker with transactions that 
use distinct transactionalIds.
If I see a heap increase that cannot be cleaned up within the 
[transaction.max.timeout.ms|https://kafka.apache.org/documentation/#brokerconfigs_transaction.max.timeout.ms]
 interval, that should be an indication that the KafkaSink behavior can cause 
issues.

 
{quote}I do know some other rather large Flink-Kafka setups where the new 
KafkaSink runs without any problems, both for Flink and for Kafka
{quote}
Excuse me, but I am just wondering how those large setups handle similar 
conditions, that is:
 * running EXACTLY_ONCE
 * having many sink subtasks
 * running with a short checkpoint interval 

I think the problem won't appear if any of the conditions above is absent.

 

> Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created 
> transactionalId per checkpoint
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34554
>                 URL: https://issues.apache.org/jira/browse/FLINK-34554
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.3, 1.17.2, 1.18.1
>            Reporter: Hilmi Al Fatih
>            Priority: Major
>         Attachments: image (4).png, image (5).png
>
>
> Flink version: 1.17.1
> Kafka Broker version: 2.7.1 (4 GB heap memory for each broker)
> Hi, we recently had an outage in our production system after we performed a 
> Flink Kafka connector API upgrade. To give a brief context, our application 
> is a simple Kafka-to-Kafka pipeline with minimal processing. We run in 
> EXACTLY_ONCE mode, thus Kafka transactions are involved.
> Our application runs with around 350 sink subtasks in total. The checkpoint 
> period was set to 5 seconds to avoid blocking {{read_committed}} consumers for 
> too long. We recently performed an upgrade with the following details:
> Previous state:
>  * Flink version: 1.14.4
>  * Broker version: 2.7.1
>  * kafka connector API: FlinkKafkaProducer
> Update to:
>  * Flink version: 1.17.1
>  * Broker version: 2.7.1
>  * kafka connector API: KafkaSink
> Around 10 hours after the deployment, our Kafka brokers started failing 
> with OOM errors. The heap dump entries are dominated by ProducerStateEntry 
> records.
> Our investigation led us to a fundamental implementation change between 
> FlinkKafkaProducer and KafkaSink:
>  * KafkaSink generates a different transactionalId for each checkpoint 
> (illustrated below),
>  * FlinkKafkaProducer uses a constant pool of transactionalIds.
> With this behavior, KafkaSink seemed to exhaust our broker heap very fast, and 
> the ProducerStateEntry records only expire after 
> [transactional.id.expiration.ms|https://kafka.apache.org/documentation/#brokerconfigs_transactional.id.expiration.ms], 
> which by default is set to 7 days.  
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
>  
> [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
>  
> [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])
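> For illustration, the generated ids look roughly like this (the exact format 
> is my assumption from reading the connector code, and "myapp" is a made-up 
> prefix):
> {code}
> <transactionalIdPrefix>-<subtaskId>-<checkpointId>
>
> checkpoint 42, 350 subtasks: myapp-0-42, myapp-1-42, ..., myapp-349-42
> checkpoint 43 then registers 350 brand-new ids again, and so on.
> {code}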
> For our job, this means it created roughly:
>  * over 10 hours of running: 350 ids * 12 checkpoints/minute * 60 min/hour * 
> 10 hours ~ 2,520,000 entries
>  * over 7 days: ~ 42 million entries (see the arithmetic below).
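> Spelling the arithmetic out (a 5-second checkpoint period means 12 checkpoints 
> per minute):
> {code}
> 350 ids * 12/min * 60 min/h * 10 h          =  2,520,000 entries
> 350 ids * 12/min * 60 min/h * 24 h * 7 days = 42,336,000 entries (~42 mil)
> {code}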
> Attached below is the number of ProducerStateEntry entries in the heap dump at 
> the time of the OOM:
>  * 505,000 (6.5%); in total it would be roughly ~ 7,000,000 entries.
> There are several things that come to mind to mitigate the drawbacks, 
> such as:
>  * Reduce the number of subtasks, which reduces the number of transactionalIds.
>  * Enlarge the checkpoint period to reduce the rate of newly generated 
> transactionalIds.
>  * Shorten 
> [transactional.id.expiration.ms|https://kafka.apache.org/documentation/#brokerconfigs_transactional.id.expiration.ms] 
> to expire unused transactionalIds sooner (see the sketch below).
>  * Increase the broker heap.
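> As a rough sketch of the second and third options above (the values are 
> illustrative, not recommendations):
> {code}
> # Kafka broker config: expire idle transactionalIds after 1 hour (default: 7 days)
> transactional.id.expiration.ms=3600000
>
> # flink-conf.yaml: checkpoint every 60 s instead of every 5 s
> execution.checkpointing.interval: 60s
> {code}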
> However, the above mitigations might be too cumbersome and need careful tuning, 
> which harms our flexibility. In addition, because no state about 
> lingering transactions is kept, TransactionAborter seems to abort old 
> transactions naively. We might accidentally (or purposefully) reuse the 
> same transactionalIdPrefix and start the counter from 0. In that case, if an 
> old transactionalId happens to have epoch > 0, it will keep looping, aborting 
> nonexistent transactions up to the latest checkpoint counter (which may 
> be very large), and make the job stuck.
> Btw, I am aware that for Flink 2.0 a lot of effort is going into 
> better integration with Kafka transactions 
> ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
>  FLIP-319 mentions something about TID pooling, but it seems that there is no 
> relevant page for it yet, so I wonder whether there is already a concrete plan 
> that I can follow; or, if there is something I can contribute to, I will be 
> really happy to help.
>  
>  


