[ https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822166#comment-16822166 ]

Thomas Graves edited comment on SPARK-27495 at 4/19/19 8:28 PM:
----------------------------------------------------------------

Unfortunately the link to the original design doc was removed from 
https://issues.apache.org/jira/browse/SPARK-24615, but just for reference there 
were some good discussions about this in the comments, starting at 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
and running all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

To summarize, after those discussions the proposal was something like:
{code:java}
val rdd2 = rdd1.withResources(....).mapPartitions(){code}
Possibly getting more detailed:
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 8192000000)
  .require("/disk", 1000000000000)
The withResources would apply to just that stage.  If there were conflicting 
resource requirements, say in a join like

val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)

then you would have to resolve the conflict, either by picking one of the 
requirements or by merging them, the obvious merge being to take the larger of 
the two (a small sketch of such a merge follows the quoted corner cases below).

There are also a lot of corner cases we need to handle, such as the ones 
mentioned in that discussion:

{code:java}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
2. fail the stage with warning log to say that multiple different resources in 
one stage is illegal.
3. merge conflicts with maximum resources needs. For example rdd1 requires 3 
gpus per task, rdd2 requires 4 gpus per task, then the merged requirement would 
be 4 gpus per task. (This is the high level description, details will be per 
partition based merging) [chosen].
{code}
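
Since option 3 (merge by taking the maximum) was the one chosen in that thread, 
here is a tiny, purely illustrative sketch of what such a merge could look like 
(ignoring the per-partition details; nothing here is actual Spark API):
{code:java}
// Illustrative only: resolve conflicting stage-level requests by taking, for
// each resource name, the maximum amount requested by any RDD in the stage.
def mergeByMax(requests: Seq[Map[String, Long]]): Map[String, Long] =
  requests.flatten
    .groupBy { case (name, _) => name }
    .map { case (name, amounts) => name -> amounts.map(_._2).max }

// e.g. mergeByMax(Seq(Map("/gpu" -> 3L), Map("/gpu" -> 4L, "/cpu" -> 1L)))
//      == Map("/gpu" -> 4, "/cpu" -> 1)
{code}
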
One thing that was brought up multiple times is how a user will really know 
which stage a withResources call applies to.  Users aren't necessarily going to 
realize where the stage boundaries are, and I don't think anyone has a good 
solution to this.

Also, I think for this to really be useful it has to be tied into dynamic 
allocation.  Without that, users can just use the application-level task 
configs we are adding in SPARK-24615.
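
For comparison, the application-level route is something along these lines (the 
exact config names were still being settled in SPARK-24615 at the time, so 
treat these as approximate rather than final):
{code:java}
// Approximate sketch: application-level resource configs are fixed for the
// whole application, e.g. set once on the SparkConf at startup.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.resource.gpu.amount", "4") // GPUs per executor
  .set("spark.task.resource.gpu.amount", "1")     // GPUs per task
{code}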

Of course, the original proposal was only for RDDs as well.  That was because 
it goes along with barrier scheduling, and with the Dataset/DataFrame API it is 
even harder to know where the stage boundaries are because Catalyst can 
optimize a bunch of things.

 


> Support Stage level resource configuration and scheduling
> ---------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Story
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Priority: Major
>
> Currently Spark supports CPU-level scheduling, and we are adding 
> accelerator-aware scheduling with 
> https://issues.apache.org/jira/browse/SPARK-24615, but both of those are 
> scheduled via application-level configurations.  That means there is one 
> configuration set for the entire lifetime of the application, and the user 
> can't change it between Spark jobs/stages within that application.  
> Many times users have different requirements for different stages of their 
> application, so they want to be able to configure at the stage level what 
> resources are required for that stage.
> For example, I might start a Spark application which first does some ETL work 
> that needs lots of cores to run many tasks in parallel; then once that is 
> done I want to run an ML job, and at that point I want GPUs, fewer CPUs, 
> and more memory.
> With this JIRA we want to add the ability for users to specify the resources 
> for different stages.
> Note that https://issues.apache.org/jira/browse/SPARK-24615 had some 
> discussion of this, but that part was removed from its scope.
> We should come up with a proposal on how to do this.


