[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16902076#comment-16902076
 ] 

Thomas Graves commented on SPARK-27495:
---------------------------------------

Thanks for the comments.
 # I was trying to leave the API open to allow both.  Not sure you had a chance 
to look at the design/api doc, but with the ResourceProfile I have a .require 
function, that would be a must have and then mentioned in there we could add a 
.prefer function that would be more of a hint or a nice to have.  Also I should 
have clarified (and I'll update the description) that in this initial 
implementation I only intend to support scheduling on the same things we do 
now. So for Tasks it would only be CPU and other resources (GPU, FPGa, etc). 
For Executor resource we would support requesting cpu, memory (offheap, onheap, 
overhead, pyspark), and other resources.  So I wouldn't add support for task 
memory per stage at this point. It would also take other changes to enforce a 
task memory usage as well.  But your point is valid if that gets added so I 
think we need to be careful about what we allow for each. 
 # So I think you have brought up 2 issues here.
 ## One is how we do resource merging/conflict resolution. My proposal was to 
simply look at the RDD's within the stage and merge any resource profiles.  
There are multiple ways we could do the merge, one is naively just use a max, 
but as you pointed out some operations might be better done as a sum, so I was 
hoping to be able to either make it configurable with perhaps some defaults for 
different operations. I guess the question is if we need that for the initial 
implementation or we just make sure to design for it.  I think you are also 
proposing to use the parents RDD (do you just mean parents within a stage or 
always?) but I think that all depends on how we define it and how the user is 
going to expect it.  I might have a very large RDD that needs processing that 
shrinks down to a very small one.  To process that large RDD I need a lot of 
resources, but once its shrunk I don't need that many.  So the child of that 
wouldn't need the same resources as its parent.  That is why I was proposing to 
set the resources specifically for that RDD. We need to handle those that have 
shuffle, like groupBy to make sure its carried over. 
 ## so the other things you brought up I think it talking more about the task 
resources here since you are saying per partition, correct?  There are 
definitely still issues even within the same RDD that some partitions may be 
larger then others.  I'm not really trying to solve that problem here.  This 
feature may open it up so that Spark can to more intelligent things there, but 
this initial round users would still have to configure the resources for the 
worse case.  I'm essentially leaving the configs the same as what they have 
now. Users I think are used to defining the executor resources based on the 
worse case any task within the worst stage will consume, this just allows them 
to go one step further to control it per stage.  It also hopefully opens it up 
to do more of that configuration for them in the future.   For now, the idea is 
why would only configure the resources for the stages they want, all other 
stages would default back to the global configs.  I'll clarify this in the spip 
as well. We could change this but I think it gets hard for the user to know the 
scoping.

> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> Objectives:
>  # Allow users to specify task and executor resource requirements at the 
> stage level. 
>  # Spark will use the stage level requirements to acquire the necessary 
> resources/executors and schedule tasks based on the per stage requirements.
> Many times users have different resource requirements for different stages of 
> their application so they want to be able to configure resources at the stage 
> level. For instance, you have a single job that has 2 stages. The first stage 
> does some  ETL which requires a lot of tasks, each with a small amount of 
> memory and 1 core each. Then you have a second stage where you feed that ETL 
> data into an ML algorithm. The second stage only requires a few executors but 
> each executor needs a lot of memory, GPUs, and many cores.  This feature 
> allows the user to specify the task and executor resource requirements for 
> the ETL Stage and then change them for the ML stage of the job.
> Resources include cpu, memory (on heap, overhead, pyspark, and off heap), and 
> extra Resources (GPU/FPGA/etc). It has the potential to allow for other 
> things like limiting the number of tasks per stage, specifying other 
> parameters for things like shuffle, etc. Changing the executor resources will 
> rely on dynamic allocation being enabled.
> Main use cases:
>  # ML use case where user does ETL and feeds it into an ML algorithm where 
> it’s using the RDD API. This should work with barrier scheduling as well once 
> it supports dynamic allocation.
>  # Spark internal use by catalyst. Catalyst could control the stage level 
> resources as it finds the need to change it between stages for different 
> optimizations. For instance, with the new columnar plugin to the query 
> planner we can insert stages into the plan that would change running 
> something on the CPU in row format to running it on the GPU in columnar 
> format. This API would allow the planner to make sure the stages that run on 
> the GPU get the corresponding GPU resources it needs to run. Another possible 
> use case for catalyst is that it would allow catalyst to add in more 
> optimizations to where the user doesn’t need to configure container sizes at 
> all. If the optimizer/planner can handle that for the user, everyone wins.
> This SPIP focuses on the RDD API but we don’t exclude the Dataset API. I 
> think the DataSet API will require more changes because it specifically hides 
> the RDD from the users via the plans and catalyst can optimize the plan and 
> insert things into the plan. The only way I’ve found to make this work with 
> the Dataset API would be modifying all the plans to be able to get the 
> resource requirements down into where it creates the RDDs, which I believe 
> would be a lot of change.  If other people know better options, it would be 
> great to hear them.
> *Q2.* What problem is this proposal NOT designed to solve?
> The initial implementation is not going to add Dataset APIs.
> We are starting with allowing users to specify a specific set of 
> task/executor resources and plan to design it to be extendable, but the first 
> implementation will not support changing generic SparkConf configs and only 
> specific limited resources.
> This initial version will have a programmatic API for specifying the resource 
> requirements per stage, we can add the ability to perhaps have profiles in 
> the configs later if its useful.
> *Q3.* How is it done today, and what are the limits of current practice?
> Currently this is either done by having multiple spark jobs or requesting 
> containers with the max resources needed for any part of the job.  To do this 
> today, you can break it into separate jobs where each job requests the 
> corresponding resources needed, but then you have to write the data out 
> somewhere and then read it back in between jobs.  This is going to take 
> longer as well as require that job coordination between those to make sure 
> everything works smoothly. Another option would be to request executors with 
> your largest need up front and potentially waste those resources when they 
> aren't being used, which in turn wastes money. For instance, for an ML 
> application where it does ETL first, many times people request containers 
> with GPUs and the GPUs sit idle while the ETL is happening. This is wasting 
> those GPU resources and in turn money because those GPUs could have been used 
> by other applications until they were really needed.  
> Note for the catalyst internal use, that can’t be done today.
> *Q4.* What is new in your approach and why do you think it will be successful?
> This is a new way for users to specify the per stage resource requirements.  
> This will give users and Spark a lot more flexibility within a job and get 
> better utilization of their hardware.
> *Q5.* Who cares? If you are successful, what difference will it make?
> Spark application developers, cluster admins, managers and companies who pay 
> the bills for running Spark. It has the potential to make a huge difference 
> in cost by utilizing resources better and saving developers time.
> I’ve talked to different people from different companies and all of them have 
> said this would be a useful feature for them.
> *Q6.* What are the risks?
> The scoping of the new API could cause some confusion to the user as to which 
> resources actually get used in a stage. If the user has specified different 
> resources in multiple RDDs that get combined into a single stage, the 
> scheduler will have to merge those and come up with a final container size to 
> request. We will have a specific algorithm for merging but if the user 
> doesn’t realize things get combined or that some RDD’s require shuffle, they 
> might get confused.  I don't know how to get around this other then to 
> document the way it works and try to make it obvious to the user what was 
> chosen. See the design doc for options on scoping.
> The cluster managers (like YARN) and dynamic allocation manager have to track 
> everything at a ResourceProfile (specific set of resource requirements) level 
> rather than just a global cores or executors level, so it requires a bunch of 
> data structure changes to those.
> *Q7.* How long will it take?
> I suspect this will take multiple months because it’s a fairly large change.  
> I think we can do it in pieces fairly easily though. For instance, I think we 
> can do the dynamic allocation manager and scheduler, YARN cluster manager, 
> and then finally the RDD API.  We can do the backend pieces first where the 
> global resource configs apply and then once we add in the actual RDD API, it 
> will only be there and only at that point would the user actually see it.  I 
> have a rough prototype of those where I was investigating what would all need 
> to change.
> *Q8.* What are the mid-term and final “exams” to check for success?
> Success is for the user to specify the resources per stage with dynamic 
> allocation and everything to work with it. One stage would run with a set of 
> resources and when the next stage starts with different resources the first 
> stage containers are let go and new ones acquired.  The mid-term might be to 
> put in the changes for the allocation manager and cluster manager and 
> scheduler and have the normal global resource requirements continue to work 
> as expected.
> *Appendix A.* Proposed API Changes. Optional section defining APIs changes, 
> if any. Backward and forward compatibility must be taken into account.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below
> *Appendix B.* Optional Design Sketch: How are the goals going to be 
> accomplished? Give sufficient technical detail to allow a contributor to 
> judge whether it’s likely to be feasible. Note that this is not a full design 
> document.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below
> *Appendix C.* Optional Rejected Designs: What alternatives were considered? 
> Why were they rejected? If no alternatives have been considered, the problem 
> needs more thought.
> I split the appendices out into a google doc since it was getting big and to 
> allow inline comments, see link below



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to