Hi Gyula,

Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
within two scopes. One is the job subgraph, the other is the execution
subgraph, which I suppose is the same as PipelineRegion.

An idea is to individually checkpoint the PipelineRegions, for the
recovering in a single run.

Flink has now supported PipelineRegion based failover, with a subset of a
global checkpoint snapshot. The checkpoint barriers are spread within a
PipelineRegion, so the checkpointing of individual PipelineRegions is
actually independent. Since in a single run of a job, the PipelineRegions
are fixed, we can individually checkpoint separated PipelineRegions,
despite what status the other PipelineRegions are, and use a snapshot of a
failing region to recover, instead of the subset of a global snapshot. This
can support separated job subgraphs as well, since they will also be
separated into different PipelineRegions. I think this can fulfill your
needs.

In fact the individual snapshots of all PipelineRegions can form a global
snapshot, and the alignment of snapshots of individual regions is not
necessary. But rescaling this global snapshot can be potentially complex. I
think it's better to use the individual snapshots in a single run, and take
a global checkpoint/savepoint before restarting the job, rescaling it or
not.

A major issue of this plan is that it breaks the checkpoint mechanism of
Flink. As far as I know, even in the approximate recovery, the snapshot
used to recover a single task is still a part of a global snapshot. To
implement the individual checkpointing of PipelineRegions, there may need
to be a checkpoint coordinator for each PipelineRegion, and a new global
checkpoint coordinator. When the scale goes up, there can be many
individual regions, which can be a big burden to the job manager. The
meaning of the checkpoint id will also be changed, which can affect many
aspects. There can be lots of work and risks, and the risks still exist if
we only individually checkpoint separated job subgraphs, since the
mechanism is still broken. If that is what you need, maybe separating them
into different jobs is an easier and better choice, as Caizhi and Yuan
mentioned.

On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:

> Hey Gyula,
>
> That's a very interesting idea. The discussion about the `Individual` vs
> `Global` checkpoint was raised before, but the main concern was from two
> aspects:
>
> - Non-deterministic replaying may lead to an inconsistent view of
> checkpoint
> - It is not easy to form a clear cut of past and future and hence no clear
> cut of where the start point of the source should begin to replay from.
>
> Starting from independent subgraphs as you proposed may be a good starting
> point. However, when we talk about subgraph, do we mention it as a job
> subgraph (each vertex is one or more operators) or execution subgraph (each
> vertex is a task instance)?
>
> If it is a job subgraph, then indeed, why not separate it into multiple
> jobs as Caizhi mentioned.
> If it is an execution subgraph, then it is difficult to handle rescaling
> due to inconsistent views of checkpoints between tasks of the same
> operator.
>
> `Individual/Subgraph Checkpointing` is definitely an interesting direction
> to think of, and I'd love to hear more from you!
>
> Best,
>
> Yuan
>
>
>
>
>
>
>
> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
> > Hi Gyula!
> >
> > Thanks for raising this discussion. I agree that this will be an
> > interesting feature but I actually have some doubts about the motivation
> > and use case. If there are multiple individual subgraphs in the same job,
> > why not just distribute them to multiple jobs so that each job contains
> > only one individual graph and can now fail without disturbing the others?
> >
> >
> > Gyula Fóra <gyf...@apache.org> 于2022年2月7日周一 05:22写道:
> >
> > > Hi all!
> > >
> > > At the moment checkpointing only works for healthy jobs with all
> running
> > > (or some finished) tasks. This sounds reasonable in most cases but
> there
> > > are a few applications where it would make sense to checkpoint failing
> > jobs
> > > as well.
> > >
> > > Due to how the checkpointing mechanism works, subgraphs that have a
> > failing
> > > task cannot be checkpointed without violating the exactly-once
> semantics.
> > > However if the job has multiple independent subgraphs (that are not
> > > connected to each other), even if one subgraph is failing, the other
> > > completely running one could be checkpointed. In these cases the tasks
> of
> > > the failing subgraph could simply inherit the last successful
> checkpoint
> > > metadata (before they started failing). This logic would produce a
> > > consistent checkpoint.
> > >
> > > The job as a whole could now make stateful progress even if some
> > subgraphs
> > > are constantly failing. This can be very valuable if for some reason
> the
> > > job has a larger number of independent subgraphs that are expected to
> > fail
> > > every once in a while, or if some subgraphs can have longer downtimes
> > that
> > > would now cause the whole job to stall.
> > >
> > > What do you think?
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>

Reply via email to