[ https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921608#comment-16921608 ]

Thomas Graves commented on SPARK-27495:
---------------------------------------

Thanks for the feedback [~felixcheung]

1. Yes, this is out of scope for this SPIP; my statement was only intended to 
say this would allow for that in the future if we decided to do it. I updated 
the description; hopefully it is clearer now.

2. We could certainly fail if there is a conflict for now and ask the user to 
resolve it and fix their code. I am a little worried users will hit this fairly 
easily once the feature starts to get used, and in most cases I would think 
doing a simple max or sum would be sufficient. Perhaps we start with a config 
for this that fails by default, but allow users to turn it off so it does 
something static like a max. Then we can see how things go and potentially add 
in other options. Thoughts?
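The idea above (fail on conflicts by default, with a config that falls back to 
a static rule like max) could look roughly like the sketch below. The function 
name, the resource keys, and the strict flag are all hypothetical, not part of 
any Spark API:

```python
# Hypothetical sketch: merging two conflicting per-stage resource requests.
# "strict" mirrors a config that fails on conflict by default; when it is
# disabled, conflicts are resolved by taking the max of each resource.

def merge_resource_requests(a: dict, b: dict, strict: bool = True) -> dict:
    merged = dict(a)
    for resource, amount in b.items():
        if resource in merged and merged[resource] != amount:
            if strict:
                raise ValueError(
                    f"Conflicting requests for {resource}: "
                    f"{merged[resource]} vs {amount}")
            merged[resource] = max(merged[resource], amount)
        else:
            merged[resource] = amount
    return merged

etl = {"cores": 1, "memory_gb": 2}
ml = {"cores": 4, "memory_gb": 16, "gpus": 1}

print(merge_resource_requests(etl, ml, strict=False))
# {'cores': 4, 'memory_gb': 16, 'gpus': 1}
```

With strict=True the same call would raise instead, forcing the user to fix 
their code, which matches the fail-by-default behavior suggested above.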

3. Yeah, I agree the strict mode makes more sense to me for ML use cases. I 
think the hint could come into play more on the ETL side, and even there I 
think it would be unusual. If you are using those extra resources, it's more 
than likely to make things go faster, so I would think you would want 
everything to run on them; but I guess if you have a shared cluster and those 
resources are in high demand, it could be faster just to run on whatever is 
available. The API would allow us to add this in the future, but it's not 
included in the first go here.
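To make the shape of the proposed per-stage API concrete (every class and 
method name below is a hypothetical placeholder for illustration, not a 
committed Spark API), a profile for the ETL and ML stages from the SPIP 
description might be built like this:

```python
# Hypothetical sketch of a per-stage resource profile builder. None of
# these names are committed Spark APIs; they only illustrate the shape
# of the proposed programmatic interface.

class ResourceProfile:
    def __init__(self):
        self.executor = {}   # executor-level requests (cores, memory, GPUs, ...)
        self.task = {}       # task-level requests (cpus, GPUs, ...)

    def require_executor(self, **resources):
        self.executor.update(resources)
        return self

    def require_task(self, **resources):
        self.task.update(resources)
        return self

# ETL stage: many small tasks, 1 core each, little memory.
etl_profile = (ResourceProfile()
               .require_executor(cores=4, memory_gb=4)
               .require_task(cpus=1))

# ML stage: few executors, but each with lots of memory, cores, and GPUs.
ml_profile = (ResourceProfile()
              .require_executor(cores=16, memory_gb=64, gpus=4)
              .require_task(cpus=4, gpus=1))

# In the proposal, a profile would be attached to an RDD (something like
# rdd.withResources(ml_profile)), and the scheduler would acquire matching
# executors, relying on dynamic allocation, before running that stage.
print(ml_profile.executor)
# {'cores': 16, 'memory_gb': 64, 'gpus': 4}
```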

> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> Objectives:
>  # Allow users to specify task and executor resource requirements at the 
> stage level. 
>  # Spark will use the stage level requirements to acquire the necessary 
> resources/executors and schedule tasks based on the per stage requirements.
> Many times users have different resource requirements for different stages of 
> their application, so they want to be able to configure resources at the stage 
> level. For instance, say you have a single job with 2 stages. The first stage 
> does some ETL, which requires a lot of tasks, each with a small amount of 
> memory and 1 core. Then you have a second stage where you feed that ETL 
> data into an ML algorithm. The second stage only requires a few executors, but 
> each executor needs a lot of memory, GPUs, and many cores. This feature 
> allows the user to specify the task and executor resource requirements for 
> the ETL stage and then change them for the ML stage of the job. 
> Resources include CPU, memory (on-heap, overhead, pyspark, and off-heap), and 
> extra resources (GPU/FPGA/etc). The feature has the potential to allow for 
> other things like limiting the number of tasks per stage, specifying other 
> parameters for things like shuffle, etc. Initially I would propose we only 
> support resources as they exist now: task resources would be CPU and other 
> resources (GPU, FPGA), so that we aren't adding extra scheduling concerns 
> at this point. Executor resources would be CPU, memory, and extra 
> resources (GPU, FPGA, etc). Changing the executor resources will rely on 
> dynamic allocation being enabled.
> Main use cases:
>  # ML use case where user does ETL and feeds it into an ML algorithm where 
> it’s using the RDD API. This should work with barrier scheduling as well once 
> it supports dynamic allocation.
>  # This adds the framework/API for Spark's own internal use.  In the future 
> (not covered by this SPIP), Catalyst could control the stage-level resources 
> as it finds the need to change them between stages for different optimizations. 
> For instance, with the new columnar plugin to the query planner we can insert 
> stages into the plan that would change running something on the CPU in row 
> format to running it on the GPU in columnar format. This API would allow the 
> planner to make sure the stages that run on the GPU get the corresponding GPU 
> resources they need to run. Another possible use case is that Catalyst 
> could add in more optimizations so that the user doesn't need to configure 
> container sizes at all. If the optimizer/planner can handle that for the 
> user, everyone wins.
> This SPIP focuses on the RDD API, but we don't exclude the Dataset API. I 
> think the Dataset API will require more changes because it specifically hides 
> the RDD from users via the plans, and Catalyst can optimize the plan and 
> insert things into it. The only way I've found to make this work with 
> the Dataset API would be modifying all the plans to be able to pass the 
> resource requirements down to where the RDDs are created, which I believe 
> would be a lot of change. If other people know better options, it would be 
> great to hear them.
> *Q2.* What problem is this proposal NOT designed to solve?
> The initial implementation is not going to add Dataset APIs.
> We are starting with allowing users to specify a specific set of 
> task/executor resources and plan to design it to be extendable, but the first 
> implementation will not support changing generic SparkConf configs; it will 
> only support a specific, limited set of resources.
> This initial version will have a programmatic API for specifying the resource 
> requirements per stage; we can perhaps add the ability to define profiles in 
> the configs later if it's useful.
> *Q3.* How is it done today, and what are the limits of current practice?
> Currently this is done either by having multiple Spark jobs or by requesting 
> containers with the max resources needed for any part of the job.  To do this 
> today, you can break the work into separate jobs where each job requests the 
> resources it needs, but then you have to write the data out somewhere and 
> read it back in between jobs.  This takes longer and requires coordination 
> between the jobs to make sure everything works smoothly. Another option is to 
> request executors sized for your largest need up front, potentially wasting 
> those resources, and in turn money, when they aren't being used. For 
> instance, in an ML application that does ETL first, people often request 
> containers with GPUs, and the GPUs sit idle while the ETL is happening. This 
> wastes those GPU resources, and in turn money, because those GPUs could have 
> been used by other applications until they were really needed.  
> Note that the Catalyst internal use case can't be done at all today.
> *Q4.* What is new in your approach and why do you think it will be successful?
> This is a new way for users to specify the per stage resource requirements.  
> This will give users and Spark a lot more flexibility within a job and get 
> better utilization of their hardware.
> *Q5.* Who cares? If you are successful, what difference will it make?
> Spark application developers, cluster admins, managers and companies who pay 
> the bills for running Spark. It has the potential to make a huge difference 
> in cost by utilizing resources better and saving developers time.
> I’ve talked to different people from different companies and all of them have 
> said this would be a useful feature for them.
> *Q6.* What are the risks?
> The scoping of the new API could cause some confusion for users as to which 
> resources actually get used in a stage. If the user has specified different 
> resources on multiple RDDs that get combined into a single stage, the 
> scheduler will have to merge those and come up with a final container size to 
> request. We will have a specific algorithm for merging, but if the user 
> doesn't realize things get combined, or that some RDDs require a shuffle, 
> they might get confused.  I don't know how to get around this other than to 
> document the way it works and try to make it obvious to the user what was 
> chosen. See the design doc for options on scoping.
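For example, when several RDDs with different profiles end up in one stage, one 
candidate for the merging algorithm mentioned above is a per-resource max. The 
function below is a hypothetical sketch of that rule, not the algorithm the 
design doc commits to:

```python
# Hypothetical sketch: when several RDDs with different resource
# profiles are combined into one stage, size the stage's container
# request by taking the per-resource max across all of them.

def merge_stage_profiles(profiles: list) -> dict:
    final = {}
    for profile in profiles:
        for resource, amount in profile.items():
            final[resource] = max(final.get(resource, 0), amount)
    return final

# Two RDDs combined into a single stage by a shuffle boundary:
print(merge_stage_profiles([
    {"cores": 2, "memory_gb": 8},
    {"cores": 4, "gpus": 1},
]))
# {'cores': 4, 'memory_gb': 8, 'gpus': 1}
```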
> The cluster managers (like YARN) and dynamic allocation manager have to track 
> everything at a ResourceProfile (specific set of resource requirements) level 
> rather than just a global cores or executors level, so it requires a bunch of 
> data structure changes to those.
> *Q7.* How long will it take?
> I suspect this will take multiple months because it's a fairly large change, 
> but I think we can do it in pieces fairly easily. For instance, we can do 
> the dynamic allocation manager and scheduler, then the YARN cluster manager, 
> and finally the RDD API.  We can do the backend pieces first, where the 
> global resource configs still apply; only once we add the actual RDD API 
> would the user actually see the feature.  I have a rough prototype of those 
> pieces from investigating what would need to change.
> *Q8.* What are the mid-term and final “exams” to check for success?
> Success is for the user to specify the resources per stage with dynamic 
> allocation and have everything work: one stage runs with one set of 
> resources, and when the next stage starts with different resources, the 
> first stage's containers are released and new ones acquired.  The mid-term 
> exam might be to put in the changes for the allocation manager, cluster 
> manager, and scheduler, and have the normal global resource requirements 
> continue to work as expected.
> *Appendix A.* Proposed API Changes. Optional section defining APIs changes, 
> if any. Backward and forward compatibility must be taken into account.
> I split the appendices out into a Google doc since this was getting big and 
> to allow inline comments; see the link below.
> *Appendix B.* Optional Design Sketch: How are the goals going to be 
> accomplished? Give sufficient technical detail to allow a contributor to 
> judge whether it’s likely to be feasible. Note that this is not a full design 
> document.
> I split the appendices out into a Google doc since this was getting big and 
> to allow inline comments; see the link below.
> *Appendix C.* Optional Rejected Designs: What alternatives were considered? 
> Why were they rejected? If no alternatives have been considered, the problem 
> needs more thought.
> I split the appendices out into a Google doc since this was getting big and 
> to allow inline comments; see the link below.



