[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-25728.
----------------------------------
    Fix Version/s: 1.15.0
                   1.13.6
                   1.14.4
       Resolution: Fixed

Merged to master as a7eadf57e42^ and a7eadf57e42
Merged to release-1.14 as 761a4623dda^ and 761a4623dda
Merged to release-1.13 as 9653999a27b^ and 9653999a27b

> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Assignee: pc wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.13.6, 1.14.4
>
>         Attachments: flink-completablefuture-issue.tar.xz, 
> image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The 
> none-broadcast input has roughly 10 million messages per second, and the 
> broadcast side is some kind of control stream, rarely has message follow 
> through. 
> After several hours of running, the TaskManager will run out of heap memory 
> and restart. We reviewed the application code without finding any relevant 
> issues.
> We found that the running to crash time was roughly the same. Then we make a 
> heap dump before the crash and found mass `CompletableFuture$UniRun` 
> instances. 
> These `CompletableFuture$UniRun` instances consume several gigabytes memories.
>  
> The following pic is from the heap dump we get from a mock testing stream 
> with the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>  
> After some source code research. We found that it might be caused by the 
> *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors* , it's 
> *availableFuture* got completed when any of it's input's *availableFuture* is 
> complete. 
> The current implementation create a new *CompletableFuture* and a new 
> *CompletableFuture$UniRun* append to delegate inputProcessor's 
> *avaiableFuture*.
> The issue is caused by the stacking of *CompletableFuture$UniRun* on the slow 
> inputProcessor's *avaiableFuture*. 
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because the *UniRun* holds the reference of outside 
> *StreamMultipleInputProcessor*'s avaiableFuture, that cause mass 
> *CompletableFuture* instance which can not be recycled.
> We made some modifications to the 
> *StreamMultipleInputProcessor*.*getAvaiableFuture* function, and verify that 
> the issue is gone on our modified version. 
> We are willing to make a PR for this fix.
>  Heap Dump File [^flink-completablefuture-issue.tar.xz] 
> PS: This is a YourKit heap dump. may be not compatible HPROF files.
> [Sample Code to reproduce the 
> issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to