[ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988603#comment-16988603 ]
Zhu Zhu commented on FLINK-15031:
---------------------------------

Thanks [~trohrmann] for the feedback. I can work on it.

Agreed that it would be better to let the {{ShuffleService}} determine the shuffle memory required for a vertex. So I'd like to introduce an interface method
{code:java}
MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions);
{code}
in {{ShuffleMaster}} with a default return value of {{MemorySize.ZERO}}. The {{NettyShuffleMaster}} can then easily implement it by returning the memory size of {{numInputChannel * buffersPerChannel (from config) + numberOfSubpartitions + 1}} network buffers.

The other part of the change would be in the scheduler components. I'd like to introduce a {{ResourceRequirementsRetriever}} to generate the final ResourceProfiles, including shuffle memory, for vertices/slot sharing groups and to serve queries for them.

> Calculate required shuffle memory cases before allocating slots in resources
> specified
> --------------------------------------------------------------------------------------
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare its required
> resources before using them. In this way, no resource-related error should
> happen as long as no resource is used beyond what was declared. This ensures
> that a deployed task will not fail due to insufficient resources in the TM.
> Such failures would otherwise be unnecessary and could even cause a job to
> hang forever, failing repeatedly when deploying tasks to a TM with
> insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> need a minimum number of network buffers to work. Currently a task can be
> deployed to a TM with insufficient network buffers and then fails on launch.
> To avoid that, we should calculate the required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex)
> is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * buffersPerChannel)
>   + required buffers for the result partition buffer pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this applies to the {{NettyShuffleService}} case. For custom shuffle
> services, there is currently no way to get the required shuffle memory of a
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for
> a task should be the max required shuffle memory of all {{ExecutionVertex}}
> instances of the same {{ExecutionJobVertex}}. The required shuffle memory for
> a slot sharing group should then be the sum of the shuffle memory of each
> {{ExecutionJobVertex}} within it.
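The buffer formula and the max/sum aggregation for {{ExecutionJobVertex}} and slot sharing groups can be sketched in Java as below. This is a hypothetical, simplified sketch, not Flink code: {{MemorySize}}, {{TaskShape}}, {{NettyLikeShuffleMaster}}, {{jobVertexShuffleMemory}} and {{slotSharingGroupShuffleMemory}} are stand-ins invented for illustration. The buffers-per-channel value of 2 and the 32 KiB buffer size are illustrative assumptions. It shows the proposed {{getShuffleMemoryForTask}} defaulting to zero, a Netty-style implementation that multiplies the required buffer count by an assumed buffer size, the max over the subtasks of one {{ExecutionJobVertex}}, and the sum over the job vertices of a slot sharing group.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: all types here are simplified stand-ins, not Flink's classes.
public class ShuffleMemorySketch {

    /** Minimal stand-in for a memory amount in bytes (not Flink's MemorySize). */
    static final class MemorySize {
        static final MemorySize ZERO = new MemorySize(0L);
        private final long bytes;

        MemorySize(long bytes) {
            this.bytes = bytes;
        }

        long getBytes() {
            return bytes;
        }
    }

    /** Hypothetical shape of one ExecutionVertex: input channels and result subpartitions. */
    static final class TaskShape {
        final int numberOfInputChannels;
        final int numberOfResultSubpartitions;

        TaskShape(int numberOfInputChannels, int numberOfResultSubpartitions) {
            this.numberOfInputChannels = numberOfInputChannels;
            this.numberOfResultSubpartitions = numberOfResultSubpartitions;
        }
    }

    /** The proposed method; shuffle services without their own rule return ZERO. */
    interface ShuffleMaster {
        default MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions) {
            return MemorySize.ZERO;
        }
    }

    /** Netty-style rule: (inputChannels * buffersPerChannel + subpartitions + 1) buffers. */
    static final class NettyLikeShuffleMaster implements ShuffleMaster {
        private final int buffersPerChannel; // would come from configuration
        private final long bufferSizeBytes;  // size of one network buffer (assumed)

        NettyLikeShuffleMaster(int buffersPerChannel, long bufferSizeBytes) {
            this.buffersPerChannel = buffersPerChannel;
            this.bufferSizeBytes = bufferSizeBytes;
        }

        @Override
        public MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions) {
            // exclusive input-channel buffers + minimum result partition pool size
            long buffers = (long) numberOfInputChannels * buffersPerChannel + numberOfResultSubpartitions + 1;
            return new MemorySize(buffers * bufferSizeBytes);
        }
    }

    /** Bytes required by one ExecutionJobVertex: the max over all of its subtasks. */
    static long jobVertexShuffleMemory(ShuffleMaster master, List<TaskShape> subtasks) {
        long max = 0L;
        for (TaskShape t : subtasks) {
            long bytes = master.getShuffleMemoryForTask(t.numberOfInputChannels, t.numberOfResultSubpartitions).getBytes();
            max = Math.max(max, bytes);
        }
        return max;
    }

    /** Bytes required by a slot sharing group: the sum over its ExecutionJobVertices. */
    static long slotSharingGroupShuffleMemory(ShuffleMaster master, List<List<TaskShape>> jobVertices) {
        long sum = 0L;
        for (List<TaskShape> subtasks : jobVertices) {
            sum += jobVertexShuffleMemory(master, subtasks);
        }
        return sum;
    }

    public static void main(String[] args) {
        ShuffleMaster netty = new NettyLikeShuffleMaster(2, 32 * 1024); // illustrative values
        List<TaskShape> vertexA = Arrays.asList(new TaskShape(4, 3), new TaskShape(6, 3));
        List<TaskShape> vertexB = Arrays.asList(new TaskShape(0, 5));
        // A: max(4*2+3+1, 6*2+3+1) = 16 buffers; B: 0+5+1 = 6 buffers; group: 22 buffers
        long group = slotSharingGroupShuffleMemory(netty, Arrays.asList(vertexA, vertexB));
        System.out.println(group); // 720896 (22 * 32768)
        // A shuffle service without its own rule falls back to the ZERO default.
        System.out.println(new ShuffleMaster() {}.getShuffleMemoryForTask(4, 3).getBytes()); // 0
    }
}
```

Taking the max per {{ExecutionJobVertex}} means every subtask of that vertex can presumably be requested with the same profile. The trade-off is that the smaller subtasks reserve more memory than they need.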