Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-11-03 Mridul Muralidharan
Yes, DAGScheduler deals with it at the stage level, so the DeterministicLevel of the individual RDDs is taken into account to determine the stage's level. Regards, Mridul On Fri, Nov 3, 2023 at 9:45 AM Keyong Zhou wrote: > I checked RDD#getOutputDeterministicLevel and found that if an RDD's

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-11-03 Keyong Zhou
I checked RDD#getOutputDeterministicLevel and found that if an RDD's upstream is INDETERMINATE, then it is also INDETERMINATE. Thanks, Keyong Zhou Keyong Zhou wrote on Fri, Nov 3, 2023 at 19:57: > Hi Mridul, > > I still have a question. DAGScheduler#submitMissingTasks will > only
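
The propagation rule Keyong describes can be modeled in a few lines. This is a simplified sketch, not Spark's implementation: the `Rdd` class and `stage_is_indeterminate` helper are hypothetical, the level ordering mirrors Spark's DETERMINATE, UNORDERED and INDETERMINATE, and the real `getOutputDeterministicLevel` has extra rules for shuffle dependencies and order-sensitive operations that are omitted here.

```python
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List


class DeterministicLevel(IntEnum):
    # Ordered so that max() returns the least deterministic level.
    DETERMINATE = 0
    UNORDERED = 1
    INDETERMINATE = 2


@dataclass
class Rdd:
    """Hypothetical, simplified RDD node: a name, its parents and its own level."""
    name: str
    parents: List["Rdd"] = field(default_factory=list)
    # Level contributed by this RDD's own computation (e.g. a random source).
    own_level: DeterministicLevel = DeterministicLevel.DETERMINATE

    def output_level(self) -> DeterministicLevel:
        # An RDD is never more deterministic than any of its parents.
        levels = [p.output_level() for p in self.parents]
        return max(levels + [self.own_level])


def stage_is_indeterminate(boundary_rdd: Rdd) -> bool:
    # Stage-level check: only the RDD at the stage boundary is inspected,
    # because its level already reflects everything upstream of it.
    return boundary_rdd.output_level() == DeterministicLevel.INDETERMINATE


if __name__ == "__main__":
    source = Rdd("source")
    noisy = Rdd("noisy", [source], DeterministicLevel.INDETERMINATE)
    downstream = Rdd("downstream", [noisy])
    print(downstream.output_level().name)  # INDETERMINATE
    print(stage_is_indeterminate(source))  # False
```

Taking the maximum over the parents is why a stage whose own operations are determinate still counts as INDETERMINATE once anything upstream of it is, which is what makes the stage-level check sufficient.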

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-11-03 Keyong Zhou
Hi Mridul, I still have a question. DAGScheduler#submitMissingTasks will only call unregisterAllMapAndMergeOutput if the current ShuffleMapStage is indeterminate. What if the current stage is determinate, but its upstream stage is indeterminate and that upstream stage is rerun? Thanks, Keyong Zhou
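
A minimal model of the resubmission behavior Keyong refers to, assuming a stage exposes its registered map outputs as a dictionary. `ShuffleMapStage` here is a hypothetical stand-in, not Spark's class, and clearing the dictionary plays the role of `unregisterAllMapAndMergeOutput`.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ShuffleMapStage:
    """Hypothetical stand-in for a shuffle map stage (not Spark's class)."""
    stage_id: int
    num_partitions: int
    is_indeterminate: bool
    # map partition index -> location of its registered map output
    registered_outputs: Dict[int, str] = field(default_factory=dict)

    def missing_partitions(self) -> List[int]:
        return [p for p in range(self.num_partitions)
                if p not in self.registered_outputs]


def partitions_to_compute(stage: ShuffleMapStage) -> List[int]:
    """Decide which map partitions to run when the stage is (re)submitted."""
    missing = stage.missing_partitions()
    if stage.is_indeterminate and missing:
        # Surviving outputs could be inconsistent with recomputed ones, so
        # drop them all (the role unregisterAllMapAndMergeOutput plays).
        stage.registered_outputs.clear()
        missing = stage.missing_partitions()
    # A determinate stage keeps its surviving outputs and reruns only the gaps.
    return missing
```

Because an INDETERMINATE upstream makes the downstream stage's RDD INDETERMINATE as well (per the reply above), the "determinate stage with a rerun indeterminate parent" case ends up taking the same branch as an indeterminate stage.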

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Erik fang
Hi Mridul, Your explanation is clear and great. Thank you so much! On Fri, Oct 20, 2023 at 11:59 AM Keyong Zhou wrote: > Hi Mridul, thanks for the explanation, it's clear to me now. Thanks! > > Mridul Muralidharan wrote on Fri, Oct 20, 2023 at 11:15: > > > To add to my response - what I described (w.r.t

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Keyong Zhou
Hi Mridul, thanks for the explanation, it's clear to me now. Thanks! Mridul Muralidharan wrote on Fri, Oct 20, 2023 at 11:15: > To add to my response - what I described (w.r.t. failing the job) applies only to > ResultStage. > It walks the lineage DAG to identify all indeterminate parents to roll back. > If there

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Mridul Muralidharan
To add to my response - what I described (w.r.t. failing the job) applies only to ResultStage. It walks the lineage DAG to identify all indeterminate parents to roll back. If there are only ShuffleMapStages in the set of stages to roll back, it will simply discard their output, roll back all of them, and
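
The rollback described here can be approximated as follows. This sketch is based only on the description in this thread: `Stage`, `JobAborted` and both functions are hypothetical, and the real DAGScheduler logic differs in detail. It collects every stage that transitively reads the failed stage's output, aborts if any of them is a ResultStage that has already finished partitions, and otherwise clears their outputs so they are fully recomputed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Stage:
    """Hypothetical stage node; parents lists the ids of upstream stages."""
    stage_id: int
    is_result_stage: bool = False
    parents: List[int] = field(default_factory=list)
    finished_partitions: Set[int] = field(default_factory=set)


class JobAborted(Exception):
    pass


def succeeding_stages(stages: Dict[int, Stage], failed_id: int) -> Set[int]:
    """Ids of all stages that directly or transitively consume failed_id's output."""
    found: Set[int] = set()
    frontier = [failed_id]
    while frontier:
        current = frontier.pop()
        for s in stages.values():
            if current in s.parents and s.stage_id not in found:
                found.add(s.stage_id)
                frontier.append(s.stage_id)
    return found


def rollback(stages: Dict[int, Stage], failed_id: int) -> Set[int]:
    """Roll back an indeterminate stage and its successors, or abort the job."""
    to_rollback = succeeding_stages(stages, failed_id)
    for sid in to_rollback:
        stage = stages[sid]
        if stage.is_result_stage and stage.finished_partitions:
            # Result output may already be consumed (e.g. by collect()).
            raise JobAborted(f"cannot roll back result stage {sid}")
    # Nothing already consumed: discard outputs so every partition reruns.
    for sid in to_rollback | {failed_id}:
        stages[sid].finished_partitions.clear()
    return to_rollback
```

The abort check runs before any output is cleared, so an aborted job leaves the recorded stage state untouched.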

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Mridul Muralidharan
Good question, and ResultStage is actually special-cased in Spark, as its output could already have been consumed (for example, by collect() to the driver), so if it is one of the stages that needs to be rolled back, the job is aborted. To illustrate, see the following: -- snip -- package

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Keyong Zhou
In fact, I'm wondering whether Spark will rerun the whole reduce ShuffleMapStage if its upstream ShuffleMapStage is INDETERMINATE and is rerun. Keyong Zhou wrote on Thu, Oct 19, 2023 at 23:00: > Thanks, Erik, for bringing up this question; I'm also curious about the > answer. Any feedback is appreciated. > > Thanks,

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Keyong Zhou
Thanks, Erik, for bringing up this question; I'm also curious about the answer. Any feedback is appreciated. Thanks, Keyong Zhou Erik fang wrote on Thu, Oct 19, 2023 at 22:16: > Mridul, > > sure, I totally agree SPARK-25299 is a much better solution, as long as we > can get it from the Spark community > (btw,

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Erik fang
Mridul, sure, I totally agree SPARK-25299 is a much better solution, as long as we can get it from the Spark community (btw, the private[spark] modifier on RDD.outputDeterministicLevel is no big deal; Celeborn already has Spark integration code with [spark] scope). I also have a question about INDETERMINATE

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-16 Mridul Muralidharan
On Mon, Oct 16, 2023 at 11:31 AM Erik fang wrote: > Hi Mridul, > > For a), > DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier() > >

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-16 Erik fang
Hi Mridul, For a), DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier() to decide whether the whole stage needs to be recomputed I think
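
The two checks Erik mentions can be folded into one simplified decision about which map outputs survive a fetch failure. `MapStageInfo` and `outputs_after_fetch_failure` are hypothetical; in Spark these checks live in different parts of DAGScheduler (fetch-failure handling versus rollback and resubmission), so this sketch collapses two code paths into a single function.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class MapStageInfo:
    """Hypothetical summary of the map stage whose output could not be fetched."""
    is_indeterminate: bool
    is_barrier: bool


def outputs_after_fetch_failure(stage: MapStageInfo,
                                outputs: Dict[int, str],
                                failed_map_index: int) -> Dict[int, str]:
    """Return the map outputs that stay registered after a fetch failure."""
    if stage.is_indeterminate or stage.is_barrier:
        # Whole-stage recompute: no surviving output can be kept.
        return {}
    # Otherwise drop only the output whose fetch failed; only its task reruns.
    return {m: loc for m, loc in outputs.items() if m != failed_map_index}
```

Barrier stages need the whole-stage treatment for a different reason than indeterminate ones: their tasks must all run together, so a partial rerun is not allowed even when outputs are deterministic.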

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Mridul Muralidharan
On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan wrote: > > A reducer-oriented view of shuffle, especially without replication, could > indeed be susceptible to the issue you described (a single fetch failure > would require all mappers to be recomputed) - note, not necessarily > all

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Mridul Muralidharan
A reducer-oriented view of shuffle, especially without replication, could indeed be susceptible to the issue you described (a single fetch failure would require all mappers to be recomputed) - note, though, that not necessarily all reducers would need to be recomputed. Note that I have not looked much
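
To make the contrast concrete, the sketch below compares how many map tasks must re-run when one storage location is lost, under a mapper-oriented layout (one output file per map task, as in Spark's built-in sort-based shuffle) versus a reducer-oriented layout (one file per reduce partition holding data from every mapper). The layouts and function names are hypothetical; they assume, as a worst case, that every mapper writes to every reduce partition, and they ignore replication.

```python
from typing import Dict, Set

# Layout: storage location -> indices of the map tasks whose data it holds.
Layout = Dict[str, Set[int]]


def mapper_oriented_layout(num_mappers: int) -> Layout:
    # One output file per map task.
    return {f"map-{m}": {m} for m in range(num_mappers)}


def reducer_oriented_layout(num_mappers: int, num_reducers: int) -> Layout:
    # One file per reduce partition, assumed to hold data from every mapper.
    return {f"part-{r}": set(range(num_mappers)) for r in range(num_reducers)}


def mappers_to_recompute(layout: Layout, lost_location: str) -> Set[int]:
    # Every map task that contributed to the lost location must run again.
    return set(layout[lost_location])
```

Losing `part-1` in the reducer-oriented layout forces every mapper to re-run, but only the reducer reading `part-1` lost its input; the other reducers' partition files are still intact.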

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Mridul Muralidharan
Hi, Spark will try to minimize the recomputation cost as much as possible. For example, if the parent stage was DETERMINATE, it simply needs to recompute the missing (mapper) partitions (which resulted in the fetch failure). Note that this by itself could require further recomputation in the DAG if the
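
The cascading recomputation mentioned here for DETERMINATE stages can be sketched as a walk up the stage graph: a stage re-runs only its missing partitions, but any parent with missing output must be resubmitted first. `StageState` and `resubmission_plan` are hypothetical and only loosely follow the idea of finding missing parent stages; they are not Spark's API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class StageState:
    """Hypothetical stage state: partitions with output, and parent names."""
    num_partitions: int
    available: Set[int] = field(default_factory=set)
    parents: List[str] = field(default_factory=list)

    def missing(self) -> List[int]:
        return [p for p in range(self.num_partitions) if p not in self.available]


def resubmission_plan(stages: Dict[str, StageState],
                      target: str) -> List[Tuple[str, List[int]]]:
    """(stage, missing partitions) pairs in run order, parents first."""
    plan: List[Tuple[str, List[int]]] = []
    visited: Set[str] = set()

    def visit(name: str) -> None:
        if name in visited:
            return
        visited.add(name)
        stage = stages[name]
        for parent in stage.parents:
            # Only parents with missing output need to run again; a fully
            # available parent stops the walk on that branch.
            if stages[parent].missing():
                visit(parent)
        if stage.missing():
            plan.append((name, stage.missing()))

    visit(target)
    return plan
```

For example, if stage `b` lost partition 1 and its parent `a` is fully available, the plan is just `b`'s missing partition followed by the consumer; if `a` has also lost output, `a` is scheduled first.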

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Sungwoo Park
a) If one or more tasks for a stage (and so its shuffle id) are going to be recomputed and it is an INDETERMINATE stage, all shuffle output will be discarded and it will be entirely recomputed (see here

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-13 Mridul Muralidharan
Hi, So there are a couple of things here, depending on whether the stages are DETERMINATE or INDETERMINATE. The exit I added to my example was to trigger some of these cases, and you can come up with more involved scenarios where this would apply :-) At a high level, we have the following: a) If

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-12 Erik fang
Hi Mridul, sorry for the late reply. Per my understanding, the key point about Spark shuffleId and StageId/StageAttemptId is that shuffleId is assigned at ShuffleDependency creation time and bound to the RDD/ShuffleDependency, while StageId/StageAttemptId is assigned, and changes, at Job execution

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-08 Sungwoo Park
Hello Keyong, We have implemented the previous plan to generate a fresh shuffleId when failing to fetch data from Celeborn workers. Thanks for your comment. While testing Celeborn-MR3 with task re-execution (or stage resubmission), I have noticed that shuffleId does not change its associated

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-06 Keyong Zhou
Hi Sungwoo, Sorry for the late reply. Reusing a committed shuffleId does not work in the current architecture, even after calling unregisterShuffle in LifecycleManager, because the cleanup of metadata is delayed and not guaranteed. It will be more complicated when we consider graceful restart of
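
Since a committed shuffleId cannot be safely reused, the alternative Sungwoo reports implementing above is to map each application-level shuffle id to a fresh Celeborn shuffle id after a fetch failure. The `ShuffleIdTracker` class below is a hypothetical sketch of that bookkeeping, not Celeborn's LifecycleManager API; it ignores concurrency and the cleanup of abandoned ids.

```python
import itertools
from typing import Dict


class ShuffleIdTracker:
    """Hypothetical map from an application-level shuffle id to the
    Celeborn shuffle id currently used for writing and reading it."""

    def __init__(self) -> None:
        self._next_id = itertools.count()
        self._current: Dict[int, int] = {}

    def celeborn_shuffle_id(self, app_shuffle_id: int) -> int:
        # First use of an application shuffle id allocates a Celeborn id.
        if app_shuffle_id not in self._current:
            self._current[app_shuffle_id] = next(self._next_id)
        return self._current[app_shuffle_id]

    def on_fetch_failure(self, app_shuffle_id: int) -> int:
        # Never reuse the old id: its metadata cleanup is delayed and not
        # guaranteed, so re-executed map tasks write under a brand new id.
        self._current[app_shuffle_id] = next(self._next_id)
        return self._current[app_shuffle_id]
```

Readers always resolve the current id through the tracker, so tasks scheduled after the failure never see data written under the abandoned id.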

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-01 Sungwoo Park
Hi Keyong, Instead of picking up a new shuffleId, can we reuse an existing shuffleId after unregistering it? If the following plan worked, it would further simplify the implementation: 1. Downstream tasks fail because of read failures. 2. All active downstream tasks are killed, so the

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-30 Keyong Zhou
Hi Sungwoo, I think your approach works with the current architecture of Celeborn, and interpreting an IOException when reading as a read failure makes sense. Currently, LifecycleManager announces data loss only when CommitFiles fails. Thanks, Keyong Zhou Sungwoo Park wrote on Fri, Sep 29, 2023 at 22:05: > >
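
Treating read-side I/O errors as fetch failures could look like this. `FetchFailedError` and `read_partition` are hypothetical names; Python's `OSError` stands in for Java's `IOException`, and how the scheduler reacts to the error (resubmitting the stage) is not shown.

```python
from typing import Callable, Iterable, List


class FetchFailedError(Exception):
    """Hypothetical error the scheduler would treat as a shuffle fetch failure."""

    def __init__(self, shuffle_id: int, partition_id: int, cause: Exception) -> None:
        super().__init__(
            f"fetch failed: shuffle={shuffle_id} partition={partition_id}: {cause}")
        self.shuffle_id = shuffle_id
        self.partition_id = partition_id


def read_partition(open_stream: Callable[[int, int], Iterable[bytes]],
                   shuffle_id: int, partition_id: int) -> List[bytes]:
    try:
        # Errors raised while opening or while iterating are both caught.
        return list(open_stream(shuffle_id, partition_id))
    except OSError as e:  # Python's IOError is an alias of OSError
        # Surface I/O errors as fetch failures so the stage can be
        # resubmitted, instead of failing the task with a generic error.
        raise FetchFailedError(shuffle_id, partition_id, e) from e
```

Keeping the shuffle and partition ids on the error gives the scheduler what it needs to decide which upstream output to invalidate.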

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-29 Sungwoo Park
Since a partition split has a good chance of containing data from almost all upstream mapper tasks, the cost of re-computing all upstream tasks may differ little from that of re-computing only the mapper tasks actually involved, in most cases. Of course, this is not always true. To change from 'complete' to

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-29 Keyong Zhou
Since a partition split has a good chance of containing data from almost all upstream mapper tasks, the cost of re-computing all upstream tasks may differ little from that of re-computing only the mapper tasks actually involved, in most cases. Of course, this is not always true. To change from 'complete' to

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-29 Keyong Zhou
Hi Sungwoo, Thanks for your reply. For the two required features you mentioned, here is my understanding. 1. Currently, the Celeborn Worker tracks map indices for each partition split file if `celeborn.client.shuffle.rangeReadFilter.enabled` is enabled. This config's original purpose is to filter
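
If per-split map indices are available, the set of mappers to re-run after losing some partition splits is the union of their recorded indices, and its ratio to the total shows how much a targeted rerun would save, which is the trade-off discussed in the replies above. The dictionary layout, split names and both functions are hypothetical; how the indices would be obtained from the Worker is not shown.

```python
from typing import Dict, Iterable, Set


def mappers_for_lost_splits(split_map_indices: Dict[str, Set[int]],
                            lost_splits: Iterable[str]) -> Set[int]:
    """Union of the map indices recorded for each lost partition split."""
    needed: Set[int] = set()
    for split in lost_splits:
        needed |= split_map_indices[split]
    return needed


def recompute_ratio(needed: Set[int], num_mappers: int) -> float:
    # Fraction of the upstream stage that must re-run; a value close to 1.0
    # means a targeted rerun saves little over re-running every mapper.
    return len(needed) / num_mappers
```

With splits that each hold data from nearly every mapper, the ratio approaches 1.0 and a complete rerun of the upstream stage costs about the same.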

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-27 orpl
Hello, As we are developing an MR3 extension for Celeborn, I would like to add my comments on stage re-run in the context of using Celeborn for MR3. I don't know the internal details of Spark stage re-run very well, so my apologies if my comments are irrelevant to the proposal in the design

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-25 Erik fang
Thanks, Mridul. Your comments provide a lot of information and are very helpful; highly appreciated! For a, I agree that restarting SPARK-25299 requires a big effort, and I don't think we can get it merged into Spark in a short time. For b & c, I've checked the DeterministicLevel/Barrier RDD related code in

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-23 Mridul Muralidharan
Hi, I am not yet very familiar with Celeborn, so I will restrict my notes on the proposal to the context of Apache Spark: a) For Option 1, there is SPARK-25299, which was started a few years back. Unfortunately, the work there has stalled, but if there is interest in pushing that forward, I can

[PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-22 Erik fang
Hi folks, I have a proposal to implement Spark stage resubmission to handle shuffle fetch failures in Celeborn: https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8 Please have a look and let me know what you think. Regards, Erik