Hilmi Al Fatih created FLINK-34554:
--------------------------------------

             Summary: Using EXACTLY_ONCE with KafkaSink causes broker's OOM due 
to newly created transactionalId per checkpoint
                 Key: FLINK-34554
                 URL: https://issues.apache.org/jira/browse/FLINK-34554
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / Kafka
    Affects Versions: 1.18.1, 1.17.2, 1.16.3
            Reporter: Hilmi Al Fatih
             Fix For: 1.18.1, 1.17.2, 1.16.3
         Attachments: image (4).png, image (5).png

Flink version: 1.17.1
Kafka Broker version: 2.7.1
 * 4 GB heap memory for each broker

Hi,
We recently had an outage in our production system after we performed a Flink 
kafka-connector API upgrade. To give some brief context, our application is a 
simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE 
mode, so Kafka transactions are involved.
Our application runs with around 350 sink subtasks in total. The checkpoint 
period was set to 5 seconds to avoid blocking {{read_committed}} consumers for 
too long. We recently performed an upgrade with the following details:

 * Previous state:
 ** Flink version: 1.14.4
 ** Broker version: 2.7.1
 ** kafka connector API: FlinkKafkaProducer
 * Update to:
 ** Flink version: 1.17.1
 ** Broker version: 2.7.1
 ** kafka connector API: KafkaSink

Around 10 hours after the deployment, our Kafka brokers started failing with 
OOM errors. The heap dump is dominated by ProducerStateEntry records.
Our investigation led us to an implementation change between 
FlinkKafkaProducer and KafkaSink:
 * KafkaSink generates a different transactionalId for each checkpoint,
 * FlinkKafkaProducer uses a constant pool of transactionalIds.
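
To make the difference concrete, here is a minimal sketch that counts the distinct transactionalIds the broker would see under each scheme. The id format ({{prefix-subtask-counter}}), the function names, and the pool size of 5 are assumptions for illustration only, not the connectors' actual code.

```python
def per_checkpoint_ids(prefix, subtasks, checkpoints):
    """One new id per (subtask, checkpoint), as described for KafkaSink.

    The "prefix-subtask-checkpoint" format is an assumed, illustrative layout.
    """
    return {
        f"{prefix}-{s}-{c}"
        for s in range(subtasks)
        for c in range(1, checkpoints + 1)
    }


def pooled_ids(prefix, subtasks, checkpoints, pool_size=5):
    """Ids reused from a fixed pool per subtask, as described for FlinkKafkaProducer.

    pool_size=5 is an assumed value chosen for illustration.
    """
    return {
        f"{prefix}-{s}-{c % pool_size}"
        for s in range(subtasks)
        for c in range(1, checkpoints + 1)
    }


if __name__ == "__main__":
    # 350 subtasks, 720 checkpoints (one hour at a 5 second interval).
    print(len(per_checkpoint_ids("job", 350, 720)))  # grows with every checkpoint
    print(len(pooled_ids("job", 350, 720)))          # bounded by subtasks * pool_size
```

Under these assumptions the per-checkpoint scheme grows linearly with the number of checkpoints, while the pooled scheme stays bounded by subtasks * pool size no matter how long the job runs.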

With this behavior, KafkaSink seems to exhaust our broker heap very quickly, 
and each ProducerStateEntry only expires after 
{{transactional.id.expiration.ms}}, which by default is set to 7 days 
([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
[ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
[ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207]).

For our job, this means it creates roughly:
 * after 10 hours of running: 350 ids * 12 times/minute * 60 min/hour * 10 hours ~ 2,520,000 entries
 * after 7 days: ~ 42 million entries

Attached is the number of ProducerStateEntry entries in the heap dump at the 
time of the OOM: 505,000 (6.5%); in total it would be roughly ~ 7,000,000 
entries.

Several mitigations come to mind:
 * Reduce the number of subtasks, which reduces the number of transactionalIds.
 * Lengthen the checkpoint period to reduce the rate of newly generated 
transactionalIds.
 * Shorten {{transactional.id.expiration.ms}} so that unused transactionalIds 
expire sooner.
 * Increase the broker heap.
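
The growth estimate and the effect of the first three mitigations can be reproduced with a short calculation. This is a rough sketch that assumes one new transactionalId per subtask per checkpoint and that the broker keeps every id until {{transactional.id.expiration.ms}} elapses; the function names and the example mitigation values are hypothetical.

```python
HOUR_S = 3600
DAY_S = 24 * HOUR_S


def new_ids(subtasks, checkpoint_interval_s, duration_s):
    """Ids created over duration_s, assuming one new id per subtask per checkpoint."""
    return subtasks * (duration_s // checkpoint_interval_s)


def retained_ids(subtasks, checkpoint_interval_s, expiration_s):
    """Steady-state ids held by the broker, assuming ids live until expiration."""
    return new_ids(subtasks, checkpoint_interval_s, expiration_s)


if __name__ == "__main__":
    # Figures from this report: 350 subtasks, 5 second checkpoints.
    print(new_ids(350, 5, 10 * HOUR_S))      # 2520000 after 10 hours
    print(retained_ids(350, 5, 7 * DAY_S))   # 42336000 with the 7 day default

    # Hypothetical mitigation: 60 second checkpoints and a 1 day expiration.
    print(retained_ids(350, 60, 1 * DAY_S))  # 504000
```

Each of the three knobs scales the retained count linearly, so none of them removes the growth on its own; they only move the steady state.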

However, the above mitigations might be too cumbersome and need careful tuning, 
which harms our flexibility.

In addition, because lingering transaction state is not tracked, 
TransactionAborter seems to abort old transactions naively. We might 
accidentally (or purposefully) reuse the same transactionalIdPrefix and start 
the counter from 0. In that case, if the old transactionalIds happen to have 
epoch > 0, it will keep looping, aborting nonexistent transactions up to the 
latest checkpoint counter (which may be very large), and leave the job stuck.

Btw, I am aware that for Flink 2.0 you are putting a lot of effort into 
creating better integration with Kafka transactions 
([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
FLIP-319 mentions TID pooling. However, it seems that there is no relevant 
page for it yet, so I wonder whether there is already a concrete plan that I 
can follow. If there is something I can contribute to, I will be really happy 
to help.
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
