Hi,

I second Chesnay's comment and would like to better understand the
motivation behind this. At the surface it sounds to me like this might
require quite a bit of work for a very narrow use case.

At the same time I have a feeling that Yuan, you are mixing this feature
request (checkpointing subgraphs/pipeline regions independently) and a very
very different issue of "task local checkpoints"? Those problems are kind
of similar, but not quite.

Best,
Piotrek

wt., 8 lut 2022 o 11:44 Chesnay Schepler <ches...@apache.org> napisał(a):

> Could someone expand on these operational issues you're facing when
> achieving this via separate jobs?
>
> I feel like we're skipping a step, arguing about solutions without even
> having discussed the underlying problem.
>
> On 08/02/2022 11:25, Gen Luo wrote:
> > Hi,
> >
> > @Yuan
> > Do you mean that there should be no shared state between source subtasks?
> > Sharing state between checkpoints of a specific subtask should be fine.
> >
> > Sharing state between subtasks of a task can be an issue, no matter
> whether
> > it's a source. That's also what I was afraid of in the previous replies.
> In
> > one word, if the behavior of a pipeline region can somehow influence the
> > state of other pipeline regions, their checkpoints have to be aligned
> > before rescaling.
> >
> > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> >
> >> Hey Folks,
> >>
> >> Thanks for the discussion!
> >>
> >> *Motiviation and use cases*
> >> I think motiviation and use cases are very clear and I do not have
> doubts
> >> on this part.
> >> A typical use case is ETL with two-phase-commit, hundreds of partitions
> can
> >> be blocked by a single straggler (a single task's checkpoint abortion
> can
> >> affect all, not necessary failure).
> >>
> >> *Source offset redistribution*
> >> As for the known sources & implementation for Flink, I can not find a
> case
> >> that does not work, *for now*.
> >> I need to dig a bit more: how splits are tracked assigned, not
> successfully
> >> processed, succesffully processed e.t.c.
> >> I guess it is a single shared source OPCoordinator. And how this
> *shared*
> >> state (between tasks) is preserved?
> >>
> >> *Input partition/splits treated completely independent from each other*
> >> This part I am still not sure, as mentioned if we have shared state of
> >> source in the above section.
> >>
> >> To Thomas:
> >>> In Yuan's example, is there a reason why CP8 could not be promoted to
> >>> CP10 by the coordinator for PR2 once it receives the notification that
> >>> CP10 did not complete? It appears that should be possible since in its
> >>> effect it should be no different than no data processed between CP8
> >>>   and CP10?
> >> Not sure what "promoted" means here, but
> >> 1. I guess it does not matter whether it is CP8 or CP10 any more,
> >> if no shared state in source, as exactly what you meantinoed,
> >> "it should be no different than no data processed between CP8 and CP10"
> >>
> >> 2. I've noticed that from this question there is a gap between
> >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> >> my intention: "*independent sub-graph checkpointing indepently*"
> >>
> >> Best
> >> Yuan
> >>
> >>
> >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I'm thinking about Yuan's case. Let's assume that the case is running
> in
> >>> current Flink:
> >>> 1. CP8 finishes
> >>> 2. For some reason, PR2 stops consuming records from the source (but is
> >> not
> >>> stuck), and PR1 continues consuming new records.
> >>> 3. CP9 and CP10 finish
> >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> >> same
> >>> final status with that in Yuan's case before CP11 starts.
> >>>
> >>> I support that in this case, the status of the job can be the same as
> in
> >>> Yuan's case, and the snapshot (including source states) taken at CP10
> >>> should be the same as the composed global snapshot in Yuan's case,
> which
> >> is
> >>> combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> >> failed
> >>> checkpointing nor uncommitted consuming have side effects, both of
> which
> >>> can break the exactly-once semantics when replaying. So I think there
> >>> should be no difference between rescaling the combined global snapshot
> or
> >>> the globally taken one, i.e. if the input partitions are not
> independent,
> >>> we are probably not able to rescale the source state in the current
> Flink
> >>> eiter.
> >>>
> >>> And @Thomas, I do agree that the operational burden is
> >>> significantly reduced, while I'm a little afraid that checkpointing the
> >>> subgraphs individually may increase most of the runtime overhead back
> >>> again. Maybe we can find a better way to implement this.
> >>>
> >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Thanks for opening this discussion! The proposed enhancement would be
> >>>> interesting for use cases in our infrastructure as well.
> >>>>
> >>>> There are scenarios where it makes sense to have multiple disconnected
> >>>> subgraphs in a single job because it can significantly reduce the
> >>>> operational burden as well as the runtime overhead. Since we allow
> >>>> subgraphs to recover independently, then why not allow them to make
> >>>> progress independently also, which would imply that checkpointing must
> >>>> succeed for non affected subgraphs as certain behavior is tied to
> >>>> checkpoint completion, like Kafka offset commit, file output etc.
> >>>>
> >>>> As for source offset redistribution, offset/position needs to be tied
> >>>> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> >>>> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> >>>> source framework, it would be hard to implement a source with correct
> >>>> behavior that does not track the position along with the split.
> >>>>
> >>>> In Yuan's example, is there a reason why CP8 could not be promoted to
> >>>> CP10 by the coordinator for PR2 once it receives the notification that
> >>>> CP10 did not complete? It appears that should be possible since in its
> >>>> effect it should be no different than no data processed between CP8
> >>>> and CP10?
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org>
> >>> wrote:
> >>>>> Thanks for the clarification Yuan and Gen,
> >>>>>
> >>>>> I agree that the checkpointing of the sources needs to support the
> >>>>> rescaling case, otherwise it does not work. Is there currently a
> >> source
> >>>>> implementation where this wouldn't work? For Kafka it should work
> >>> because
> >>>>> we store the offset per assigned partition. For Kinesis it is
> >> probably
> >>>> the
> >>>>> same. For the Filesource we store the set of unread input splits in
> >> the
> >>>>> source coordinator and the state of the assigned splits in the
> >> sources.
> >>>>> This should probably also work since new splits are only handed out
> >> to
> >>>>> running tasks.
> >>>>>
> >>>>> Cheers,
> >>>>> Till
> >>>>>
> >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com>
> >>> wrote:
> >>>>>> Hey Till,
> >>>>>>
> >>>>>>> Why rescaling is a problem for pipelined regions/independent
> >>>> execution
> >>>>>> subgraphs:
> >>>>>>
> >>>>>> Take a simplified example :
> >>>>>> job graph : source  (2 instances) -> sink (2 instances)
> >>>>>> execution graph:
> >>>>>> source (1/2)  -> sink (1/2)   [pieplined region 1]
> >>>>>> source (2/2)  -> sink (2/2)   [pieplined region 2]
> >>>>>>
> >>>>>> Let's assume checkpoints are still triggered globally, meaning
> >>>> different
> >>>>>> pipelined regions share the global checkpoint id (PR1 CP1 matches
> >>> with
> >>>> PR2
> >>>>>> CP1).
> >>>>>>
> >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
> >>>>>>
> >>>>>> Let's say we want to rescale to parallelism 3 due to increased
> >> input.
> >>>>>> - Notice that we can not simply rescale based on the latest
> >> completed
> >>>>>> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
> >>> output
> >>>>>> externally.
> >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> >>>> how the
> >>>>>> source's offset redistribution is implemented.
> >>>>>>     The answer is yes if we treat each input partition as
> >> independent
> >>>> from
> >>>>>> each other, *but I am not sure whether we can make that
> >> assumption*.
> >>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
> >>> with
> >>>> CPs.
> >>>>>> Best
> >>>>>> -Yuan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org
> >>>> wrote:
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> Yuan and Gen could you elaborate why rescaling is a problem if we
> >>> say
> >>>>>> that
> >>>>>>> separate pipelined regions can take checkpoints independently?
> >>>>>>> Conceptually, I somehow think that a pipelined region that is
> >>> failed
> >>>> and
> >>>>>>> cannot create a new checkpoint is more or less the same as a
> >>>> pipelined
> >>>>>>> region that didn't get new input or a very very slow pipelined
> >>> region
> >>>>>> which
> >>>>>>> couldn't read new records since the last checkpoint (assuming
> >> that
> >>>> the
> >>>>>>> checkpoint coordinator can create a global checkpoint by
> >> combining
> >>>>>>> individual checkpoints (e.g. taking the last completed checkpoint
> >>>> from
> >>>>>> each
> >>>>>>> pipelined region)). If this comparison is correct, then this
> >> would
> >>>> mean
> >>>>>>> that we have rescaling problems under the latter two cases.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Till
> >>>>>>>
> >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com>
> >>> wrote:
> >>>>>>>> Hi Gyula,
> >>>>>>>>
> >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
> >>>> discuss
> >>>>>>> this
> >>>>>>>> within two scopes. One is the job subgraph, the other is the
> >>>> execution
> >>>>>>>> subgraph, which I suppose is the same as PipelineRegion.
> >>>>>>>>
> >>>>>>>> An idea is to individually checkpoint the PipelineRegions, for
> >>> the
> >>>>>>>> recovering in a single run.
> >>>>>>>>
> >>>>>>>> Flink has now supported PipelineRegion based failover, with a
> >>>> subset
> >>>>>> of a
> >>>>>>>> global checkpoint snapshot. The checkpoint barriers are spread
> >>>> within a
> >>>>>>>> PipelineRegion, so the checkpointing of individual
> >>> PipelineRegions
> >>>> is
> >>>>>>>> actually independent. Since in a single run of a job, the
> >>>>>> PipelineRegions
> >>>>>>>> are fixed, we can individually checkpoint separated
> >>>> PipelineRegions,
> >>>>>>>> despite what status the other PipelineRegions are, and use a
> >>>> snapshot
> >>>>>> of
> >>>>>>> a
> >>>>>>>> failing region to recover, instead of the subset of a global
> >>>> snapshot.
> >>>>>>> This
> >>>>>>>> can support separated job subgraphs as well, since they will
> >> also
> >>>> be
> >>>>>>>> separated into different PipelineRegions. I think this can
> >>> fulfill
> >>>> your
> >>>>>>>> needs.
> >>>>>>>>
> >>>>>>>> In fact the individual snapshots of all PipelineRegions can
> >> form
> >>> a
> >>>>>> global
> >>>>>>>> snapshot, and the alignment of snapshots of individual regions
> >> is
> >>>> not
> >>>>>>>> necessary. But rescaling this global snapshot can be
> >> potentially
> >>>>>>> complex. I
> >>>>>>>> think it's better to use the individual snapshots in a single
> >>> run,
> >>>> and
> >>>>>>> take
> >>>>>>>> a global checkpoint/savepoint before restarting the job,
> >>> rescaling
> >>>> it
> >>>>>> or
> >>>>>>>> not.
> >>>>>>>>
> >>>>>>>> A major issue of this plan is that it breaks the checkpoint
> >>>> mechanism
> >>>>>> of
> >>>>>>>> Flink. As far as I know, even in the approximate recovery, the
> >>>> snapshot
> >>>>>>>> used to recover a single task is still a part of a global
> >>>> snapshot. To
> >>>>>>>> implement the individual checkpointing of PipelineRegions,
> >> there
> >>>> may
> >>>>>> need
> >>>>>>>> to be a checkpoint coordinator for each PipelineRegion, and a
> >> new
> >>>>>> global
> >>>>>>>> checkpoint coordinator. When the scale goes up, there can be
> >> many
> >>>>>>>> individual regions, which can be a big burden to the job
> >> manager.
> >>>> The
> >>>>>>>> meaning of the checkpoint id will also be changed, which can
> >>> affect
> >>>>>> many
> >>>>>>>> aspects. There can be lots of work and risks, and the risks
> >> still
> >>>> exist
> >>>>>>> if
> >>>>>>>> we only individually checkpoint separated job subgraphs, since
> >>> the
> >>>>>>>> mechanism is still broken. If that is what you need, maybe
> >>>> separating
> >>>>>>> them
> >>>>>>>> into different jobs is an easier and better choice, as Caizhi
> >> and
> >>>> Yuan
> >>>>>>>> mentioned.
> >>>>>>>>
> >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
> >> yuanmei.w...@gmail.com
> >>>>>> wrote:
> >>>>>>>>> Hey Gyula,
> >>>>>>>>>
> >>>>>>>>> That's a very interesting idea. The discussion about the
> >>>> `Individual`
> >>>>>>> vs
> >>>>>>>>> `Global` checkpoint was raised before, but the main concern
> >> was
> >>>> from
> >>>>>>> two
> >>>>>>>>> aspects:
> >>>>>>>>>
> >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent
> >> view
> >>> of
> >>>>>>>>> checkpoint
> >>>>>>>>> - It is not easy to form a clear cut of past and future and
> >>>> hence no
> >>>>>>>> clear
> >>>>>>>>> cut of where the start point of the source should begin to
> >>> replay
> >>>>>> from.
> >>>>>>>>> Starting from independent subgraphs as you proposed may be a
> >>> good
> >>>>>>>> starting
> >>>>>>>>> point. However, when we talk about subgraph, do we mention it
> >>> as
> >>>> a
> >>>>>> job
> >>>>>>>>> subgraph (each vertex is one or more operators) or execution
> >>>> subgraph
> >>>>>>>> (each
> >>>>>>>>> vertex is a task instance)?
> >>>>>>>>>
> >>>>>>>>> If it is a job subgraph, then indeed, why not separate it
> >> into
> >>>>>> multiple
> >>>>>>>>> jobs as Caizhi mentioned.
> >>>>>>>>> If it is an execution subgraph, then it is difficult to
> >> handle
> >>>>>>> rescaling
> >>>>>>>>> due to inconsistent views of checkpoints between tasks of the
> >>>> same
> >>>>>>>>> operator.
> >>>>>>>>>
> >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an
> >>> interesting
> >>>>>>>> direction
> >>>>>>>>> to think of, and I'd love to hear more from you!
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Yuan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> >>>> tsreape...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>> Hi Gyula!
> >>>>>>>>>>
> >>>>>>>>>> Thanks for raising this discussion. I agree that this will
> >> be
> >>>> an
> >>>>>>>>>> interesting feature but I actually have some doubts about
> >> the
> >>>>>>>> motivation
> >>>>>>>>>> and use case. If there are multiple individual subgraphs in
> >>> the
> >>>>>> same
> >>>>>>>> job,
> >>>>>>>>>> why not just distribute them to multiple jobs so that each
> >>> job
> >>>>>>> contains
> >>>>>>>>>> only one individual graph and can now fail without
> >> disturbing
> >>>> the
> >>>>>>>> others?
> >>>>>>>>>>
> >>>>>>>>>> Gyula Fóra <gyf...@apache.org> 于2022年2月7日周一 05:22写道:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all!
> >>>>>>>>>>>
> >>>>>>>>>>> At the moment checkpointing only works for healthy jobs
> >>> with
> >>>> all
> >>>>>>>>> running
> >>>>>>>>>>> (or some finished) tasks. This sounds reasonable in most
> >>>> cases
> >>>>>> but
> >>>>>>>>> there
> >>>>>>>>>>> are a few applications where it would make sense to
> >>>> checkpoint
> >>>>>>>> failing
> >>>>>>>>>> jobs
> >>>>>>>>>>> as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs
> >>> that
> >>>>>> have a
> >>>>>>>>>> failing
> >>>>>>>>>>> task cannot be checkpointed without violating the
> >>>> exactly-once
> >>>>>>>>> semantics.
> >>>>>>>>>>> However if the job has multiple independent subgraphs
> >> (that
> >>>> are
> >>>>>> not
> >>>>>>>>>>> connected to each other), even if one subgraph is
> >> failing,
> >>>> the
> >>>>>>> other
> >>>>>>>>>>> completely running one could be checkpointed. In these
> >>> cases
> >>>> the
> >>>>>>>> tasks
> >>>>>>>>> of
> >>>>>>>>>>> the failing subgraph could simply inherit the last
> >>> successful
> >>>>>>>>> checkpoint
> >>>>>>>>>>> metadata (before they started failing). This logic would
> >>>> produce
> >>>>>> a
> >>>>>>>>>>> consistent checkpoint.
> >>>>>>>>>>>
> >>>>>>>>>>> The job as a whole could now make stateful progress even
> >> if
> >>>> some
> >>>>>>>>>> subgraphs
> >>>>>>>>>>> are constantly failing. This can be very valuable if for
> >>> some
> >>>>>>> reason
> >>>>>>>>> the
> >>>>>>>>>>> job has a larger number of independent subgraphs that are
> >>>>>> expected
> >>>>>>> to
> >>>>>>>>>> fail
> >>>>>>>>>>> every once in a while, or if some subgraphs can have
> >> longer
> >>>>>>> downtimes
> >>>>>>>>>> that
> >>>>>>>>>>> would now cause the whole job to stall.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
>
>

Reply via email to