[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-28 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17483792#comment-17483792
 ] 

Piotr Nowojski commented on FLINK-25728:


After an offline discussion with [~chesnay], we agreed to merge a fix for this 
bug without test coverage. After the feature freeze, we will try to provide 
stress test coverage for this issue. I've created FLINK-25869 for this purpose.

> Potential memory leaks in StreamMultipleInputProcessor
> --
>
> Key: FLINK-25728
> URL: https://issues.apache.org/jira/browse/FLINK-25728
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
> Reporter: pc wang
> Priority: Critical
> Labels: pull-request-available
> Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The 
> non-broadcast input receives roughly 10 million messages per second, while the 
> broadcast side is a control stream that rarely has messages flowing 
> through. 
> After several hours of running, the TaskManager runs out of heap memory 
> and restarts. We reviewed the application code without finding any relevant 
> issues.
> We found that the time from start to crash was roughly the same on each run. 
> We then took a heap dump before the crash and found a large number of 
> `CompletableFuture$UniRun` instances. 
> These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
>  
> The following picture is from a heap dump taken from a mock test stream 
> that reproduces the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>  
> After some source code research, we found that it might be caused by 
> *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its 
> *availableFuture* is completed when any of its inputs' *availableFuture* is 
> complete. 
> The current implementation creates a new *CompletableFuture* on each call and 
> appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's 
> *availableFuture*.
> The issue is caused by the stacking of *CompletableFuture$UniRun* instances on 
> the slow inputProcessor's *availableFuture*. 
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the outer 
> *StreamMultipleInputProcessor*'s availableFuture, a large number of 
> *CompletableFuture* instances cannot be garbage collected.
> We made some modifications to the 
> *StreamMultipleInputProcessor*.*getAvailableFuture* function and verified that 
> the issue is gone in our modified version. 
> We are willing to make a PR for this fix.
>  Heap Dump File [^flink-completablefuture-issue.tar.xz] 
> PS: This is a YourKit heap dump; it may not be compatible with HPROF files.
> [Sample Code to reproduce the 
> issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]
>  
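The description mentions a modified {{getAvailableFuture}} but does not show it. As a rough illustration of one possible direction only (an assumption, not the reporter's patch and not the fix that was merged), the sketch below registers at most one callback per distinct input future and lets that callback complete whichever combined future is current. The class {{OncePerFutureCombiner}} and its methods are hypothetical names, and the sketch assumes all calls and completions happen on a single thread.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not Flink code: registers one callback per distinct input future.
class OncePerFutureCombiner {
    // The future instance each input slot was last registered on.
    private final CompletableFuture<?>[] registeredOn;
    // The combined future handed out to callers; replaced once it completes.
    private CompletableFuture<Void> anyAvailable = new CompletableFuture<>();

    OncePerFutureCombiner(int numInputs) {
        registeredOn = new CompletableFuture<?>[numInputs];
    }

    // Single-threaded by assumption; a real implementation would need to consider concurrency.
    CompletableFuture<Void> combine(CompletableFuture<?>... inputs) {
        for (CompletableFuture<?> input : inputs) {
            if (input.isDone()) {
                return CompletableFuture.completedFuture(null); // something is already available
            }
        }
        if (anyAvailable.isDone()) {
            anyAvailable = new CompletableFuture<>();
        }
        for (int i = 0; i < inputs.length; i++) {
            if (registeredOn[i] != inputs[i]) { // skip futures we already hooked into
                registeredOn[i] = inputs[i];
                inputs[i].thenRun(this::signal);
            }
        }
        return anyAvailable;
    }

    private void signal() {
        anyAvailable.complete(null); // completes whichever combined future is current
    }

    // Repeats the busy/idle scenario and reports how many callbacks the idle future holds.
    static int idleDependentsAfter(int calls) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>();
        OncePerFutureCombiner combiner = new OncePerFutureCombiner(2);
        for (int i = 0; i < calls; i++) {
            CompletableFuture<Void> busyInput = new CompletableFuture<>();
            CompletableFuture<Void> any = combiner.combine(busyInput, idleInput);
            busyInput.complete(null);
            if (!any.isDone()) {
                throw new IllegalStateException("combined future should be complete");
            }
        }
        return idleInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("callbacks on idle input: " + idleDependentsAfter(10_000));
    }
}
```

With this approach the idle future keeps a single pending callback no matter how often {{combine}} is called, at the cost of extra mutable state that would need synchronization if callbacks can fire on other threads.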



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-25 Thread pc wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481956#comment-17481956
 ] 

pc wang commented on FLINK-25728:
-

Ok.




[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-25 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481954#comment-17481954
 ] 

Piotr Nowojski commented on FLINK-25728:


Thanks for the confirmation. Good catch! (let's move the discussion to the PR 
about how to fix it)



[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-25 Thread pc wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481944#comment-17481944
 ] 

pc wang commented on FLINK-25728:
-

That's correct. A bunch of *CompletableFuture* instances and 
*CompletableFuture$UniRun* instances are held by the 
_inputProcessors[i].getAvailableFuture()_ of the idle input.
Besides the memory issue, when the idle input eventually receives data, the huge 
number of *CompletableFuture$UniRun* instances will cause CPU spikes.
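
The CPU-spike side effect can be illustrated with plain {{CompletableFuture}} objects. The following is a minimal sketch under simplifying assumptions (single thread, no Flink classes; {{StackedCallbackBurstSketch}} and {{callbacksRunOnWakeUp}} are made-up names): every callback stacked on the idle future runs in one burst when that future finally completes, even though the combined futures they target were completed long ago.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: stand-in for the idle input's availability future.
class StackedCallbackBurstSketch {

    // Stacks `stacked` callbacks on one pending future, then completes it and
    // returns how many callbacks ran as a result of that single completion.
    static int callbacksRunOnWakeUp(int stacked) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>();
        AtomicInteger executed = new AtomicInteger();
        for (int i = 0; i < stacked; i++) {
            CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
            idleInput.thenRun(() -> {
                executed.incrementAndGet();
                anyInputAvailable.complete(null); // no effect: already completed below
            });
            anyInputAvailable.complete(null); // simulates the busy input waking the task
        }
        idleInput.complete(null); // the idle input finally has data
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks run on wake-up: " + callbacksRunOnWakeUp(100_000));
    }
}
```

Each of those callbacks does no useful work, so the cost of the burst grows with how long the input stayed idle.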



[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-25 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481909#comment-17481909
 ] 

Piotr Nowojski commented on FLINK-25728:


Thanks for reporting and analysing the issue [~wpc009]. Could you maybe 
rephrase why 
{code:java}
inputProcessors[i]
    .getAvailableFuture()
    .thenRun(() -> anyInputAvailable.complete(null))
{code}
is causing this memory leak?

Or maybe let me rephrase it. Let's assume we have two inputs: one is flipping 
between available/unavailable status, the other is continuously unavailable. 
Now, for each {{StreamMultipleInputProcessor#getAvailableFuture}} call, we 
create one {{CompletableFuture anyInputAvailable}}, referenced by TWO 
{{CompletableFuture$UniRun}} instances, one registered on 
{{inputProcessors[i].getAvailableFuture()}} of each input. 
{{anyInputAvailable}} is returned, and it will be completed by the first input 
sooner or later, waking up {{StreamTask}} in the process. That's fine. Now the 
problem is that each of those {{anyInputAvailable}} instances will, in this 
scenario, be referenced forever by the second input's 
{{inputProcessors[i].getAvailableFuture()}}. As long as the second input is not 
available, we keep building up the memory leak. 

Did I understand the problem correctly?
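
The two-input scenario described above can be reproduced outside Flink. This is a minimal sketch, assuming plain {{CompletableFuture}} objects stand in for the inputs' availability futures; {{AvailabilityLeakSketch}}, {{anyAvailable}} and {{dependentsAfter}} are illustrative names, not Flink APIs. It uses {{CompletableFuture#getNumberOfDependents}} to count the callbacks still attached to the never-completing input.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: mimics the thenRun-per-call pattern quoted above.
class AvailabilityLeakSketch {

    // One simulated getAvailableFuture() call: a fresh combined future plus
    // one thenRun callback registered on every input future.
    static CompletableFuture<Void> anyAvailable(CompletableFuture<?>... inputs) {
        CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            input.thenRun(() -> anyInputAvailable.complete(null));
        }
        return anyInputAvailable;
    }

    // Repeats the call while one input keeps becoming available and the other
    // never does, then reports how many callbacks the idle future still holds.
    static int dependentsAfter(int calls) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>(); // never completed
        for (int i = 0; i < calls; i++) {
            CompletableFuture<Void> busyInput = new CompletableFuture<>();
            CompletableFuture<Void> any = anyAvailable(busyInput, idleInput);
            busyInput.complete(null); // first input becomes available, waking the caller
            if (!any.isDone()) {
                throw new IllegalStateException("combined future should be complete");
            }
        }
        return idleInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("callbacks retained by idle input: " + dependentsAfter(10_000));
    }
}
```

In this single-threaded setup the retained count grows by one per call, and each retained callback keeps its captured {{anyInputAvailable}} reachable, which matches the unbounded growth described in the report.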




[jira] [Commented] (FLINK-25728) Potential memory leaks in StreamMultipleInputProcessor

2022-01-24 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481626#comment-17481626
 ] 

Till Rohrmann commented on FLINK-25728:
---

cc [~pnowojski]

> Potential memory leaks in StreamMultipleInputProcessor
> --
>
> Key: FLINK-25728
> URL: https://issues.apache.org/jira/browse/FLINK-25728
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
> Reporter: pc wang
> Priority: Blocker
> Labels: pull-request-available
> Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png