Thanks for raising this valuable point, Xintong.

Supporting external shuffle services makes sense to me. In order to recover
the internal state of ShuffleMaster after the JM restarts, we will add the
following three methods to ShuffleMaster:

boolean supportsBatchSnapshot();
void snapshotState(CompletableFuture<byte[]> snapshotFuture);
void restoreState(byte[] snapshotData);

We will provide empty implementations by default. If an external shuffle
service wants to support Job Recovery, it needs to override these methods.
Before the job starts running, we will check whether the shuffle master
supports taking snapshots (through the supportsBatchSnapshot method). If it
does not, we will disable Job Recovery for that job.

The default Netty/TM shuffle is stateless, so we only need to override the
"supportsBatchSnapshot" method to let it return true ("snapshotState" and
"restoreState" keep empty implementations).
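
As a rough illustration only (this is not the FLIP's actual code), the idea
could be sketched as below. The three method signatures are the ones listed
above; everything else is hypothetical and exists only to keep the sketch
self-contained: the SnapshotableShuffleMaster interface stands in for
ShuffleMaster, StatefulShuffleMaster stands in for an external service that
keeps state, and jobRecoveryEnabled stands in for the check done before the
job starts. Defaulting supportsBatchSnapshot to false is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ShuffleMaster, holding only the three proposed methods. */
interface SnapshotableShuffleMaster {
    /** Assumed default: no snapshot support, so Job Recovery stays disabled. */
    default boolean supportsBatchSnapshot() {
        return false;
    }

    /** Empty by default, as described above. */
    default void snapshotState(CompletableFuture<byte[]> snapshotFuture) {}

    /** Empty by default, as described above. */
    default void restoreState(byte[] snapshotData) {}
}

/** Stateless case (like the default Netty/TM shuffle): only the flag is overridden. */
final class StatelessShuffleMaster implements SnapshotableShuffleMaster {
    @Override
    public boolean supportsBatchSnapshot() {
        return true;
    }
}

/** Hypothetical external service whose internal state is a single string. */
final class StatefulShuffleMaster implements SnapshotableShuffleMaster {
    private String partitionState = "";

    void setPartitionState(String state) {
        this.partitionState = state;
    }

    String getPartitionState() {
        return partitionState;
    }

    @Override
    public boolean supportsBatchSnapshot() {
        return true;
    }

    @Override
    public void snapshotState(CompletableFuture<byte[]> snapshotFuture) {
        // Serialize the internal state and hand it to the caller.
        snapshotFuture.complete(partitionState.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void restoreState(byte[] snapshotData) {
        // Rebuild the internal state from a previously taken snapshot.
        partitionState = new String(snapshotData, StandardCharsets.UTF_8);
    }
}

final class ShuffleSnapshotSketch {
    /** Hypothetical pre-start check: recovery is only enabled if snapshots are supported. */
    static boolean jobRecoveryEnabled(SnapshotableShuffleMaster master) {
        return master.supportsBatchSnapshot();
    }

    public static void main(String[] args) {
        StatefulShuffleMaster beforeCrash = new StatefulShuffleMaster();
        beforeCrash.setPartitionState("partition-1");

        CompletableFuture<byte[]> snapshot = new CompletableFuture<>();
        beforeCrash.snapshotState(snapshot);

        // After a JM restart, a fresh instance is restored from the snapshot.
        StatefulShuffleMaster afterRestart = new StatefulShuffleMaster();
        afterRestart.restoreState(snapshot.join());
        System.out.println(afterRestart.getPartitionState());
    }
}
```

With this shape, an implementation that overrides nothing leaves Job Recovery
disabled, while a stateless one only needs to flip the flag.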

You can find more details in the FLIP's "JobEvent" section.

Best,
Lijie

Xintong Song <tonysong...@gmail.com> wrote on Mon, Dec 4, 2023 at 15:34:

> Thanks for the proposal, Lijie and Zhu.
>
> I have been having offline discussions with the Apache Celeborn folks
> regarding integrating Apache Celeborn into Flink's Hybrid Shuffle mode. One
> thing coming from those discussions that might relate to this FLIP is that
> Celeborn maintains some internal states inside its LifecycleManager (think
> of this as a component resident in Flink's Shuffle Master), which would
> also need persistence and recovery in order for the partitions to be reused
> after a JM crash. Given that Flink supports pluggable shuffle services,
> there could be other custom shuffle services with similar demands. I wonder
> if it makes sense to also add interfaces that take snapshots from Shuffle
> Master once in a while, and provide such snapshots to Shuffle Master upon
> recovery?
>
> Best,
>
> Xintong
>
>
>
> On Thu, Nov 30, 2023 at 5:48 PM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi Guowei,
> >
> > Thanks for your feedback.
> >
> > >> As far as I know, there are multiple job managers on standby in some
> > >> scenarios. In this case, is your design still effective?
> > I think it's still effective. There will only be one leader. After
> > becoming the leader, the startup process of JobMaster is the same as when
> > a single JobManager restarts, so I think the current process should also
> > apply to the multi-JobManager situation. We will also do some tests to
> > cover this case.
> >
> > >> How do you rule out that there might still be some states in the
> > >> memory of the original operator coordinator?
> > The current restore process is the same as when streaming jobs restore
> > from a checkpoint after failover (it calls the same methods), which is
> > widely used in production, so I think there is no problem.
> >
> > >> Additionally, using NO_CHECKPOINT seems a bit odd. Why not use a
> > >> normal checkpoint ID greater than 0 and record it in the event store?
> > We use -1 (NO_CHECKPOINT) to distinguish it from a normal checkpoint; -1
> > indicates that this is a snapshot for the no-checkpoint/batch scenarios.
> >
> > Besides, considering that currently some operator coordinators may not
> > support taking snapshots in the no-checkpoint/batch scenarios (or don't
> > support passing -1 as a checkpoint id), we think it is better to let the
> > developer explicitly specify whether it supports snapshots in the batch
> > scenario. Therefore, we intend to introduce the "SupportsBatchSnapshot"
> > interface for split enumerator and the "supportsBatchSnapshot" method for
> > operator coordinator. You can find more details in the FLIP's "Introduce
> > SupportsBatchSnapshot interface" and "JobEvent" sections.
> >
> > Looking forward to your further feedback.
> >
> > Best,
> > Lijie
> >
> > Guowei Ma <guowei....@gmail.com> wrote on Sun, Nov 19, 2023 at 10:47:
> >
> > > Hi,
> > >
> > >
> > > This is a very good proposal. As far as I know, it can solve some very
> > > critical production operation problems in certain scenarios. I have two
> > > minor issues:
> > >
> > > 1. As far as I know, there are multiple job managers on standby in some
> > >    scenarios. In this case, is your design still effective? I'm unsure if
> > >    you have conducted any tests. For instance, standby job managers might
> > >    take over these failed jobs more quickly.
> > > 2. Regarding the part about the operator coordinator, how can you ensure
> > >    that the checkpoint mechanism can restore the state of the operator
> > >    coordinator? For example:
> > >    - How do you rule out that there might still be some states in the
> > >      memory of the original operator coordinator? After all, the
> > >      implementation was done under the assumption of scenarios where the
> > >      job manager doesn't fail.
> > >    - Additionally, using NO_CHECKPOINT seems a bit odd. Why not use a
> > >      normal checkpoint ID greater than 0 and record it in the event store?
> > >
> > > If the issues raised in point 2 cannot be resolved in the short term,
> > > would it be possible to consider not supporting failover with a source
> > > job manager?
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Thu, Nov 2, 2023 at 6:01 PM Lijie Wang <wangdachui9...@gmail.com>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Zhu Zhu and I would like to start a discussion about FLIP-383:
> > > > Support Job Recovery for Batch Jobs [1].
> > > >
> > > > Currently, when Flink's job manager crashes or gets killed, possibly
> > > > due to unexpected errors or planned node decommissioning, one of the
> > > > following two things happens:
> > > > 1. The job fails, if it does not enable HA.
> > > > 2. The job restarts, if it enables HA. If it's a streaming job, it
> > > > will be resumed from the last successful checkpoint. If it's a batch
> > > > job, it has to run from the beginning, and all previous progress will
> > > > be lost.
> > > >
> > > > In view of this, we think a JM crash may cause a great regression for
> > > > batch jobs, especially long-running batch jobs. This FLIP mainly aims
> > > > to solve this problem so that batch jobs can recover most job progress
> > > > after a JM crash. In this FLIP, our goal is that most finished tasks
> > > > do not need to be re-run.
> > > >
> > > > You can find more details in FLIP-383 [1]. Looking forward to your
> > > > feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > >
> >
>
