Re: [ANNOUNCE] Apache Celeborn is graduated to Top Level Project

2024-03-27 Thread Erik fang
Congratulations!

Regards,
Erik

On Wed, Mar 27, 2024 at 1:48 PM Mridul Muralidharan 
wrote:

> Congratulations !!
>
> Regards,
> Mridul
>
>
> On Tue, Mar 26, 2024 at 11:54 PM Nicholas Jiang 
> wrote:
>
> > Congratulations! Great to witness the continuous development of the
> > community.
> > Regards,
> > Nicholas Jiang
> > At 2024-03-25 20:49:36, "Ethan Feng"  wrote:
> > >Hello Celeborn community,
> > >
> > >I am glad to share that the ASF board has approved a resolution to
> > >graduate Celeborn into a full Top Level Project. Thank you all for
> > >your help in reaching this milestone.
> > >
> > >To transition from the Apache Incubator to a new TLP, there are a few
> > >action items[1] we need to complete. I have opened an
> > >Umbrella Issue[2] to track the tasks, and you are welcome to take on
> > >the sub-tasks and leave comments if I have missed anything.
> > >
> > >Additionally, the GitHub repository migration is already complete[3].
> > >Please update your local git repository to track the new repo[4]. If
> > >you named the upstream remote "apache", you can run the following command
> > >to update the remote tracking URL.
> > >
> > >` git remote set-url apache g...@github.com:apache/celeborn.git `
> > >
> > >Please find the relevant URLs below:
> > >[1]
> > >https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > >[2] https://github.com/apache/celeborn/issues/2415
> > >[3] https://issues.apache.org/jira/browse/INFRA-25635
> > >[4] https://github.com/apache/celeborn
> > >
> > >Thanks,
> > >Ethan Feng
> >
>


Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Thread Erik fang
Hi Mridul, your explanation is very clear and helpful

Thank you so much!

On Fri, Oct 20, 2023 at 11:59 AM Keyong Zhou  wrote:

> Hi Mridul, thanks for the explanation, it's clear to me now. Thanks!
>
> Mridul Muralidharan  于2023年10月20日周五 11:15写道:
>
> > To add to my response - what I described (w.r.t. failing job) applies only
> > to ResultStage.
> > It walks the lineage DAG to identify all indeterminate parents to
> > rollback.
> > If there are only ShuffleMapStages in the set of stages to rollback, it
> > will simply discard their output, rollback all of them, and then retry
> > these stages (same shuffle-id, a new stage attempt)
> >
> >
> > Regards,
> > Mridul
> >
> >
> >
> > On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan 
> > wrote:
> >
> > >
> > > Good question, and ResultStage is actually special cased in spark as its
> > > output could have already been consumed (for example collect() to driver,
> > > etc) - and so if it is one of the stages which needs to be rolled back,
> > > the job is aborted.
> > >
> > > To illustrate, see the following:
> > > -- snip --
> > >
> > > package org.apache.spark
> > >
> > > import scala.reflect.ClassTag
> > >
> > > import org.apache.spark._
> > > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> > >
> > > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
> > >
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     delegate.compute(split, context)
> > >   }
> > >
> > >   override protected def getPartitions: Array[Partition] =
> > >     delegate.partitions
> > > }
> > >
> > > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> > >     DeterministicLevel.INDETERMINATE
> > > }
> > >
> > > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     val tc = TaskContext.get
> > >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 && tc.attemptNumber() == 0) {
> > >       // Wait for all tasks to be done, then call exit
> > >       Thread.sleep(5000)
> > >       System.exit(-1)
> > >     }
> > >     delegate.compute(split, context)
> > >   }
> > > }
> > >
> > > // Make sure the test_output directory is deleted before running this.
> > > object Test {
> > >
> > >   def main(args: Array[String]): Unit = {
> > >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> > >     val sc = new SparkContext(conf)
> > >
> > >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 1, 20).map(v => (v, v)))
> > >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> > >     resultRdd.saveAsTextFile("test_output")
> > >   }
> > > }
> > >
> > > -- snip --
> > >
> > >
> > >
> > > Here, the mapper stage has been forced to be INDETERMINATE.
> > > In the reducer stage, the first attempt to compute partition 0 will wait
> > > for a bit and then exit - since the master is a local-cluster, this results
> > > in FetchFailure when the second attempt of partition 0 tries to fetch
> > > shuffle data.
> > > When spark tries to regenerate parent shuffle output, it sees that the
> > > parent is INDETERMINATE - and so fails the entire job, with the message:
> > > "
> > > org.apache.spark.SparkException: Job aborted due to stage failure: A
> > > shuffle map stage with indeterminate output was failed and retried.
> > > However, Spark cannot rollback the ResultStage 1 to re-process the input
> > > data, and has to fail this job. Please eliminate the indeterminacy by
> > > checkpointing the RDD before repartition and try again.
> > > "
> > >
> > > This is coming from here <
> > > https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090
> > > >
> > > - when rolling back stages, if spark determines that a ResultStage needs
> > > to be rolled back, the job is aborted.

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-19 Thread Erik fang
Mridul,

sure, I totally agree SPARK-25299 is a much better solution, as long as we
can get it from the spark community
(btw, the private[spark] scope of RDD.outputDeterministicLevel is no big deal,
celeborn already has spark-integration code within the [spark] scope)

I also have a question about INDETERMINATE stage recompute, and may need
your help.
The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however, I
can't find the related logic for INDETERMINATE ResultStage rerun in DAGScheduler.
If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
corresponding ResultStage should be entirely recomputed as well, per my
understanding.

I found https://issues.apache.org/jira/browse/SPARK-25342, which rolls back a
ResultStage, but it was not merged.
Do you know of any context or a related ticket for INDETERMINATE ResultStage
rerun?

Thanks in advance!

Regards,
Erik

On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan 
wrote:

>
>
> On Mon, Oct 16, 2023 at 11:31 AM Erik fang  wrote:
>
>> Hi Mridul,
>>
>> For a),
>> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> to decide whether the whole stage needs to be recomputed
>> I think we can pass the same information to Celeborn in
>> ShuffleManager.registerShuffle()
>> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> since
>> the ShuffleDependency contains the RDD object.
>> It seems Stage.isIndeterminate() is not accessible from ShuffleDependency,
>> but luckily rdd is used internally
>>
>> def isIndeterminate: Boolean = {
>>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> }
>>
>> Relying on an internal implementation is not good, but doable.
>> I don't expect the spark RDD/Stage implementation to change frequently, and we
>> can discuss a public RDD isIndeterminate API with the Spark community if they
>> change it in the future
>>
>
>
> Only RDD.getOutputDeterministicLevel is publicly exposed,
> RDD.outputDeterministicLevel is not and it is private[spark].
> While I don't expect changes to this, it is inherently unstable to depend
> on it.
>
> Btw, please see the discussion with Sungwoo Park: if Celeborn is
> maintaining a reducer oriented view, you will need to recompute all the
> mappers anyway - what you might save is the subset of reducer partitions
> which can be skipped if it is DETERMINATE.
>
>
>
>
>>
>> for c)
>> I also considered a similar solution in celeborn
>> Celeborn (LifecycleManager) can get the full picture of the remaining shuffle
>> data from the previous stage attempt and reuse it in stage recompute,
>> and the whole process will be transparent to Spark/DAGScheduler
>>
>
> Celeborn does not have visibility into this - and this is potentially
> subject to invasive changes in Apache Spark as it evolves.
> For example, I recently merged a couple of changes which would make this
> different in master compared to previous versions.
> Until the remote shuffle service SPIP is implemented and these are
> abstracted out & made pluggable, it will continue to be quite volatile.
>
> Note that the behavior for 3.5 and older is known - since Spark versions
> have been released - it is the behavior in master and future versions of
> Spark which is subject to change.
> So delivering on SPARK-25299 would future proof all remote shuffle
> implementations.
>
>
> Regards,
> Mridul
>
>
>
>>
>> From my perspective, leveraging partial stage recompute and the
>> remaining shuffle data needs a lot of work in Celeborn.
>> I prefer to implement a simple whole stage recompute first, with the interface
>> defined with a recomputeAll = true flag, and explore partial stage recompute
>> in a separate ticket as a future optimization.
>> What do you think about it?
>>
>> Regards,
>> Erik
>>
>>
>> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan 
>> wrote:
>>
>>>
>>>
>>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan 
>>> wrote:
>>>
>>>>
>>>> A reducer oriented view of shuffle, especially without replication,
>>>> could indeed be susceptible to this issue you described (a single fetch
>>>> failure would require all mappers to be recomputed) - note, not
>>>> necessarily all reducers to be recomputed though.
>>>>
>>>> Note that I have not looked much into Celeborn specifically on this
>>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-16 Thread Erik fang
Hi Mridul,

For a),
DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
to decide whether the whole stage needs to be recomputed.
I think we can pass the same information to Celeborn in
ShuffleManager.registerShuffle(), since
the ShuffleDependency contains the RDD object.
It seems Stage.isIndeterminate() is not accessible from ShuffleDependency, but
luckily rdd is used internally

def isIndeterminate: Boolean = {
  rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

Relying on an internal implementation is not good, but doable.
I don't expect the spark RDD/Stage implementation to change frequently, and we
can discuss a public RDD isIndeterminate API with the Spark community if they
change it in the future
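
For illustration, here is a minimal sketch of how that check could look on the
Celeborn side (assuming the helper lives in the org.apache.spark package, since
RDD.outputDeterministicLevel is private[spark]; names are illustrative only,
not actual Celeborn code):

package org.apache.spark

import org.apache.spark.rdd.DeterministicLevel

object ShuffleDeterminism {
  // True if the map side of this shuffle is INDETERMINATE, i.e. a rerun of
  // the mapper stage may produce different output, so reusing part of the
  // old shuffle data is unsafe and the whole stage must be recomputed.
  def isIndeterminate(dep: ShuffleDependency[_, _, _]): Boolean =
    dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}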

for c)
I also considered a similar solution in celeborn
Celeborn (LifecycleManager) can get the full picture of the remaining shuffle
data from the previous stage attempt and reuse it in stage recompute,
and the whole process will be transparent to Spark/DAGScheduler

From my perspective, leveraging partial stage recompute and the
remaining shuffle data needs a lot of work in Celeborn.
I prefer to implement a simple whole stage recompute first, with the interface
defined with a recomputeAll = true flag, and explore partial stage recompute
in a separate ticket as a future optimization.
What do you think about it?
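
For concreteness, a purely hypothetical sketch of what such an interface could
look like (names and shape are illustrative only, not an actual Celeborn API):

case class RecomputeDecision(recomputeAll: Boolean)

trait StageRecomputeHandler {
  // Called by the Spark plugin when a reducer hits a shuffle fetch failure.
  // recomputeAll = true means: discard all existing shuffle data for this
  // shuffle and rerun the whole mapper stage, which is the simple first step
  // and the only safe choice when the mapper stage is INDETERMINATE.
  def onShuffleFetchFailure(
      appShuffleId: Int,
      stageAttemptId: Int,
      mapperStageIndeterminate: Boolean): RecomputeDecision
}

// Simple first implementation: always recompute the whole stage.
object WholeStageRecompute extends StageRecomputeHandler {
  override def onShuffleFetchFailure(
      appShuffleId: Int,
      stageAttemptId: Int,
      mapperStageIndeterminate: Boolean): RecomputeDecision =
    RecomputeDecision(recomputeAll = true)
}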

Regards,
Erik


On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan 
wrote:

>
>
> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan 
> wrote:
>
>>
>> A reducer oriented view of shuffle, especially without replication, could
>> indeed be susceptible to this issue you described (a single fetch failure
>> would require all mappers to be recomputed) - note, not necessarily
>> all reducers to be recomputed though.
>>
>> Note that I have not looked much into Celeborn specifically on this
>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>
>> Regards,
>> Mridul
>>
>>
>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park  wrote:
>>
>>> Hello,
>>>
>>> (Sorry for sending the same message again.)
>>>
>>> From my understanding, the current implementation of Celeborn makes it
>>> hard to find out which mapper should be re-executed when a partition cannot
>>> be read, and we should re-execute all the mappers in the upstream stage. If
>>> we can find out which mapper/partition should be re-executed, the current
>>> logic of stage recomputation could be (partially or totally) reused.
>>>
>>> Regards,
>>>
>>> --- Sungwoo
>>>
>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan 
>>> wrote:
>>>

 Hi,

   Spark will try to minimize the recomputation cost as much as possible.
 For example, if the parent stage was DETERMINATE, it simply needs to
 recompute the missing (mapper) partitions (which resulted in fetch
 failure). Note, this by itself could require further recomputation in the
 DAG if the inputs required to compute the parent partitions are missing, and
 so on - so it is dynamic.

 Regards,
 Mridul

 On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park 
 wrote:

> > a) If one or more tasks for a stage (and so its shuffle id) is going to be
> > recomputed, if it is an INDETERMINATE stage, all shuffle output will be
> > discarded and it will be entirely recomputed (see here
> > <
> > https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
> > >
> > ).
>
> If a reducer (in a downstream stage) fails to read data, can we find
> out
> which tasks should recompute their output? From the previous
> discussion, I
> thought this was hard (in the current implementation), and we should
> re-execute all tasks in the upstream stage.
>
> Thanks,
>
> --- Sungwoo
>



Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-12 Thread Erik fang
Hi Mridul,

sorry for the late reply

Per my understanding, the key point about Spark shuffleId and
StageId/StageAttemptId is:
shuffleId is assigned at ShuffleDependency creation time and bound to the
RDD/ShuffleDependency, while StageId/StageAttemptId are assigned and change
at job execution time.
In the RDD example, there are two shuffles, with ids 0 and 1, and their
shuffle data is expected to be accessed (read/write) with the correct
shuffle id,
and we can do that in celeborn with shuffle id mapping
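
As a rough illustration (illustrative code only, not Celeborn's actual
implementation), the LifecycleManager can key its internal shuffle id by the
Spark shuffle id plus the stage attempt that writes it:

import scala.collection.mutable

class ShuffleIdMapping {
  // (appShuffleId, stageAttemptId) -> Celeborn-internal shuffle id, so that
  // writers and readers of any stage attempt resolve to the right data.
  private val mapping = mutable.Map.empty[(Int, Int), Int]
  private var nextCelebornShuffleId = 0

  def getOrCreate(appShuffleId: Int, stageAttemptId: Int): Int = synchronized {
    mapping.getOrElseUpdate((appShuffleId, stageAttemptId), {
      val id = nextCelebornShuffleId
      nextCelebornShuffleId += 1
      id
    })
  }
}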

I made a small modification to the example to avoid the exit, and grabbed
some logs from the Spark DAGScheduler/Celeborn LifecycleManager to help explain

import org.apache.spark.TaskContext

val rdd1 = sc.parallelize(0 until 1, 20).map(v => (v, v)).groupByKey()
val rdd2 = rdd1.mapPartitions { iter =>
  val tc = TaskContext.get()
  println("print stageAttemptNumber " + tc.stageAttemptNumber())
  iter
}

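// First job (rdd2.count()) uses spark shuffle 0; see job 0 in the logs below.
// Second job: the extra groupByKey adds spark shuffle 1; see job 1 in the logs below.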
rdd2.count()
rdd2.map(v => (v._1, v._2)).groupByKey().count()

*// DAGScheduler starts job 0, submit ShuffleMapStage 0 for spark shuffle-0*
23/10/12 12:58:14 INFO DAGScheduler: Registering RDD 1 (map at :24)
as input to shuffle 0
23/10/12 12:58:14 INFO DAGScheduler: Got job 0 (count at :25) with
20 output partitions
23/10/12 12:58:14 INFO DAGScheduler: Final stage: ResultStage 1 (count at <
console>:25)
23/10/12 12:58:14 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 0)
23/10/12 12:58:14 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
0)
23/10/12 12:58:14 DEBUG DAGScheduler: submitStage(ResultStage 1 (name=count
at :25;jobs=0))
23/10/12 12:58:14 DEBUG DAGScheduler: missing: List(ShuffleMapStage 0)
23/10/12 12:58:14 DEBUG DAGScheduler: submitStage(ShuffleMapStage 0
(name=map at :24;jobs=0))
23/10/12 12:58:14 DEBUG DAGScheduler: missing: List()
23/10/12 12:58:14 INFO DAGScheduler: Submitting ShuffleMapStage 0
(MapPartitionsRDD[1] at map at :24), which has no missing parents
23/10/12 12:58:14 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0)

*// LifecycleManager received GetShuffleId request from ShuffleMapStage 0
with spark_shuffleId 0, stage attemptId 0, and generates celeborn_shuffleId
0 for write*
23/10/12 12:58:16 DEBUG LifecycleManager: Received GetShuffleId
request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter true
23/10/12 12:58:16 DEBUG LifecycleManager: Received RegisterShuffle request,
0, 20, 20.
23/10/12 12:58:16 INFO LifecycleManager: New shuffle request, shuffleId 0,
partitionType: REDUCE numMappers: 20, numReducers: 20.

*// ShuffleMapStage finish, Submit ResultStage 1*
23/10/12 12:58:18 INFO LifecycleManager: Received StageEnd request,
shuffleId 0.
23/10/12 12:58:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool
23/10/12 12:58:18 INFO DAGScheduler: ShuffleMapStage 0 (map at :24)
finished in 4.246 s
23/10/12 12:58:18 INFO DAGScheduler: looking for newly runnable stages
23/10/12 12:58:18 INFO DAGScheduler: running: Set()
23/10/12 12:58:18 INFO DAGScheduler: waiting: Set(ResultStage 1)
23/10/12 12:58:18 INFO DAGScheduler: failed: Set()
23/10/12 12:58:18 DEBUG DAGScheduler: submitStage(ResultStage 1 (name=count
at :25;jobs=0))
23/10/12 12:58:18 DEBUG DAGScheduler: missing: List()
23/10/12 12:58:18 INFO DAGScheduler: Submitting ResultStage 1
(MapPartitionsRDD[3] at mapPartitions at :24), which has no
missing parents
23/10/12 12:58:18 DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)

*// LifecycleManager received GetShuffleId request from ResultStage 1 with
spark_shuffleId 0, stage attemptId 0*
*// and generates celeborn_shuffleId 0 to return shuffle metadata with
celeborn's GetShuffleFileGroup for read*
23/10/12 12:58:19 DEBUG LifecycleManager: Received GetShuffleId
request,appShuffleId 0 maxAttemptNum 4 attemptId 0 isShuffleWriter false
23/10/12 12:58:19 DEBUG CelebornShuffleReader: read shuffleId 0 for
appShuffleId 0 attemptNum 0
23/10/12 12:58:19 DEBUG LifecycleManager: Received GetShuffleFileGroup
request,shuffleId 0.

23/10/12 12:58:20 INFO DAGScheduler: ResultStage 1 (count at :25)
finished in 1.546 s
23/10/12 12:58:20 INFO DAGScheduler: Job 0 is finished. Cancelling
potential speculative or zombie tasks for this job

*// DAGScheduler starts job 1, submit ShuffleMapStage 3 for spark shuffle-1*
23/10/12 12:58:22 INFO DAGScheduler: Registering RDD 4 (map at :25)
as input to shuffle 1
23/10/12 12:58:22 INFO DAGScheduler: Got job 1 (count at :25) with
20 output partitions
23/10/12 12:58:22 INFO DAGScheduler: Final stage: ResultStage 4 (count at <
console>:25)
23/10/12 12:58:22 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 3)
23/10/12 12:58:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
3)
23/10/12 12:58:22 DEBUG DAGScheduler: submitStage(ResultStage 4 (name=count
at :25;jobs=1))
23/10/12 12:58:22 DEBUG DAGScheduler: missing: List(ShuffleMapStage 3)
23/10/12 12:58:22 DEBUG DAGScheduler: submitStage(ShuffleMapStage 3
(name=map at :25;jobs=1))
23/10/12 12:58:22 DEBUG DAGScheduler: 

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-25 Thread Erik fang
Thanks Mridul
Your comments provide lots of information and are very helpful, highly
appreciated!

For a, I agree that restarting SPARK-25299 requires a big effort, and I don't
think we can get it merged into Spark in a short time

For b & c, I've checked the DeterministicLevel/Barrier RDD related code in
DAGScheduler previously, and will take a close look at SPARK-23243.
Regarding MapOutputTracker, I had the thread-safety concern as well;
however, I found shuffleStatuses is a ConcurrentHashMap and ShuffleStatus's
public methods are thread safe
<https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L53>
I suppose it should be safe to modify it outside of DAGScheduler?

For d, I think I get your point
let me run some tests and get back to you soon

Regards,
Erik

On Sat, Sep 23, 2023 at 6:10 PM Mridul Muralidharan 
wrote:

> Hi,
>
>   I am not yet very familiar with Celeborn, so will restrict my notes on
> the proposal in context to Apache Spark:
>
> a) For Option 1, there is SPARK-25299 - which was started a few years back.
> Unfortunately, the work there has stalled: but if there is interest in
> pushing that forward, I can help shepherd the contributions!
> Full disclosure, the earlier proposal might be fairly outdated, and will
> involve a bit of investment to restart that work.
>
> b) On the ability to reuse a previous mapper output/minimize cost - that
> depends on a stage's DeterministicLevel.
> DETERMINATE mapper stage output can be reused, and not others - and there
> is a lot of nuance around how DAGScheduler handles it.
> A lot of it has to do with data correctness (see SPARK-23243 and the PRs
> linked there for a more in-depth analysis of this) - and this has kept
> evolving in the years since.
> DAGScheduler directly updates MapOutputTracker for a few cases - which
> includes this one.
>
> c) As a follow up to (b) above, even though MapOutputTracker is part of
> SparkEnv, and so 'accessible', I would be careful modifying its state
> directly outside of DAGScheduler.
>
> d) The computation for "celeborn shuffleId" would not work - since
> spark.stage.maxConsecutiveAttempts is for consecutive failures for a single
> stage in a job.
> The same shuffle id can be computed by different stages across jobs (for
> example: very common with Spark SQL AQE btw).
> A simple example here [1]
>
>
> Other than Option 1, the rest look like a tradeoff to varying degrees.
> I am not familiar enough with Celeborn to give good suggestions yet though.
>
>
> All the best in trying to solve this issue - looking forward to updates !
>
> Regards,
> Mridul
>
> [1]
> Run with './bin/spark-shell  --master 'local-cluster[4, 3, 1024]'' or
> yarn/k8s
>
> import org.apache.spark.TaskContext
>
> val rdd1 = sc.parallelize(0 until 1, 20).map(v => (v, v)).groupByKey()
> val rdd2 = rdd1.mapPartitions { iter =>
>   val tc = TaskContext.get()
>   if (0 == tc.partitionId() && tc.stageAttemptNumber() < 1) {
> System.exit(0)
>   }
>   iter
> }
>
> rdd2.count()
> rdd2.map(v => (v._1, v._2)).groupByKey().count()
>
> For both jobs, the same shuffle id is used for the first shuffle.
>
>
>
> On Fri, Sep 22, 2023 at 10:48 AM Erik fang  wrote:
>
> > Hi folks,
> >
> > I have a proposal to implement Spark stage resubmission to handle shuffle
> > fetch failure in Celeborn
> >
> >
> >
> https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
> >
> > please have a look and let me know what you think
> >
> > Regards,
> > Erik
> >
>


[PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-09-22 Thread Erik fang
Hi folks,

I have a proposal to implement Spark stage resubmission to handle shuffle
fetch failure in Celeborn

https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8

please have a look and let me know what you think

Regards,
Erik