Lauri Suurväli created FLINK-33231:
--------------------------------------

             Summary: Memory leak in KafkaSourceReader if no data in consumed 
topic
                 Key: FLINK-33231
                 URL: https://issues.apache.org/jira/browse/FLINK-33231
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.17.1
            Reporter: Lauri Suurväli
         Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png

*Problem description*

Our Flink streaming job's TaskManager heap fills up when the job has nothing
to consume and process.

It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink.
When there are no messages in the source topic, the TaskManager heap usage
keeps increasing until the job exits after receiving a SIGTERM signal. We are
running the job on AWS EMR with YARN.

The TaskManager heap problem does not occur while there is data to process.
It is also worth noting that sending a single message to the source topic of a
job that has been sitting idle and leaking memory clears the heap. However,
this does not resolve the problem, since the heap usage starts increasing
again immediately after that message has been processed.
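For reference, a minimal sketch of the topology described above (broker
address, topic names, group id and checkpoint interval are illustrative
placeholders, not our production values):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class IdleTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is enabled; the leak grows by one empty map per checkpoint.
        env.enableCheckpointing(60_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("source-events")           // placeholder
                .setGroupId("example-group")          // placeholder
                // Reading from committed group offsets; this matches the
                // StartingOffset: -3 sentinel seen in the logs below.
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sink-events")      // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(value); // stand-in for the actual processing
                    }
                })
                .sinkTo(sink);

        env.execute("idle-topic-repro");
    }
}
{code}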

!Screenshot 2023-10-10 at 12.49.37.png!

TaskManager heap used percentage is calculated as
{code:java}
flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
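For example, with an illustrative 3.2 GiB used against a 4 GiB max heap, this
metric would read 80%.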
 

 

I was able to take heap dumps of the TaskManager processes while the heap
usage percentage was high. Heap dump analysis detected 912,355 instances of
empty java.util.HashMap collections, retaining >= 43,793,040 bytes.

!Screenshot 2023-10-09 at 14.13.43.png!

The retained heap seemed to be located at:
{code:java}
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}

!Screenshot 2023-10-09 at 13.02.34.png!

*Possible hints:*

An empty HashMap is added to the offsetsToCommit map during the snapshotState
method if one does not already exist for the given checkpoint.
[KafkaSourceReader line
107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
{code:java}
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
{code}
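As a side note, computeIfAbsent materializes the inner HashMap whether or not
anything is later put into it. A self-contained sketch of the accumulation
(plain Java, with String/Long standing in for TopicPartition and
OffsetAndMetadata):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OffsetsToCommitLeakDemo {
    public static void main(String[] args) {
        // Stand-in for KafkaSourceReader#offsetsToCommit:
        // checkpoint id -> (partition -> offset).
        Map<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

        // Simulate snapshotState firing once per checkpoint on an idle source.
        for (long checkpointId = 1; checkpointId <= 912_355; checkpointId++) {
            Map<String, Long> offsetsMap =
                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // In KafkaSourceReader a put only happens when startingOffset >= 0;
            // with startingOffset < 0 offsetsMap stays empty and its entry is
            // never removed.
        }

        // One retained empty HashMap per checkpoint, matching the ~912k
        // instances seen in the heap dump.
        System.out.println("retained empty maps: " + offsetsToCommit.size());
    }
}
{code}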
 

If the startingOffset for the given split is >= 0, a new entry is added to the
map from the previous step. [KafkaSourceReader line
113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
{code:java}
if (split.getStartingOffset() >= 0) {
    offsetsMap.put(
        split.getTopicPartition(),
        new OffsetAndMetadata(split.getStartingOffset()));
}{code}
If the starting offset is smaller than 0, this leaves the offsetsMap created
in the previous step empty. We can see from the logs that the startingOffset
is -3 when the splits are added to the reader:
{code:java}
Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
[Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
-9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
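For context: if I am reading KafkaPartitionSplit correctly, a StartingOffset
of -3 is the COMMITTED_OFFSET sentinel (resolve to the committed group offset
at runtime), and a StoppingOffset of -9223372036854775808 (Long.MIN_VALUE) is
NO_STOPPING_OFFSET, i.e. an unbounded split.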
 

 

Entries are removed from the offsetsToCommit map once they have been
committed to Kafka, which happens in the callback that
KafkaSourceReader.notifyCheckpointComplete passes to the
KafkaSourceFetcherManager.commitOffsets method.

However, if committedPartitions is empty for the given checkpoint, the
KafkaSourceFetcherManager.commitOffsets method returns early.
[KafkaSourceFetcherManager line
78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
{code:java}
if (offsetsToCommit.isEmpty()) {
    return;
} {code}
We can observe from the logs that an empty map is indeed encountered at this
step:
{code:java}
Committing offsets {}{code}
*Conclusion*

It seems that an empty map is added to the offsetsToCommit map for every
checkpoint. Since the startingOffset in our case is -3, the empty map never
gets filled. During the offset commit phase the offsets for these checkpoints
are ignored, since there is nothing to commit; however, there is no cleanup
either, so the empty maps keep accumulating.
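A possible direction for a fix (an untested sketch on my part, not a patch):
drop the entry for a checkpoint that has nothing to commit instead of
returning with the empty map still in place, e.g. in
KafkaSourceReader.notifyCheckpointComplete:
{code:java}
// Untested sketch inside KafkaSourceReader#notifyCheckpointComplete: if this
// checkpoint produced no offsets to commit, remove its empty entry instead of
// leaving it behind (earlier empty checkpoints may need the same treatment).
Map<TopicPartition, OffsetAndMetadata> committedPartitions =
        offsetsToCommit.get(checkpointId);
if (committedPartitions == null || committedPartitions.isEmpty()) {
    offsetsToCommit.remove(checkpointId);
    return;
}
{code}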

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
