Could someone expand on these operational issues you're facing when achieving this via separate jobs?

I feel like we're skipping a step, arguing about solutions without even having discussed the underlying problem.

On 08/02/2022 11:25, Gen Luo wrote:
Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, no matter whether
it's a source. That's also what I was afraid of in the previous replies. In
one word, if the behavior of a pipeline region can somehow influence the
state of other pipeline regions, their checkpoints have to be aligned
before rescaling.

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

Hey Folks,

Thanks for the discussion!

*Motiviation and use cases*
I think motiviation and use cases are very clear and I do not have doubts
on this part.
A typical use case is ETL with two-phase-commit, hundreds of partitions can
be blocked by a single straggler (a single task's checkpoint abortion can
affect all, not necessary failure).

*Source offset redistribution*
As for the known sources & implementation for Flink, I can not find a case
that does not work, *for now*.
I need to dig a bit more: how splits are tracked assigned, not successfully
processed, succesffully processed e.t.c.
I guess it is a single shared source OPCoordinator. And how this *shared*
state (between tasks) is preserved?

*Input partition/splits treated completely independent from each other*
This part I am still not sure, as mentioned if we have shared state of
source in the above section.

To Thomas:
In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? It appears that should be possible since in its
effect it should be no different than no data processed between CP8
  and CP10?
Not sure what "promoted" means here, but
1. I guess it does not matter whether it is CP8 or CP10 any more,
if no shared state in source, as exactly what you meantinoed,
"it should be no different than no data processed between CP8 and CP10"

2. I've noticed that from this question there is a gap between
"*allow aborted/failed checkpoint in independent sub-graph*" and
my intention: "*independent sub-graph checkpointing indepently*"

Best
Yuan


On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote:

Hi,

I'm thinking about Yuan's case. Let's assume that the case is running in
current Flink:
1. CP8 finishes
2. For some reason, PR2 stops consuming records from the source (but is
not
stuck), and PR1 continues consuming new records.
3. CP9 and CP10 finish
4. PR2 starts to consume quickly to catch up with PR1, and reaches the
same
final status with that in Yuan's case before CP11 starts.

I support that in this case, the status of the job can be the same as in
Yuan's case, and the snapshot (including source states) taken at CP10
should be the same as the composed global snapshot in Yuan's case, which
is
combining CP10 of PR1 and CP8 of PR2. This should be true if neither
failed
checkpointing nor uncommitted consuming have side effects, both of which
can break the exactly-once semantics when replaying. So I think there
should be no difference between rescaling the combined global snapshot or
the globally taken one, i.e. if the input partitions are not independent,
we are probably not able to rescale the source state in the current Flink
eiter.

And @Thomas, I do agree that the operational burden is
significantly reduced, while I'm a little afraid that checkpointing the
subgraphs individually may increase most of the runtime overhead back
again. Maybe we can find a better way to implement this.

On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:

Hi,

Thanks for opening this discussion! The proposed enhancement would be
interesting for use cases in our infrastructure as well.

There are scenarios where it makes sense to have multiple disconnected
subgraphs in a single job because it can significantly reduce the
operational burden as well as the runtime overhead. Since we allow
subgraphs to recover independently, then why not allow them to make
progress independently also, which would imply that checkpointing must
succeed for non affected subgraphs as certain behavior is tied to
checkpoint completion, like Kafka offset commit, file output etc.

As for source offset redistribution, offset/position needs to be tied
to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
source framework, it would be hard to implement a source with correct
behavior that does not track the position along with the split.

In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? It appears that should be possible since in its
effect it should be no different than no data processed between CP8
and CP10?

Thanks,
Thomas

On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org>
wrote:
Thanks for the clarification Yuan and Gen,

I agree that the checkpointing of the sources needs to support the
rescaling case, otherwise it does not work. Is there currently a
source
implementation where this wouldn't work? For Kafka it should work
because
we store the offset per assigned partition. For Kinesis it is
probably
the
same. For the Filesource we store the set of unread input splits in
the
source coordinator and the state of the assigned splits in the
sources.
This should probably also work since new splits are only handed out
to
running tasks.

Cheers,
Till

On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com>
wrote:
Hey Till,

Why rescaling is a problem for pipelined regions/independent
execution
subgraphs:

Take a simplified example :
job graph : source  (2 instances) -> sink (2 instances)
execution graph:
source (1/2)  -> sink (1/2)   [pieplined region 1]
source (2/2)  -> sink (2/2)   [pieplined region 2]

Let's assume checkpoints are still triggered globally, meaning
different
pipelined regions share the global checkpoint id (PR1 CP1 matches
with
PR2
CP1).

Now let's assume PR1 completes CP10 and PR2 completes CP8.

Let's say we want to rescale to parallelism 3 due to increased
input.
- Notice that we can not simply rescale based on the latest
completed
checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
output
externally.
- Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
how the
source's offset redistribution is implemented.
    The answer is yes if we treat each input partition as
independent
from
each other, *but I am not sure whether we can make that
assumption*.
If not, the rescaling cannot happen until PR1 and PR2 are aligned
with
CPs.
Best
-Yuan







On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org
wrote:
Hi everyone,

