[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988603#comment-16988603
 ] 

Zhu Zhu edited comment on FLINK-15031 at 12/5/19 9:03 AM:
----------------------------------------------------------

Thanks [~trohrmann] for the feedback. I can work on it.

Agreed that it would be better to let the {{ShuffleService}} determine the 
shuffle memory required for a vertex. So I'd like to introduce an interface method 

{code:java}
MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions);
{code}

in {{ShuffleMaster}} with a default return value of {{MemorySize.ZERO}}. The 
{{NettyShuffleMaster}} can then easily implement it by returning the memory 
needed for {{numInputChannel * buffersPerChannel (from config) + 
numberOfSubpartitions + 1}} buffers. 
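
To make the proposal concrete, here is a rough, self-contained sketch. It is not the actual Flink code: {{MemorySize}} below is a minimal stand-in for Flink's class, the {{...Sketch}} types are hypothetical simplifications of {{ShuffleMaster}} and {{NettyShuffleMaster}}, and converting the buffer count to bytes by multiplying with a configured buffer size is my assumption.

```java
// Minimal stand-in for Flink's MemorySize so that the sketch compiles on its own.
final class MemorySize {
    static final MemorySize ZERO = new MemorySize(0L);

    private final long bytes;

    MemorySize(long bytes) {
        this.bytes = bytes;
    }

    long getBytes() {
        return bytes;
    }
}

// Hypothetical, simplified view of ShuffleMaster with the proposed method.
interface ShuffleMasterSketch {
    // Custom shuffle services that do not override this declare no shuffle memory.
    default MemorySize getShuffleMemoryForTask(
            int numberOfInputChannels, int numberOfResultSubpartitions) {
        return MemorySize.ZERO;
    }
}

// Hypothetical Netty variant: both constructor arguments are assumed to come from config.
final class NettyShuffleMasterSketch implements ShuffleMasterSketch {
    private final int buffersPerChannel;
    private final long bufferSizeBytes;

    NettyShuffleMasterSketch(int buffersPerChannel, long bufferSizeBytes) {
        this.buffersPerChannel = buffersPerChannel;
        this.bufferSizeBytes = bufferSizeBytes;
    }

    @Override
    public MemorySize getShuffleMemoryForTask(
            int numberOfInputChannels, int numberOfResultSubpartitions) {
        // Exclusive input channel buffers plus the result partition buffer pool.
        long requiredBuffers =
                (long) numberOfInputChannels * buffersPerChannel
                        + numberOfResultSubpartitions
                        + 1;
        return new MemorySize(requiredBuffers * bufferSizeBytes);
    }
}
```

With illustrative values of 2 buffers per channel and 32 KiB buffers, a task with 10 input channels and 4 result subpartitions would need 25 buffers, i.e. 819200 bytes.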

The other part of the change would be in the scheduler components. I'd like to 
introduce a {{ResourceRequirementsRetriever}} that generates the final 
ResourceProfiles, including shuffle memory, for vertices/groups and serves 
queries for them.









> Calculate required shuffle memory before allocating slots in resources 
> specified cases
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare its required 
> resources before using them. In this way, no resource related error should 
> happen as long as no resource is used beyond what was declared. This ensures 
> that a deployed task does not fail due to insufficient resources in the TM. 
> Without this guarantee, tasks may fail unnecessarily and a job may even hang 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks 
> require a minimum number of network buffers to work. Currently a task can be 
> deployed to a TM with insufficient network buffers, and it then fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) 
> is 
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * buffersPerChannel)
>   + required buffers for result partition buffer pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the maximum required shuffle memory over all 
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The required 
> shuffle memory for a slot sharing group should then be the sum of the shuffle 
> memory of each {{ExecutionJobVertex}} within it.
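
The aggregation rule above (maximum per {{ExecutionJobVertex}}, sum per slot sharing group) could be sketched roughly as follows. This is only an illustration: {{ShuffleMemoryAggregation}} and its methods are hypothetical names, and memory is represented as plain byte counts rather than Flink types.

```java
import java.util.List;

// Hypothetical helper; memory sizes are plain byte counts for simplicity.
final class ShuffleMemoryAggregation {

    // Required bytes for one ExecutionJobVertex: the maximum over the
    // requirements of all of its ExecutionVertex instances.
    static long requiredBytesForJobVertex(List<Long> bytesPerExecutionVertex) {
        long max = 0L;
        for (long bytes : bytesPerExecutionVertex) {
            max = Math.max(max, bytes);
        }
        return max;
    }

    // Required bytes for a slot sharing group: the sum of the per-job-vertex
    // maxima of all ExecutionJobVertex instances in the group.
    static long requiredBytesForSlotSharingGroup(List<List<Long>> jobVerticesInGroup) {
        long sum = 0L;
        for (List<Long> bytesPerExecutionVertex : jobVerticesInGroup) {
            sum += requiredBytesForJobVertex(bytesPerExecutionVertex);
        }
        return sum;
    }
}
```

For example, a group with one job vertex whose subtasks need 100, 300 and 200 bytes and another whose subtasks each need 50 bytes would require 300 + 50 = 350 bytes per slot.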



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
