[ https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski closed FLINK-25728.
----------------------------------
    Fix Version/s: 1.15.0
                   1.13.6
                   1.14.4
       Resolution: Fixed

Merged to master as a7eadf57e42^ and a7eadf57e42
Merged to release-1.14 as 761a4623dda^ and 761a4623dda
Merged to release-1.13 as 9653999a27b^ and 9653999a27b

> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Assignee: pc wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.13.6, 1.14.4
>
>         Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The non-broadcast input receives roughly 10 million messages per second, while the broadcast side is a kind of control stream that rarely has any messages flowing through it.
> After several hours of running, the TaskManager runs out of heap memory and restarts. We reviewed the application code without finding any relevant issues.
> We found that the time from start to crash was roughly the same on every run. We then took a heap dump before the crash and found a large number of `CompletableFuture$UniRun` instances, which consume several gigabytes of memory.
>
> The following picture is from a heap dump we took from a mock test stream with the same scenario.
> [Heap dump screenshot: image-2022-01-20-18-43-32-816.png]
>
> After some source code research, we found that it might be caused by *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*, and its *availableFuture* is completed when the *availableFuture* of any of its inputs completes.
> The current implementation creates a new *CompletableFuture* on each call and appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's *availableFuture*.
> The issue is caused by these *CompletableFuture$UniRun* instances stacking up on the slow inputProcessor's *availableFuture*. In our scenario the slow input is the broadcast side, whose future can stay incomplete for a long time, so every new outer future adds one more *UniRun* to it that is only released when the broadcast input becomes available.
> See the source code below:
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the outer *StreamMultipleInputProcessor*'s availableFuture, a large number of *CompletableFuture* instances cannot be garbage-collected.
> We made some modifications to the *StreamMultipleInputProcessor.getAvailableFuture()* function and verified that the issue is gone in our modified version.
> We are willing to make a PR for this fix.
> Heap Dump File [^flink-completablefuture-issue.tar.xz]
> PS: This is a YourKit heap dump and may not be compatible with HPROF tools.
> [Sample Code to reproduce the issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]
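The stacking described in the report can be reproduced with plain `java.util.concurrent.CompletableFuture`, without Flink. The sketch below is a simplified model. It is not the actual `StreamMultipleInputProcessor` code, and it is not necessarily the merged fix. `AnyOfLeakDemo`, `naiveAnyOf`, `CachingAnyOf`, `naiveDependents`, and `cachingDependents` are hypothetical names. The sketch assumes two inputs. One is a fast input whose future completes every round. The other is a slow input, standing in for the broadcast side, whose future never completes. `CompletableFuture.getNumberOfDependents()` counts the callbacks (`thenRun` registers a `UniRun` node) that are waiting on the slow future.

```java
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical model of an "any input available" future; not Flink code.
final class AnyOfLeakDemo {

    // Naive pattern: every call allocates a fresh outer future and attaches one more
    // thenRun callback (a CompletableFuture$UniRun node) to every input future.
    static CompletableFuture<Void> naiveAnyOf(CompletableFuture<?>... inputs) {
        CompletableFuture<Void> anyAvailable = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            // While `input` is pending, this node and the outer future it captures
            // stay reachable from `input`, even after `anyAvailable` has completed.
            input.thenRun(() -> anyAvailable.complete(null));
        }
        return anyAvailable;
    }

    // Alternative sketch: register at most one callback per pending input future and let
    // that callback complete whichever combined future is current. Exceptional
    // completion of inputs is ignored here, as in the naive version.
    static final class CachingAnyOf {
        private final CompletableFuture<?>[] registered;
        private CompletableFuture<Void> combined = new CompletableFuture<>();

        CachingAnyOf(int numInputs) {
            this.registered = new CompletableFuture<?>[numInputs];
        }

        synchronized CompletableFuture<Void> anyOf(CompletableFuture<?>... inputs) {
            if (inputs.length != registered.length) {
                throw new IllegalArgumentException("expected " + registered.length + " inputs");
            }
            // Start a new round only once the previous combined future has fired.
            if (combined.isDone()) {
                combined = new CompletableFuture<>();
            }
            for (int i = 0; i < inputs.length; i++) {
                CompletableFuture<?> input = inputs[i];
                // Skip a pending future we already listen to. A completed future runs the
                // callback immediately and leaves nothing behind on its stack.
                if (registered[i] != input || input.isDone()) {
                    registered[i] = input;
                    input.thenRun(this::notifyCompletion);
                }
            }
            return combined;
        }

        // The callback references the helper, not a per-call future.
        private synchronized void notifyCompletion() {
            combined.complete(null);
        }
    }

    // Each round, the fast input becomes available while the slow input stays pending.
    // Returns the number of callbacks left waiting on the slow input's future.
    static int naiveDependents(int rounds) {
        CompletableFuture<Void> slow = new CompletableFuture<>();
        for (int r = 0; r < rounds; r++) {
            CompletableFuture<Void> fast = new CompletableFuture<>();
            CompletableFuture<Void> any = naiveAnyOf(fast, slow);
            fast.complete(null);
            if (!any.isDone()) throw new IllegalStateException("combined future did not fire");
        }
        return slow.getNumberOfDependents();
    }

    static int cachingDependents(int rounds) {
        CompletableFuture<Void> slow = new CompletableFuture<>();
        CachingAnyOf helper = new CachingAnyOf(2);
        for (int r = 0; r < rounds; r++) {
            CompletableFuture<Void> fast = new CompletableFuture<>();
            CompletableFuture<Void> any = helper.anyOf(fast, slow);
            fast.complete(null);
            if (!any.isDone()) throw new IllegalStateException("combined future did not fire");
        }
        return slow.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("naive:   " + naiveDependents(1_000) + " dependents on the slow input");
        System.out.println("caching: " + cachingDependents(1_000) + " dependents on the slow input");
    }
}
```

In the naive variant, the slow future's dependent count grows by one per round, and each node pins an outer future that has already completed. In the caching variant the count stays at one, because the single callback goes through the helper instead of capturing a per-call future.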