[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27495:
----------------------------------
    Description: 
*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

Objectives:
 # Allow users to specify task and executor resource requirements at the stage 
level. 
 # Spark will use the stage level requirements to acquire the necessary 
resources/executors and schedule tasks based on the per stage requirements.

Users often have different resource requirements for different stages of their 
application, so they want to be able to configure resources at the stage level. 
For instance, you have a single job with 2 stages. The first stage does some 
ETL, which requires a lot of tasks, each with a small amount of memory and 1 
core. Then you have a second stage where you feed that ETL data into an ML 
algorithm. The second stage only requires a few executors, but each executor 
needs a lot of memory, GPUs, and many cores. This feature allows the user to 
specify the task and executor resource requirements for the ETL stage and then 
change them for the ML stage of the job.

Resources include CPU, memory (on heap, overhead, pyspark, and off heap), and 
extra resources (GPU/FPGA/etc.). It has the potential to allow for other things 
like limiting the number of tasks per stage, specifying other parameters for 
things like shuffle, etc. Changing the executor resources will rely on dynamic 
allocation being enabled.
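
To make the ETL-then-ML example above concrete, here is a rough sketch of what 
per-stage requirements could look like from user code. All of the names below 
(the builder, withResources, the individual request methods) are hypothetical 
placeholders, not the proposed API; the actual API proposal is in the design 
doc linked from Appendix A.

{code:scala}
// Illustrative sketch only: class and method names are hypothetical placeholders,
// not the proposed API (see the design doc in Appendix A for the real proposal).

// ETL stage: many small tasks, each needing 1 core and a little memory.
val etlReqs = new StageResourceRequestsBuilder()   // hypothetical builder
  .executorCores(2)
  .executorMemory("4g")
  .taskCpus(1)
  .build()

// ML stage: a few large executors, each with lots of memory, cores, and GPUs.
val mlReqs = new StageResourceRequestsBuilder()
  .executorCores(16)
  .executorMemory("32g")
  .executorResource("gpu", 4)
  .taskCpus(4)
  .taskResource("gpu", 1)
  .build()

// Hypothetical per-RDD hook: requirements attached here would apply to the stage(s)
// these RDDs end up in. rawInput, parseRecord, and runMlAlgorithm are placeholders.
val etlOutput   = rawInput.map(parseRecord).withResources(etlReqs)
val predictions = etlOutput.withResources(mlReqs).mapPartitions(runMlAlgorithm)
{code}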

Main use cases:
 # ML use case where the user does ETL and feeds the result into an ML 
algorithm using the RDD API. This should work with barrier scheduling as well 
once it supports dynamic allocation.
 # Spark internal use by catalyst. Catalyst could control the stage level 
resources as it finds the need to change them between stages for different 
optimizations. For instance, with the new columnar plugin to the query planner, 
we can insert stages into the plan that would change running something on the 
CPU in row format to running it on the GPU in columnar format. This API would 
allow the planner to make sure the stages that run on the GPU get the 
corresponding GPU resources they need to run. Another possible use case for 
catalyst is that it would allow catalyst to add in more optimizations so that 
the user doesn't need to configure container sizes at all. If the 
optimizer/planner can handle that for the user, everyone wins.

This SPIP focuses on the RDD API, but we don't exclude the Dataset API. I think 
the Dataset API will require more changes because it specifically hides the RDD 
from users via the plans, and catalyst can optimize the plan and insert things 
into it. The only way I've found to make this work with the Dataset API would 
be modifying all the plans to be able to push the resource requirements down to 
where the RDDs are created, which I believe would be a lot of change. If other 
people know better options, it would be great to hear them.

*Q2.* What problem is this proposal NOT designed to solve?

The initial implementation is not going to add Dataset APIs.

We are starting with allowing users to specify a specific set of task/executor 
resources and plan to design it to be extendable, but the first implementation 
will not support changing generic SparkConf configs, only a specific, limited 
set of resources.

This initial version will have a programmatic API for specifying the resource 
requirements per stage; we can add the ability to have profiles in the configs 
later if it's useful.

*Q3.* How is it done today, and what are the limits of current practice?

Currently this is done either by having multiple Spark jobs or by requesting 
containers with the max resources needed for any part of the job. To do this 
today, you can break it into separate jobs where each job requests the 
corresponding resources needed, but then you have to write the data out 
somewhere and read it back in between jobs. This takes longer and requires 
coordination between those jobs to make sure everything works smoothly. Another 
option is to request executors with your largest need up front and potentially 
waste those resources when they aren't being used, which in turn wastes money. 
For instance, for an ML application that does ETL first, many times people 
request containers with GPUs and the GPUs sit idle while the ETL is happening. 
This wastes those GPU resources, and in turn money, because those GPUs could 
have been used by other applications until they were really needed.
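
To make the "request your largest need up front" option concrete, a minimal 
sketch is below: one global executor shape, sized for the ML stage, applied to 
the whole application. The GPU config keys assume the accelerator-aware 
scheduling configs targeted for Spark 3.0 and are shown only for illustration.

{code:scala}
import org.apache.spark.SparkConf

// Today's workaround: one global executor size for the entire application,
// big enough for the ML stage even though the ETL stage never touches the GPUs.
// (GPU keys assume Spark 3.0's accelerator-aware scheduling configs.)
val conf = new SparkConf()
  .setAppName("etl-then-ml")
  .set("spark.executor.cores", "16")
  .set("spark.executor.memory", "32g")
  .set("spark.executor.resource.gpu.amount", "4")  // GPUs sit idle during ETL
  .set("spark.task.resource.gpu.amount", "1")
  .set("spark.dynamicAllocation.enabled", "true")
{code}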

Note that the catalyst internal use case can't be done today at all.

*Q4.* What is new in your approach and why do you think it will be successful?

This is a new way for users to specify per-stage resource requirements. It will 
give users and Spark a lot more flexibility within a job and better utilization 
of their hardware.

*Q5.* Who cares? If you are successful, what difference will it make?

Spark application developers, cluster admins, managers and companies who pay 
the bills for running Spark. It has the potential to make a huge difference in 
cost by utilizing resources better and saving developers time.

I’ve talked to different people from different companies and all of them have 
said this would be a useful feature for them.

*Q6.* What are the risks?

The scoping of the new API could cause some confusion to the user as to which 
resources actually get used in a stage. If the user has specified different 
resources in multiple RDDs that get combined into a single stage, the scheduler 
will have to merge those and come up with a final container size to request. We 
will have a specific algorithm for merging, but if the user doesn't realize 
things get combined, or that some RDDs require shuffle, they might get 
confused. I don't know how to get around this other than to document the way it 
works and try to make it obvious to the user what was chosen. See the design 
doc for options on scoping.
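
As a purely illustrative example of why the merging can surprise people, here 
is one plausible merge policy (take the max of each resource across the RDDs 
that land in the same stage); the actual algorithm is described in the design 
doc.

{code:scala}
// Purely illustrative merge policy, not the algorithm from the design doc:
// take the max of each resource across all RDD-level requirements in a stage.
def mergeStageRequirements(reqs: Seq[Map[String, Double]]): Map[String, Double] =
  reqs.flatten
    .groupBy { case (resource, _) => resource }
    .map { case (resource, amounts) => resource -> amounts.map(_._2).max }

val rddA = Map("cores" -> 2.0, "memoryGiB" -> 4.0)                 // small ETL-style RDD
val rddB = Map("cores" -> 8.0, "memoryGiB" -> 32.0, "gpu" -> 1.0)  // ML-style RDD in the same stage

// The stage requests the larger container, which may surprise whoever wrote rddA:
val merged = mergeStageRequirements(Seq(rddA, rddB))
// merged == Map("cores" -> 8.0, "memoryGiB" -> 32.0, "gpu" -> 1.0)
{code}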

The cluster managers (like YARN) and the dynamic allocation manager have to 
track everything at a ResourceProfile (specific set of resource requirements) 
level rather than just at a global cores or executors level, so it requires a 
number of data structure changes in those components.
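
As a rough sketch of the kind of data structure change meant here (illustrative 
names, not the actual Spark internals), the single global executor target would 
become bookkeeping keyed by ResourceProfile:

{code:scala}
import scala.collection.mutable

// Illustrative only. Today the allocation manager effectively tracks one global
// target, roughly: var totalExecutorsWanted: Int
// With per-stage profiles it would need targets and pending work per profile:
final case class ProfileId(id: Int)  // hypothetical key standing in for a ResourceProfile

val targetExecutorsPerProfile = mutable.Map.empty[ProfileId, Int]
val pendingTasksPerProfile    = mutable.Map.empty[ProfileId, Int]

// e.g. the ETL profile wants many small executors while the ML profile wants a few big ones
targetExecutorsPerProfile(ProfileId(1)) = 200
targetExecutorsPerProfile(ProfileId(2)) = 4
{code}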

*Q7.* How long will it take?

I suspect this will take multiple months because it's a fairly large change. I 
think we can do it in pieces fairly easily though. For instance, we can do the 
dynamic allocation manager and scheduler, the YARN cluster manager, and then 
finally the RDD API. We can do the backend pieces first, where the global 
resource configs still apply; only once we add the actual RDD API would the 
user actually see the new behavior. I have a rough prototype of those pieces 
where I investigated everything that would need to change.

*Q8.* What are the mid-term and final “exams” to check for success?

Success is for the user to specify the resources per stage with dynamic 
allocation and have everything work with it. One stage would run with one set 
of resources, and when the next stage starts with different resources, the 
first stage's containers are let go and new ones are acquired. The mid-term 
exam might be to put in the changes for the allocation manager, cluster 
manager, and scheduler and have the normal global resource requirements 
continue to work as expected.

*Appendix A.* Proposed API Changes. Optional section defining API changes, if 
any. Backward and forward compatibility must be taken into account.

I split the appendices out into a Google doc since this was getting big and to 
allow inline comments; see the link below.

*Appendix B.* Optional Design Sketch: How are the goals going to be 
accomplished? Give sufficient technical detail to allow a contributor to judge 
whether it’s likely to be feasible. Note that this is not a full design 
document.

I split the appendices out into a Google doc since this was getting big and to 
allow inline comments; see the link below.

*Appendix C.* Optional Rejected Designs: What alternatives were considered? Why 
were they rejected? If no alternatives have been considered, the problem needs 
more thought.

I split the appendices out into a Google doc since this was getting big and to 
allow inline comments; see the link below.



> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Priority: Major


