[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842078#comment-16842078 ]
Haibo Sun edited comment on FLINK-12529 at 5/17/19 10:23 AM:
-------------------------------------------------------------

> But as far as I understand you would only need to modify the
> {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this
> change would base on my PR?).

If this change is based on your PR (https://github.com/apache/flink/pull/8467), you are right. This change is simple, so I will do it after your PR.

> Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12529
>                 URL: https://issues.apache.org/jira/browse/FLINK-12529
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Operators
>    Affects Versions: 1.8.0
>            Reporter: Haibo Sun
>            Assignee: Haibo Sun
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`),
> each input channel has a corresponding record deserializer. Currently, these
> record deserializers are cleaned up only at the end of the task (see
> `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`).
> This is not a problem for unbounded streams, but it may reduce the efficiency
> of heap memory usage on the taskmanager when the inputs are bounded streams.
> For example, suppose all inputs are bounded streams: some of them end very
> early because they carry little data, while others end very late because they
> carry a lot of data. The buffers of the record deserializers for the input
> channels that finished early then sit idle for a long time and are never used
> again.
> In another case, when the inputs mix unbounded and bounded streams, the
> buffers of the record deserializers for the bounded streams stay idle forever
> (are never used again) once those bounded streams have finished.
> This is especially costly when the records are large and the upstream
> parallelism is high, because the total size of the `SpanningWrapper#buffer`
> instances becomes very large. Each `SpanningWrapper#buffer` is allowed to grow
> up to 5 MB, so with an upstream parallelism of 100 the total can reach 500 MB
> (in our production, there are jobs with record sizes up to hundreds of KB and
> upstream parallelism up to 1000).
> Overall, after receiving `EndOfPartitionEvent` from an input channel, the
> corresponding record deserializer should be cleared immediately to improve
> the efficiency of heap memory usage on the taskmanager.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
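The eager release proposed in the issue description can be sketched roughly as below. This is a simplified, self-contained Java model, not Flink code: `DeserializerReleaseSketch`, `SketchDeserializer`, `Event` and `handleBufferOrEvent` are hypothetical names (the last one only loosely mirrors `Stream(Two)InputProcessor#processBufferOrEvent`), and a plain `byte[]` stands in for `SpanningWrapper#buffer`. The idea shown is: drop a channel's deserializer as soon as that channel reports end of partition, and keep the end-of-task `cleanup()` as a fallback for channels that never finish (e.g. unbounded inputs).

```java
import java.util.Arrays;

// Hypothetical, simplified model of an input processor that keeps one record
// deserializer per input channel. None of these names are Flink APIs.
class DeserializerReleaseSketch {

    // Events a channel can deliver in this sketch (assumed, not Flink's types).
    enum Event { BUFFER, END_OF_PARTITION }

    // Stand-in for a record deserializer whose spanning buffer may grow large.
    static final class SketchDeserializer {
        private byte[] spanningBuffer;

        SketchDeserializer(int bufferBytes) {
            this.spanningBuffer = new byte[bufferBytes];
        }

        long retainedBytes() {
            return spanningBuffer == null ? 0 : spanningBuffer.length;
        }

        // Drop the reference so the garbage collector can reclaim the buffer.
        void clear() {
            spanningBuffer = null;
        }
    }

    private final SketchDeserializer[] deserializers;

    DeserializerReleaseSketch(int numChannels, int bufferBytes) {
        deserializers = new SketchDeserializer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            deserializers[i] = new SketchDeserializer(bufferBytes);
        }
    }

    // Called for every buffer or event; releases the channel's deserializer
    // as soon as that channel reports the end of its partition.
    void handleBufferOrEvent(int channel, Event event) {
        SketchDeserializer d = deserializers[channel];
        if (d == null) {
            throw new IllegalStateException("channel " + channel + " already finished");
        }
        if (event == Event.END_OF_PARTITION) {
            d.clear();
            deserializers[channel] = null;
        }
        // Event.BUFFER: actual record deserialization is omitted in this sketch.
    }

    // End-of-task cleanup, as before; still safe after eager releases.
    void cleanup() {
        for (int i = 0; i < deserializers.length; i++) {
            if (deserializers[i] != null) {
                deserializers[i].clear();
                deserializers[i] = null;
            }
        }
    }

    // Bytes still held by deserializers of channels that have not finished.
    long totalRetainedBytes() {
        return Arrays.stream(deserializers)
                .filter(d -> d != null)
                .mapToLong(SketchDeserializer::retainedBytes)
                .sum();
    }

    // Upper bound on heap held by spanning buffers: one per upstream channel.
    static long worstCaseBytes(int upstreamParallelism, long maxBufferBytes) {
        return upstreamParallelism * maxBufferBytes;
    }

    public static void main(String[] args) {
        DeserializerReleaseSketch p = new DeserializerReleaseSketch(4, 1024);
        System.out.println(p.totalRetainedBytes());
        p.handleBufferOrEvent(0, Event.END_OF_PARTITION);
        System.out.println(p.totalRetainedBytes());
        long mb = 1024L * 1024L;
        System.out.println(worstCaseBytes(100, 5 * mb) / mb);
    }
}
```

Running `main` prints 4096, 3072 and 500; the last value reproduces the 5 MB × 100 = 500 MB bound from the description (taking 1 MB as 1024 × 1024 bytes). Nulling the array slot, rather than only clearing the buffer, makes the released deserializer object itself unreachable as well; the trade-off is that any later data on that channel has to be treated as an error, which this sketch signals with `IllegalStateException`.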