Hi!

Thanks for writing this FLIP, and interesting idea.

I would like to understand a bit better why exactly we need this, and what
our alternative options are. My main concerns are the following:

*(1) Can we achieve this without changing the checkpointing mechanism?*

The checkpoint mechanism is already complex and it is super sensitive (a
bug there threatens every user with data loss). Such components must stay
as minimal as possible. If there is a way to solve this outside
checkpointing, then that should be the default approach.

To check that, I would really like to understand this specific CDC use case
a bit more. If I understand it correctly, the issue is that checkpoints are
not possible while the initial database snapshot is ingested, before the
actual Change-Data processing starts.

(a) What is the reason that no checkpoint is possible during this time? Why
is there no way to store in the checkpoint the position of the snapshot
ingestion? Is it because the snapshots are streamed in from a JDBC query
and that is not deterministic, meaning retrying the query yields different
rows (or order of rows)?

(b) If there can be no checkpoint progress during the snapshot ingestion
and all checkpoints are rejected, what prevents us from just storing an
empty state in the checkpoint? Meaning the system may have taken a bunch of
checkpoints, but any recovery would start the source from the beginning.
Is there some concern about not emitting data during that database snapshot
reading phase?

(c) There is quite a bit of work in the direction of blending batch and
streaming execution, meaning having an initial batch execution step for
some data (like the backlog in Kafka, or a DB snapshot) and then switching
to streaming execution for the real-time stream (new records in Kafka, or
CDC records). If we focus on advancing that, we get the behavior that all
the initial data is processed in one batch (no intermediate checkpoint),
and we also get the performance speedup provided by not having to work with
RocksDB for the batch step. I think this is where the future is.
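
To make option (b) a bit more concrete, here is a minimal sketch of what I
have in mind. It is not the FLIP-27 SourceReader interface; the class and
method names (TwoPhaseReaderSketch, finishDbSnapshot, and so on) are made
up for illustration, and the state is reduced to a single change-log
offset.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the FLIP-27 SourceReader interface.
final class TwoPhaseReaderSketch {
    enum Phase { DB_SNAPSHOT, CHANGE_LOG }

    private Phase phase = Phase.DB_SNAPSHOT;
    private long changeLogOffset = -1L;

    /** Called once the initial database snapshot has been fully emitted. */
    void finishDbSnapshot(long firstChangeLogOffset) {
        phase = Phase.CHANGE_LOG;
        changeLogOffset = firstChangeLogOffset;
    }

    /** State written into a checkpoint: empty during the snapshot phase. */
    List<Long> snapshotState() {
        if (phase == Phase.DB_SNAPSHOT) {
            return Collections.emptyList();
        }
        return Collections.singletonList(changeLogOffset);
    }

    /** Empty state means: start over with the database snapshot. */
    void restore(List<Long> state) {
        if (state.isEmpty()) {
            phase = Phase.DB_SNAPSHOT;
            changeLogOffset = -1L;
        } else {
            phase = Phase.CHANGE_LOG;
            changeLogOffset = state.get(0);
        }
    }

    Phase phase() {
        return phase;
    }
}
```

With something like this, checkpoints keep completing during the snapshot
phase; they just carry no progress for this source, so a recovery in that
phase re-reads the database snapshot from the start.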

Regarding the case of not checkpointing in between transaction markers: I
don't know how often these occur, but I would expect potentially very
often. If we reject whenever some operator is in between transaction
markers, we will reject very many (possibly almost all) checkpoints once we
get to a certain scale (1000s of parallel operators).
I think we need a different approach there, for example first grouping
events into transaction sessions.
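
A rough back-of-the-envelope calculation shows the scale effect. It
assumes that each operator independently happens to be in between
transaction markers with the same probability p when the barrier arrives,
which is a simplification; the class name and the value of p are
illustrative only.

```java
// Illustrative arithmetic only; assumes operators reject independently.
final class RejectionOdds {

    /**
     * Probability that none of n operators rejects a checkpoint, given that
     * each one rejects with probability p.
     */
    static double acceptProbability(double p, int n) {
        return Math.pow(1.0 - p, n);
    }

    public static void main(String[] args) {
        double p = 0.001; // assumed per-operator rejection probability
        for (int n : new int[] {10, 100, 1000, 5000}) {
            System.out.printf("n=%d accept=%.4f%n", n, acceptProbability(p, n));
        }
    }
}
```

Under these assumptions, even a 0.1% chance per operator leaves only
roughly a 37% chance that a checkpoint goes through with 1000 operators,
and less than 1% with 5000.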


*(2) Inability to Checkpoint shouldn't become a first-class concept.*

I think this is really a question of Flink design principles and direction.
I believe we need to push Flink in a direction where it can always
checkpoint, and more frequently and more reliably than at the moment.
Various ongoing efforts move the system in that direction, giving it the
ability to checkpoint always (async sources, async sinks, non-blocking
mailbox), more predictably (unaligned checkpoints) and more frequently
(log-based checkpoints).

That is something that makes it much easier to operate the system. When
checkpoints become infrequent or unpredictable in the interval (when they
are not reliable), it becomes a big operational problem.

If we start assuming that operators can arbitrarily reject checkpoints, we
will easily get into situations where many checkpoints are rejected and it
takes quite a while until a checkpoint can happen. There are many jobs with
1000s of operators, and when one of them rejects a checkpoint, the
checkpoint as a whole fails.
Letting operators decide when to take checkpoints is a cool property
that makes many things easier, but I am skeptical whether this is
compatible with global checkpoints.

Not having frequent checkpoints (but very infrequent ones) is really more
of a batch situation, and that brings me back to the point above: I think a
really well-working solution for this would be the hybrid batch/streaming
execution.

For that reason, I am very skeptical that we should add a first-class API
that suggests that a checkpoint happens only sometimes, when everyone
happens to agree to it.
I think that sends the wrong signal, both for the development direction and
for users in how they implement their applications. Rejecting checkpoints
should be a rare thing, not something an API suggests can happen all the time.

Rejecting checkpoints a few times at the beginning of a job might also
be feasible, when later checkpoints generally succeed, but then again, the
proposed API does not suggest that; it suggests that any checkpoint can
always be rejected by any operator.

If we really need checkpoint rejection, I think Piotr's idea goes in a
pretty good direction: Having a "RejectionException" that leads to a
checkpoint abort in the regular way, but doesn't increment the
failure counter.
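
As a minimal sketch of that idea (with hypothetical names; this is not
Flink's actual checkpoint failure manager, and CheckpointRejectedException
does not exist in Flink today), the failure accounting could look roughly
like this:

```java
// Hypothetical names; this is not Flink's CheckpointFailureManager.
final class CheckpointRejectedException extends RuntimeException {
    CheckpointRejectedException(String reason) {
        super(reason);
    }
}

final class TolerantFailureCounter {
    private final int tolerableFailures;
    private int continuousFailures;

    TolerantFailureCounter(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records an aborted checkpoint; returns true if the job should fail over. */
    boolean onCheckpointAborted(Throwable cause) {
        if (cause instanceof CheckpointRejectedException) {
            // The checkpoint is aborted as usual, but the rejection is not counted.
            return false;
        }
        continuousFailures++;
        return continuousFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the count of continuous failures. */
    void onCheckpointCompleted() {
        continuousFailures = 0;
    }

    int continuousFailures() {
        return continuousFailures;
    }
}
```

The abort path stays exactly the same for both kinds of failure; the only
difference is whether the counter moves.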

*Conclusion*

Because of the sensitivity of the checkpoint mechanism, and because the API
proposed here suggests a behavior of the system that I think isn't
generally feasible, I am not sure we should go ahead with this without
further clarification.

Maybe we can approach it like this:
  - See if there are different ways to create this behavior: Empty
checkpoints, Hybrid Source, etc.
  - If no other way to implement this is possible, go with a
RejectionException as a temporary workaround.
  - Invest in hybrid batch/streaming execution as the proper long-term
solution for setups like this.

Apologies for the long text. This is a pretty critical decision here, so I
wanted to provide all the context.

Best,
Stephan



On Thu, Jun 10, 2021 at 5:32 AM Senhong Liu <senhong...@gmail.com> wrote:

> Hi Piotrek,
>
> Thanks for your feedback!
>
> 1. Why not ExternallyInducedSourceReader/ExternallyInducedSource?
> a. The
> `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` and
> `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource`
> interfaces seem to play the role of the checkpoint coordinator. Once one of
> them is implemented, the source reader might need logic similar to what the
> checkpoint coordinator does. I think it would be better to let the
> checkpoint coordinator play its own role of triggering the checkpoint.
> b. The new interface can be implemented not only in the source operator,
> but also in other operators. However, I do not have a solid use case for
> implementing it in a downstream operator so far, so basically it is there
> for future compatibility.
>
> 2. Why not exception?
> Actually, I don't think rejecting a checkpoint is an exception. Just like
> the soft failure I introduced in the FLIP, the rejection and the resulting
> checkpoint failure could be acceptable to the user. The tolerable
> checkpoint failure number, however, should count only those failures that
> are NOT acceptable to the user, e.g., checkpoint expiration.
>
>
>
> On Wed, Jun 9, 2021 at 3:37 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi Senhong,
> >
> > Thanks for the proposal. I have a couple of questions.
> >
> > Have you seen the
> > `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource` (for
> > the legacy SourceFunction) and
> > `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` (for
> > FLIP-27) interfaces? They work the other way around, by letting the source
> > trigger/initiate a checkpoint, instead of declining it. Could it be made
> > to work in your use case? If not, can you explain why?
> >
> > Regarding declining/failing the checkpoint (without blocking the barrier
> > waiting for snapshot availability), can't you achieve the same thing by a
> > combination of throwing an exception in, for example, the
> > `org.apache.flink.api.connector.source.SourceReader#snapshotState` call and
> > setting the tolerable checkpoint failure number? [1]
> >
> > Best, Piotrek
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
> >
> > On Wed, Jun 9, 2021 at 09:11 Senhong Liu <senhong...@gmail.com> wrote:
> >
> > > Here is some brief context about the new feature.
> > >
> > > 1. Active checkpoint rejection by the operator. On top of the current
> > > checkpoint mechanism, one more preliminary step is added to let the
> > > operator determine whether it is able to take a snapshot. The
> > > preliminary step is a new API provided to users/developers. The new
> > > API will be implemented in the Source API (the new one based on
> > > FLIP-27) for the CDC implementation. It can also be implemented in
> > > other operators if necessary.
> > >
> > > 2. Handling the failure returned from the operator. If the checkpoint
> > > is rejected by the operator, an appropriate failure reason needs to be
> > > returned from the operator as well. In the current design, two failure
> > > reasons are listed: soft failure and hard failure. The former would be
> > > ignored by Flink and the latter would be counted as a continuous
> > > checkpoint failure according to the current checkpoint failure manager
> > > mechanism.
> > >
> > > 3. To prevent the operator from reporting soft failures indefinitely,
> > > so that no checkpoint completes for a long time, we introduce a new
> > > configuration for a tolerable checkpoint failure timeout, which is a
> > > timer that starts with the checkpoint scheduler. The timer is reset if
> > > and only if a checkpoint completes. Otherwise, it does nothing until
> > > the tolerable timeout is hit. If the timer fires, it triggers the
> > > current checkpoint failover.
> > >
> > > Question:
> > > a. According to the current design, the checkpoint might fail for a
> > > possibly long time, for example with a large checkpoint interval. Is
> > > there any better idea to make the checkpoint more likely to succeed?
> > > For example, triggering the checkpoint immediately after the last one
> > > is rejected. But that seems inappropriate because it would increase
> > > the overhead.
> > > b. Is there any better idea on handling the soft failure?
> > >
> > >
> > >
> > >
> > >
> > >
> >
>