Yuan and Gen could you elaborate why rescaling is a problem if we
say
that
separate pipelined regions can take checkpoints independently?
Conceptually, I somehow think that a pipelined region that is
failed
and
cannot create a new checkpoint is more or less the same as a
pipelined
region that didn't get new input or a very very slow pipelined
region
which
couldn't read new records since the last checkpoint (assuming
that
the
checkpoint coordinator can create a global checkpoint by
combining
individual checkpoints (e.g. taking the last completed checkpoint
from
each
pipelined region)). If this comparison is correct, then this
would
mean
that we have rescaling problems under the latter two cases.

Cheers,
Till

On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com>
wrote:
Hi Gyula,

Thanks for sharing the idea. As Yuan mentioned, I think we can
discuss
this
within two scopes. One is the job subgraph, the other is the
execution
subgraph, which I suppose is the same as PipelineRegion.

An idea is to individually checkpoint the PipelineRegions, for
the
recovering in a single run.

Flink has now supported PipelineRegion based failover, with a
subset
of a
global checkpoint snapshot. The checkpoint barriers are spread
within a
PipelineRegion, so the checkpointing of individual
PipelineRegions
is
actually independent. Since in a single run of a job, the
PipelineRegions
are fixed, we can individually checkpoint separated
PipelineRegions,
despite what status the other PipelineRegions are, and use a
snapshot
of
a
failing region to recover, instead of the subset of a global
snapshot.
This
can support separated job subgraphs as well, since they will
also
be
separated into different PipelineRegions. I think this can
fulfill
your
needs.

In fact the individual snapshots of all PipelineRegions can
form
a
global
snapshot, and the alignment of snapshots of individual regions
is
not
necessary. But rescaling this global snapshot can be
potentially
complex. I
think it's better to use the individual snapshots in a single
run,
and
take
a global checkpoint/savepoint before restarting the job,
rescaling
it
or
not.

A major issue of this plan is that it breaks the checkpoint
mechanism
of
Flink. As far as I know, even in the approximate recovery, the
snapshot
used to recover a single task is still a part of a global
snapshot. To
implement the individual checkpointing of PipelineRegions,
there
may
need
to be a checkpoint coordinator for each PipelineRegion, and a
new
global
checkpoint coordinator. When the scale goes up, there can be
many
individual regions, which can be a big burden to the job
manager.
The
meaning of the checkpoint id will also be changed, which can
affect
many
aspects. There can be lots of work and risks, and the risks
still
exist
if
we only individually checkpoint separated job subgraphs, since
the
mechanism is still broken. If that is what you need, maybe
separating
them
into different jobs is an easier and better choice, as Caizhi
and
Yuan
mentioned.

On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
yuanmei.w...@gmail.com
wrote:
Hey Gyula,

That's a very interesting idea. The discussion about the
`Individual`
vs
`Global` checkpoint was raised before, but the main concern
was
from
two
aspects:

- Non-deterministic replaying may lead to an inconsistent
view
of
checkpoint
- It is not easy to form a clear cut of past and future and
hence no
clear
cut of where the start point of the source should begin to
replay
from.
Starting from independent subgraphs as you proposed may be a
good
starting
point. However, when we talk about subgraph, do we mention it
as
a
job
subgraph (each vertex is one or more operators) or execution
subgraph
(each
vertex is a task instance)?

If it is a job subgraph, then indeed, why not separate it
into
multiple
jobs as Caizhi mentioned.
If it is an execution subgraph, then it is difficult to
handle
rescaling
due to inconsistent views of checkpoints between tasks of the
same
operator.

`Individual/Subgraph Checkpointing` is definitely an
interesting
direction
to think of, and I'd love to hear more from you!

Best,

Yuan







On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
tsreape...@gmail.com>
wrote:
Hi Gyula!

Thanks for raising this discussion. I agree that this will
be
an
interesting feature but I actually have some doubts about
the
motivation
and use case. If there are multiple individual subgraphs in
the
same
job,
why not just distribute them to multiple jobs so that each
job
contains
only one individual graph and can now fail without
disturbing
the
others?

Gyula Fóra <gyf...@apache.org> 于2022年2月7日周一 05:22写道:

Hi all!

At the moment checkpointing only works for healthy jobs
with
all
running
(or some finished) tasks. This sounds reasonable in most
cases
but
there
are a few applications where it would make sense to
checkpoint
failing
jobs
as well.

Due to how the checkpointing mechanism works, subgraphs
that
have a
failing
task cannot be checkpointed without violating the
exactly-once
semantics.
However if the job has multiple independent subgraphs
(that
are
not
connected to each other), even if one subgraph is
failing,
the
other
completely running one could be checkpointed. In these
cases
the
tasks
of
the failing subgraph could simply inherit the last
successful
checkpoint
metadata (before they started failing). This logic would
produce
a
consistent checkpoint.

The job as a whole could now make stateful progress even
if
some
subgraphs
are constantly failing. This can be very valuable if for
some
reason
the
job has a larger number of independent subgraphs that are
expected
to
fail
every once in a while, or if some subgraphs can have
longer
downtimes
that
would now cause the whole job to stall.

What do you think?

Cheers,
Gyula


Reply via email to