[ https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-27495:
---------------------------------
    Target Version/s: 3.2.0  (was: 3.1.0)

> SPIP: Support Stage level resource configuration and scheduling
> ---------------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Major
>              Labels: SPIP
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely
> no jargon.
> Objectives:
> # Allow users to specify task and executor resource requirements at the
> stage level.
> # Spark will use the stage level requirements to acquire the necessary
> resources/executors and schedule tasks based on the per stage requirements.
> Many times users have different resource requirements for different stages of
> their application so they want to be able to configure resources at the stage
> level. For instance, you have a single job that has 2 stages. The first stage
> does some ETL which requires a lot of tasks, each with a small amount of
> memory and 1 core each. Then you have a second stage where you feed that ETL
> data into an ML algorithm. The second stage only requires a few executors but
> each executor needs a lot of memory, GPUs, and many cores. This feature
> allows the user to specify the task and executor resource requirements for
> the ETL Stage and then change them for the ML stage of the job.
> Resources include cpu, memory (on heap, overhead, pyspark, and off heap), and
> extra Resources (GPU/FPGA/etc). It has the potential to allow for other
> things like limiting the number of tasks per stage, specifying other
> parameters for things like shuffle, etc. Initially I would propose we only
> support resources as they are now.
> So Task resources would be cpu and other
> resources (GPU, FPGA), that way we aren't adding in extra scheduling things
> at this point. Executor resources would be cpu, memory, and extra
> resources (GPU, FPGA, etc.). Changing the executor resources will rely on
> dynamic allocation being enabled.
> Main use cases:
> # ML use case where the user does ETL and feeds it into an ML algorithm where
> it’s using the RDD API. This should work with barrier scheduling as well once
> it supports dynamic allocation.
> # This adds the framework/api for Spark's own internal use. In the future
> (not covered by this SPIP), Catalyst could control the stage level resources
> as it finds the need to change them between stages for different optimizations.
> For instance, with the new columnar plugin to the query planner we can insert
> stages into the plan that would change running something on the CPU in row
> format to running it on the GPU in columnar format. This API would allow the
> planner to make sure the stages that run on the GPU get the corresponding GPU
> resources they need to run. Another possible use case for Catalyst is that it
> would allow Catalyst to add in more optimizations so that the user doesn’t
> need to configure container sizes at all. If the optimizer/planner can handle
> that for the user, everyone wins.
> This SPIP focuses on the RDD API but we don’t exclude the Dataset API. I
> think the Dataset API will require more changes because it specifically hides
> the RDD from the users via the plans, and Catalyst can optimize the plan and
> insert things into the plan. The only way I’ve found to make this work with
> the Dataset API would be modifying all the plans to be able to get the
> resource requirements down into where it creates the RDDs, which I believe
> would be a lot of change. If other people know better options, it would be
> great to hear them.
> *Q2.* What problem is this proposal NOT designed to solve?
> The initial implementation is not going to add Dataset APIs.
> We are starting with allowing users to specify a specific set of
> task/executor resources and plan to design it to be extendable, but the first
> implementation will not support changing generic SparkConf configs and only
> specific limited resources.
> This initial version will have a programmatic API for specifying the resource
> requirements per stage; we can add the ability to perhaps have profiles in
> the configs later if it's useful.
> *Q3.* How is it done today, and what are the limits of current practice?
> Currently this is either done by having multiple spark jobs or requesting
> containers with the max resources needed for any part of the job. To do this
> today, you can break it into separate jobs where each job requests the
> corresponding resources needed, but then you have to write the data out
> somewhere and then read it back in between jobs. This is going to take
> longer as well as require job coordination between them to make sure
> everything works smoothly. Another option would be to request executors with
> your largest need up front and potentially waste those resources when they
> aren't being used, which in turn wastes money. For instance, for an ML
> application where it does ETL first, many times people request containers
> with GPUs and the GPUs sit idle while the ETL is happening. This is wasting
> those GPU resources and in turn money because those GPUs could have been used
> by other applications until they were really needed.
> Note that the Catalyst internal use case can’t be done today.
> *Q4.* What is new in your approach and why do you think it will be successful?
> This is a new way for users to specify the per stage resource requirements.
> This will give users and Spark a lot more flexibility within a job and get
> better utilization of their hardware.
> *Q5.* Who cares? If you are successful, what difference will it make?
> Spark application developers, cluster admins, managers and companies who pay
> the bills for running Spark. It has the potential to make a huge difference
> in cost by utilizing resources better and saving developers time.
> I’ve talked to different people from different companies and all of them have
> said this would be a useful feature for them.
> *Q6.* What are the risks?
> The scoping of the new API could cause some confusion to the user as to which
> resources actually get used in a stage. If the user has specified different
> resources in multiple RDDs that get combined into a single stage, the
> scheduler will have to merge those and come up with a final container size to
> request. We will have a specific algorithm for merging but if the user
> doesn’t realize things get combined or that some RDDs require a shuffle, they
> might get confused. I don't know how to get around this other than to
> document the way it works and try to make it obvious to the user what was
> chosen. Another option here is to have it fail if it gets a conflict to make
> sure the user is aware. We could have a config flag for this so that it fails
> by default and the user can allow merging by turning the config on. See the
> design doc for options on scoping.
> The cluster managers (like YARN) and dynamic allocation manager have to track
> everything at a ResourceProfile (specific set of resource requirements) level
> rather than just a global cores or executors level, so it requires a bunch of
> data structure changes to those.
> *Q7.* How long will it take?
> I suspect this will take multiple months because it’s a fairly large change.
> I think we can do it in pieces fairly easily though. For instance, I think we
> can do the dynamic allocation manager and scheduler, YARN cluster manager,
> and then finally the RDD API.
> We can do the backend pieces first where the
> global resource configs apply and then once we add in the actual RDD API, it
> will only be there and only at that point would the user actually see it. I
> have a rough prototype of those where I was investigating what all would need
> to change.
> *Q8.* What are the mid-term and final “exams” to check for success?
> Success is for the user to specify the resources per stage with dynamic
> allocation and everything to work with it. One stage would run with a set of
> resources and when the next stage starts with different resources the first
> stage containers are let go and new ones acquired. The mid-term might be to
> put in the changes for the allocation manager and cluster manager and
> scheduler and have the normal global resource requirements continue to work
> as expected.
> *Appendix A.* Proposed API Changes. Optional section defining API changes,
> if any. Backward and forward compatibility must be taken into account.
> I split the appendices out into a google doc since it was getting big and to
> allow inline comments, see link below
> *Appendix B.* Optional Design Sketch: How are the goals going to be
> accomplished? Give sufficient technical detail to allow a contributor to
> judge whether it’s likely to be feasible. Note that this is not a full design
> document.
> I split the appendices out into a google doc since it was getting big and to
> allow inline comments, see link below
> *Appendix C.* Optional Rejected Designs: What alternatives were considered?
> Why were they rejected? If no alternatives have been considered, the problem
> needs more thought.
> I split the appendices out into a google doc since it was getting big and to
> allow inline comments, see link below
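
To make the merge risk described under Q6 more concrete, here is a minimal sketch of what combining per-RDD resource requirements into one stage-level request might look like. It is not Spark code: the dictionary-based profile model, the names `merge_profiles`, `ProfileConflictError` and `allow_merge`, and the "take the largest amount of each resource" rule are all assumptions made for illustration. The SPIP only states that there will be a specific merge algorithm and that failing on conflict could be controlled by a config flag.

```python
from typing import Dict, Iterable

# Hypothetical model: a resource profile maps a resource name to an amount.
Profile = Dict[str, float]


class ProfileConflictError(ValueError):
    """Raised when a stage sees different profiles and merging is disabled."""


def merge_profiles(profiles: Iterable[Profile], allow_merge: bool = False) -> Profile:
    """Combine the profiles of all RDDs that end up in one stage.

    Assumption: the merge rule keeps the largest amount of each resource.
    With allow_merge=False (the "fail on conflict" option), differing
    profiles raise an error instead of being merged silently.
    """
    profiles = list(profiles)
    # Drop exact duplicates while keeping the original order.
    distinct = [p for i, p in enumerate(profiles) if p not in profiles[:i]]
    if len(distinct) > 1 and not allow_merge:
        raise ProfileConflictError(
            f"{len(distinct)} different resource profiles in one stage"
        )
    merged: Profile = {}
    for profile in distinct:
        for name, amount in profile.items():
            merged[name] = max(merged.get(name, 0), amount)
    return merged


# Two RDDs with different requirements that get pipelined into one stage.
etl = {"cores": 1, "memory_gb": 2}
ml = {"cores": 8, "memory_gb": 64, "gpu": 2}

merged = merge_profiles([etl, ml], allow_merge=True)
print(merged)
```

With `allow_merge=False`, the same call raises `ProfileConflictError`, which corresponds to the option of failing so that the user is aware that resources from several RDDs were combined.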