Hi all,

Sorry for the delay in getting the first draft of (my first) SPIP out.
https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1
Let me know what you think.

Regards
kalyan.

On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh <asi...@apache.org> wrote:

> Hey all,
>
> Thanks for this discussion, the timing of this couldn't be better!
>
> At Pinterest, we recently started to look into reducing OOM failures while
> also reducing memory consumption of spark applications. We considered the
> following options.
> 1. Changing core count on executor to change memory available per task in
> the executor.
> 2. Changing resource profile based on task failures and gc metrics to grow
> or shrink executor memory size. We do this at application level based on
> the app's past runs today.
> 3. K8s vertical pod autoscaler
> <https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler>
>
> Internally, we are mostly getting aligned on option 2. We would love to
> make this happen and are looking forward to the SPIP.
>
>
> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>>
>> Hi,
>>
>> We are internally exploring adding support for dynamically changing the
>> resource profile of a stage based on runtime characteristics.
>> This includes failures due to OOM and the like, slowness due to excessive
>> GC, resource wastage due to excessive overprovisioning, etc.
>> Essentially handles scale up and scale down of resources.
>> Instead of baking these into the scheduler directly (which is already
>> complex), we are modeling it as a plugin - so that the 'business logic' of
>> how to handle task events and mutate state is pluggable.
>>
>> The main limitation I find with mutating only the cores is the limits it
>> places on what kind of problems can be solved with it - and mutating
>> resource profiles is a much more natural way to handle this
>> (spark.task.cpus predates RP).
>>
>> Regards,
>> Mridul
>>
>> On Wed, Jan 17, 2024 at 9:18 AM Tom Graves <tgraves...@yahoo.com.invalid>
>> wrote:
>>
>>> It is interesting.
>>> I think there are definitely some discussion points
>>> around this. Reliability vs performance is always a trade-off, and it's
>>> great it doesn't fail, but if it doesn't meet someone's SLA now, that could
>>> be as bad if it's hard to figure out why. I think if something like this
>>> kicks in, it needs to be very obvious to the user so they can see that it
>>> occurred. Do you have something in place on UI or something that indicates
>>> this? The nice thing is also you aren't wasting memory by increasing it for
>>> all tasks when maybe you only need it for one or two. The downside is you
>>> are only finding out after failure.
>>>
>>> I do also worry a little bit that in your blog post, the error you
>>> pointed out isn't a Java OOM but an off heap memory issue (overhead + heap
>>> usage). You don't really address heap memory vs off heap in that article.
>>> Only thing I see mentioned is spark.executor.memory which is heap memory.
>>> Obviously adjusting to only run one task is going to give that task more
>>> overall memory but the reasons it's running out in the first place could be
>>> different. If it was on heap memory for instance with more tasks I would
>>> expect to see more GC and not executor OOM. If you are getting executor
>>> OOM you are likely using more off heap memory/stack space, etc. than you
>>> allocated. Ultimately it would be nice to know why that is happening and
>>> see if we can address it to not fail in the first place. That could be
>>> extremely difficult though, especially if using software outside Spark that
>>> is using that memory.
>>>
>>> As Holden said, we need to make sure this would play nice with the
>>> resource profiles, or potentially if we can use the resource profile
>>> functionality. Theoretically you could extend this to try to get a new
>>> executor if using dynamic allocation for instance.
>>>
>>> I agree doing a SPIP would be a good place to start to have more
>>> discussions.
>>>
>>> Tom
>>>
>>> On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
>>> justfors...@gmail.com> wrote:
>>>
>>>
>>> Hello All,
>>>
>>> At Uber, we recently did some work on improving the reliability of
>>> Spark applications in scenarios of fatter executors going out of memory and
>>> leading to application failure. Fatter executors are those that have more
>>> than 1 task running on them concurrently at a given time. This has
>>> significantly improved the reliability of many Spark applications for us at
>>> Uber. We made a blog about this recently. Link:
>>> https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
>>>
>>> At a high level, we have done the below changes:
>>>
>>>    1. When a Task fails with the OOM of an executor, we update the core
>>>    requirements of the task to max executor cores.
>>>    2. When the task is picked for rescheduling, the new attempt of the
>>>    task happens to be on an executor where no other task can run
>>>    concurrently.
>>>    All cores get allocated to this task itself.
>>>    3. This way we ensure that the configured memory is completely at
>>>    the disposal of a single task, thus eliminating contention of memory.
>>>
>>> The best part of this solution is that it's reactive. It kicks in only
>>> when the executors fail with the OOM exception.
>>>
>>> We understand that the problem statement is very common and we expect
>>> our solution to be effective in many cases.
>>>
>>> There could be more cases that can be covered. Executor failing with OOM
>>> is like a hard signal. The framework (making the driver aware of
>>> what's happening with the executor) can be extended to handle scenarios of
>>> other forms of memory pressure like excessive spilling to disk, etc.
>>>
>>> While we had developed this on Spark 2.4.3 in-house, we would like to
>>> collaborate and contribute this work to the latest versions of Spark.
>>>
>>> What is the best way forward here? Will an SPIP proposal to detail the
>>> changes help?
>>>
>>> Regards,
>>> Kalyan.
>>> Uber India.
>>>
>>
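
For anyone skimming the thread, the three steps in the original proposal quoted above can be sketched roughly as below. This is a toy Python model only, not Spark code and not the in-house implementation: `Task`, `on_task_failure`, `max_concurrent_tasks`, `memory_share_mb` and the `"EXECUTOR_OOM"` reason string are all hypothetical names made up for illustration, and the even split of executor memory across concurrent tasks is a simplifying assumption (a real executor does not partition memory this way).

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in for a schedulable task."""
    task_id: int
    cpus: int = 1      # cores the task asks for (analogous to spark.task.cpus)
    attempts: int = 0  # number of failed attempts so far


def on_task_failure(task: Task, reason: str, executor_cores: int) -> Task:
    """Step 1: after an executor OOM, raise the task's core requirement
    to the full executor so the retry cannot share the executor."""
    task.attempts += 1
    if reason == "EXECUTOR_OOM":  # hypothetical failure-reason label
        task.cpus = executor_cores
    return task


def max_concurrent_tasks(executor_cores: int, task_cpus: int) -> int:
    """Step 2: how many tasks of this size fit on one executor."""
    if not 1 <= task_cpus <= executor_cores:
        raise ValueError("task_cpus must be between 1 and executor_cores")
    return executor_cores // task_cpus


def memory_share_mb(executor_memory_mb: int, executor_cores: int, task_cpus: int) -> int:
    """Step 3: rough per-task memory, ASSUMING an even split across
    concurrently running tasks of the same size."""
    return executor_memory_mb // max_concurrent_tasks(executor_cores, task_cpus)


# Example: a 4-core executor with 8192 MB, one task failing with OOM.
task = Task(task_id=7)
before = memory_share_mb(8192, 4, task.cpus)  # shares the executor with 3 others
on_task_failure(task, "EXECUTOR_OOM", executor_cores=4)
after = memory_share_mb(8192, 4, task.cpus)   # retry has the executor to itself
print(before, after)
```

The sketch only mutates the core requirement, which is the limitation raised earlier in the thread: it cannot give the retry more memory than the executor already has, whereas changing the resource profile could.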