[ https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Graves reassigned SPARK-27495:
-------------------------------------

    Assignee: Thomas Graves

> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> Objectives:
>  # Allow users to specify task and executor resource requirements at the 
> stage level. 
>  # Spark will use the stage level requirements to acquire the necessary 
> resources/executors and schedule tasks based on the per stage requirements.
> Users often have different resource requirements for different stages of 
> their application, so they want to be able to configure resources at the 
> stage level. For instance, say you have a single job with 2 stages. The 
> first stage does some ETL, which requires a lot of tasks, each with a small 
> amount of memory and a single core. The second stage feeds that ETL data 
> into an ML algorithm; it only requires a few executors, but each executor 
> needs a lot of memory, GPUs, and many cores. This feature allows the user 
> to specify the task and executor resource requirements for the ETL stage 
> and then change them for the ML stage of the job.
> Resources include CPU, memory (on heap, overhead, pyspark, and off heap), 
> and extra resources (GPU/FPGA/etc). It has the potential to allow for other 
> things as well, like limiting the number of tasks per stage or specifying 
> other parameters for things like shuffle. Changing the executor resources 
> will rely on dynamic allocation being enabled.
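> As a rough sketch only (the names here, like ResourceProfileBuilder and 
> withResources, are hypothetical placeholders rather than the final API; see 
> the design doc linked below), the programmatic usage for the ETL + ML 
> example might look something like:
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
>
> val sc = new SparkContext(new SparkConf().setAppName("etl-then-ml"))
>
> // Stage 1: ETL with many small tasks (1 core, a little memory each),
> // running with whatever the default executor/task resources are.
> val etl = sc.textFile("hdfs://.../input").map(_.split(",")(0))
>
> // Stage 2: ML needs only a few executors, but each one needs lots of
> // memory, many cores, and GPUs. A hypothetical ResourceProfile carries
> // those task and executor requirements.
> val mlProfile = new ResourceProfileBuilder()   // hypothetical builder
>   .executorMemory("30g")
>   .executorCores(16)
>   .executorResource("gpu", 4)
>   .taskCpus(8)
>   .taskResource("gpu", 1)
>   .build()
>
> // Hypothetical: tag the RDD feeding the ML stage with the profile so the
> // scheduler acquires matching executors (relies on dynamic allocation).
> val mlInput = etl.withResources(mlProfile)
> mlInput.foreachPartition(partition => trainOnGpu(partition))  // placeholder ML step
> {code}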
> Main use cases:
>  # ML use case where the user does ETL and feeds the result into an ML 
> algorithm using the RDD API. This should work with barrier scheduling as 
> well once barrier scheduling supports dynamic allocation.
>  # Spark internal use by Catalyst. Catalyst could control the stage level 
> resources as it finds the need to change them between stages for different 
> optimizations. For instance, with the new columnar plugin to the query 
> planner, we can insert stages into the plan that switch something from 
> running on the CPU in row format to running on the GPU in columnar format. 
> This API would allow the planner to make sure the stages that run on the 
> GPU get the corresponding GPU resources they need to run. Another possible 
> use case is that it would allow Catalyst to add more optimizations so that 
> the user doesn’t need to configure container sizes at all. If the 
> optimizer/planner can handle that for the user, everyone wins.
> This SPIP focuses on the RDD API but we don’t exclude the Dataset API. I 
> think the Dataset API will require more changes because it specifically 
> hides the RDD from users via the plans, and Catalyst can optimize the plan 
> and insert things into it. The only way I’ve found to make this work with 
> the Dataset API would be to modify all the plans so the resource 
> requirements can be pushed down to where the RDDs are created, which I 
> believe would be a lot of change. If other people know better options, it 
> would be great to hear them.
> *Q2.* What problem is this proposal NOT designed to solve?
> The initial implementation is not going to add Dataset APIs.
> We are starting by allowing users to specify a specific set of 
> task/executor resources and plan to design it to be extendable, but the 
> first implementation will not support changing generic SparkConf configs, 
> only a specific, limited set of resources.
> This initial version will have a programmatic API for specifying the 
> resource requirements per stage; we could add the ability to define 
> profiles in the configs later if it’s useful.
> *Q3.* How is it done today, and what are the limits of current practice?
> Currently this is done either by having multiple Spark jobs or by 
> requesting containers with the max resources needed for any part of the 
> job. To do this today, you can break the work into separate jobs where each 
> job requests the resources it needs, but then you have to write the data 
> out somewhere and read it back in between jobs. This takes longer and 
> requires coordination between those jobs to make sure everything works 
> smoothly. Another option is to request executors sized for your largest 
> need up front and potentially waste those resources when they aren't being 
> used, which in turn wastes money. For instance, for an ML application that 
> does ETL first, people often request containers with GPUs, and the GPUs sit 
> idle while the ETL is happening. This wastes those GPU resources, and in 
> turn money, because those GPUs could have been used by other applications 
> until they were really needed.
> Note that the Catalyst internal use case can’t be done today at all.
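> As a concrete illustration of the "size for the largest need" workaround, 
> today you would set global executor configs sized for the ML stage, and the 
> ETL stages would inherit the same large GPU containers (the GPU config 
> below assumes the separate accelerator-aware scheduling work also targeting 
> 3.0.0):
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Every executor in the application gets the ML-sized resources, even
> // though the ETL stages only need 1 core and a little memory per task,
> // so the GPUs sit idle during the ETL portion of the job.
> val conf = new SparkConf()
>   .setAppName("etl-then-ml-sized-for-max")
>   .set("spark.executor.cores", "16")
>   .set("spark.executor.memory", "30g")
>   .set("spark.executor.resource.gpu.amount", "4")
>
> val sc = new SparkContext(conf)
> {code}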
> *Q4.* What is new in your approach and why do you think it will be successful?
> This is a new way for users to specify per stage resource requirements. It 
> will give users and Spark a lot more flexibility within a job and allow 
> better utilization of their hardware.
> *Q5.* Who cares? If you are successful, what difference will it make?
> Spark application developers, cluster admins, managers and companies who pay 
> the bills for running Spark. It has the potential to make a huge difference 
> in cost by utilizing resources better and saving developers time.
> I’ve talked to different people from different companies and all of them have 
> said this would be a useful feature for them.
> *Q6.* What are the risks?
> The scoping of the new API could cause some confusion for users as to which 
> resources actually get used in a stage. If the user has specified different 
> resources on multiple RDDs that get combined into a single stage, the 
> scheduler will have to merge those and come up with a final container size 
> to request. We will have a specific algorithm for merging, but if the user 
> doesn’t realize things get combined, or that some RDDs require a shuffle, 
> they might get confused. I don't know how to get around this other than to 
> document the way it works and try to make it obvious to the user what was 
> chosen. See the design doc for options on scoping.
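> As a sketch of one plausible merge strategy only (the actual algorithm is 
> covered in the design doc), the requirements of RDDs that land in the same 
> stage could be resolved by taking the per-resource maximum:
> {code:scala}
> // Illustrative only: merge the resource requirements of multiple RDDs that
> // end up in the same stage by taking the maximum of each requirement.
> case class StageResources(
>     executorCores: Int,
>     executorMemoryMb: Long,
>     executorGpus: Int,
>     taskCpus: Int)
>
> def merge(a: StageResources, b: StageResources): StageResources =
>   StageResources(
>     math.max(a.executorCores, b.executorCores),
>     math.max(a.executorMemoryMb, b.executorMemoryMb),
>     math.max(a.executorGpus, b.executorGpus),
>     math.max(a.taskCpus, b.taskCpus))
>
> // Two RDDs with different requirements combined into one stage:
> val merged = Seq(
>   StageResources(executorCores = 4, executorMemoryMb = 8192, executorGpus = 0, taskCpus = 1),
>   StageResources(executorCores = 2, executorMemoryMb = 16384, executorGpus = 1, taskCpus = 2)
> ).reduce(merge)
> // merged == StageResources(4, 16384, 1, 2)
> {code}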
> The cluster managers (like YARN) and the dynamic allocation manager have to 
> track everything at a ResourceProfile (a specific set of resource 
> requirements) level rather than just at a global cores or executors level, 
> which requires a number of data structure changes in those components.
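> A rough, purely illustrative sketch of the kind of data structure change 
> this implies (names hypothetical): the single global executor target in the 
> allocation manager becomes a set of counts keyed by ResourceProfile id.
> {code:scala}
> import scala.collection.mutable
>
> // Before: one global target number of executors.
> var globalExecutorTarget: Int = 0
>
> // After (hypothetical): targets, running counts, and pending requests
> // tracked separately for each ResourceProfile id.
> case class ProfileState(var target: Int, var running: Int, var pending: Int)
> val perProfile = mutable.Map.empty[Int, ProfileState]  // profileId -> state
>
> def updateTarget(profileId: Int, delta: Int): Unit = {
>   val state = perProfile.getOrElseUpdate(profileId, ProfileState(0, 0, 0))
>   state.target += delta
> }
> {code}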
> *Q7.* How long will it take?
> I suspect this will take multiple months because it’s a fairly large 
> change. I think we can do it in pieces fairly easily though. For instance, 
> we can do the dynamic allocation manager and scheduler, then the YARN 
> cluster manager, and finally the RDD API. We can do the backend pieces 
> first, where the global resource configs still apply, and only once we add 
> the actual RDD API would the user actually see the feature. I have a rough 
> prototype of those pieces from investigating what all would need to change.
> *Q8.* What are the mid-term and final “exams” to check for success?
> Success is for the user to be able to specify resources per stage with 
> dynamic allocation enabled and have everything work with it. One stage 
> would run with one set of resources, and when the next stage starts with 
> different resources, the first stage’s containers are let go and new ones 
> acquired. The mid-term might be to put in the changes for the allocation 
> manager, cluster manager, and scheduler and have the normal global resource 
> requirements continue to work as expected.
> *Appendix A.* Proposed API Changes. Optional section defining APIs changes, 
> if any. Backward and forward compatibility must be taken into account.
> I split the appendices out into a Google doc since it was getting big and 
> to allow inline comments; see the link below.
> *Appendix B.* Optional Design Sketch: How are the goals going to be 
> accomplished? Give sufficient technical detail to allow a contributor to 
> judge whether it’s likely to be feasible. Note that this is not a full design 
> document.
> I split the appendices out into a Google doc since it was getting big and 
> to allow inline comments; see the link below.
> *Appendix C.* Optional Rejected Designs: What alternatives were considered? 
> Why were they rejected? If no alternatives have been considered, the problem 
> needs more thought.
> I split the appendices out into a Google doc since it was getting big and 
> to allow inline comments; see the link below.


