Hi Stephan,

Thank you so much for all these practical suggestions.

I agree that hybrid batch/streaming execution is a great idea, and that
no checkpoint can complete during the batch phase. However, with a large
batch size, checkpoints could then fail frequently, which seems
unnecessary and could confuse users.

Storing an empty state is a good idea, but I still wonder whether it
might confuse users. I think it could be an alternative option, and I
will look into it more deeply. Making this more flexible for users might
also be a good direction.
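
To make the empty-state option concrete, here is a minimal sketch of how
I understand it. The class EmptyStateReaderSketch and its methods are
hypothetical and only loosely mirror SourceReader#snapshotState. The
single change-log offset is also a simplifying assumption. None of this
is actual Flink or Flink-CDC code.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified reader. Not a Flink interface. */
class EmptyStateReaderSketch {
    private final int snapshotRows;     // rows in the initial full-table scan
    private int rowsRead = 0;           // scan progress, never checkpointed
    private long changeLogOffset = -1;  // -1 means no change record consumed yet

    EmptyStateReaderSketch(int snapshotRows) {
        this.snapshotRows = snapshotRows;
    }

    boolean inSnapshotPhase() {
        return rowsRead < snapshotRows;
    }

    int rowsRead() {
        return rowsRead;
    }

    /** Consumes one scan row, or one change record once the scan is done. */
    void pollNext() {
        if (inSnapshotPhase()) {
            rowsRead++;
        } else {
            changeLogOffset++;
        }
    }

    /** Checkpoint hook: never rejects, but stores nothing during the scan. */
    List<Long> snapshotState() {
        if (inSnapshotPhase()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(changeLogOffset);
    }

    /** Recovery: an empty state means the scan is replayed from the start. */
    static EmptyStateReaderSketch restore(List<Long> state, int snapshotRows) {
        EmptyStateReaderSketch reader = new EmptyStateReaderSketch(snapshotRows);
        if (!state.isEmpty()) {
            reader.rowsRead = snapshotRows;
            reader.changeLogOffset = state.get(0);
        }
        return reader;
    }
}
```

With this shape, every checkpoint taken during the table scan succeeds,
but a recovery from such a checkpoint restarts the scan from the first
row, which is the behavior I think users may find surprising.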

Best,
Senhong

On Tue, Jul 13, 2021 at 7:28 PM Stephan Ewen <se...@apache.org> wrote:

> Before jumping into the designs of other mechanisms, can we clarify whether
> adjusting checkpointing is really the right approach here?
>
> What about storing empty state in the checkpoints, so that any recovery
> simply does a full replay of the input.
>
> Regarding the hybrid batch/streaming execution: The nice part about this
> would be that no checkpoints are necessary during the batch phase.
>
> On Tue, Jul 13, 2021 at 5:51 AM Senhong Liu <senhong...@gmail.com> wrote:
>
> > Hi Stephan,
> >
> > Thank you so much for your reply and suggestions! Following up on your
> > questions, here are some explanations and new thoughts.
> >
> > (1) More detailed info about CDC use case.
> >
> > In the previous design of Flink-CDC, the connector would start with a
> > full-table scan while holding a read-write lock. Taking a checkpoint in
> > the middle would be meaningless, since there is no guarantee that the
> > data would not change during recovery.
> >
> > (2) Could hybrid batch/streaming execution solve the problem?
> >
> > It is acceptable to me, and the current Flink-CDC is in fact
> > implementing a new feature like this [1][2]. But a large batch size
> > could still make checkpoints fail frequently, and users could still
> > find it hard to understand what is going on.
> >
> > (3) Making the checkpoint more controllable and reliable.
> >
> > Overall, I am also looking for a way to make checkpointing more
> > controllable and reliable, since hybrid batch/streaming execution is
> > also a tricky way of controlling the checkpoint. But it may not be
> > something users can configure or implement for their specific use case.
> >
> > However, taking your opinion into account, I think that designing a set
> > of REST APIs to control the checkpoint scheduler might be more
> > acceptable.
> >
> >
> > [1] https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md
> > [2] https://github.com/ververica/flink-cdc-connectors/commit/c6ca6c187471b538a9774258d2572194e1bb855b
> >
> > On Tue, Jul 13, 2021 at 1:25 AM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi!
> > >
> > > Thanks for writing this FLIP, and interesting idea.
> > >
> > > I would like to understand a bit better why exactly we need this, and
> > > what our alternative options are. My main concerns are the following:
> > >
> > > *(1) Can we achieve this without changing the checkpointing mechanism?*
> > >
> > > The checkpoint mechanism is already complex and it is super sensitive
> > > (a bug there threatens every user with data loss). Such components must
> > > stay as minimal as possible. When there is a way to solve this outside
> > > checkpointing, then that should be the default approach.
> > >
> > > To check that, I would really like to understand this specific CDC use
> > > case a bit more. If I understand it correctly, the issue is that
> > > checkpoints are not possible while the initial database snapshot is
> > > ingested, before the actual Change-Data processing starts.
> > >
> > > (a) What is the reason that no checkpoint is possible during this time?
> > > Why is there no way to store in the checkpoint the position of the
> > > snapshot ingestion? Is it because the snapshots are streamed in from a
> > > JDBC query and that is not deterministic, meaning retrying the query
> > > yields different rows (or order of rows)?
> > >
> > > (b) If there can be no checkpoint progress during the snapshot
> > > ingestion and all checkpoints are rejected, what prevents us from just
> > > storing an empty state in the checkpoint? Meaning the system may have
> > > taken a bunch of checkpoints, but any recovery would start the source
> > > from the beginning. Is there some concern about not emitting data
> > > during that database snapshot reading phase?
> > >
> > > (c) There is quite some work in the direction of blending batch and
> > > streaming execution, meaning having an initial batch execution step for
> > > some data (like the backlog in Kafka, or a DB snapshot) and then
> > > switching to streaming execution for the real-time stream (new records
> > > in Kafka, or CDC records). If we focus on advancing that, we get the
> > > behavior that all the initial data is processed in one batch (no
> > > intermediate checkpoint), and we also get the performance speedup
> > > provided by not having to work with RocksDB for the batch step. I think
> > > this is where the future is.
> > >
> > > Regarding the case to not checkpoint in-between transaction markers: I
> > > don't know how often these occur, but I would expect potentially very
> > > often. If we reject whenever some operator is in-between transaction
> > > markers, we will reject very many (possibly almost all) checkpoints
> > > once we get to a certain scale (1000s of parallel operators).
> > > I think we need a different approach there, for example first grouping
> > > events into transaction sessions, or so.
> > >
> > >
> > > *(2) Inability to Checkpoint shouldn't become a first-class concept.*
> > >
> > > I think this is really a question of Flink design principles and
> > > direction. I believe we need to push Flink to a direction that it can
> > > always checkpoint, and more frequently and more reliably than at the
> > > moment. Various ongoing efforts move the system in that direction,
> > > giving it the ability to checkpoint always (async sources, async sinks,
> > > non-blocking mailbox), more predictably (unaligned checkpoints) and
> > > more frequently (log-based checkpoints).
> > >
> > > That is something that makes it much easier to operate the system. When
> > > checkpoints become infrequent or unpredictable in the interval (when
> > > they are not reliable), it becomes a big operational problem.
> > >
> > > If we start assuming that operators can arbitrarily reject checkpoints,
> > > we will easily get into situations where many checkpoints are rejected
> > > and it takes quite a while until a checkpoint can happen. There are
> > > many jobs with 1000s of operators, and when one of them rejects a
> > > checkpoint, the checkpoint as a whole fails.
> > > Letting operators decide when to take checkpoints is a cool property
> > > that makes many things easier, but I am skeptical whether this is
> > > compatible with global checkpoints.
> > >
> > > Not having frequent checkpoints (but very infrequent ones) is really
> > > more of a batch situation, and that brings me back to the point above:
> > > I think a really well-working solution for this would be the hybrid
> > > batch/streaming execution.
> > >
> > > For that reason, I am very skeptical that we should add a first-class
> > > API that suggests that a checkpoint happens only sometimes, when
> > > everyone happens to agree to it.
> > > I think that sends the wrong signal, both to developer direction, and
> > > to users in how to implement their applications. Rejecting checkpoints
> > > should be a rare thing, so not something an API suggests can happen all
> > > the time.
> > >
> > > Rejecting checkpoints a few times at the beginning of a job might also
> > > be feasible, when later checkpoints generally succeed, but then again,
> > > the proposed API does not suggest that, it suggests any checkpoint is
> > > always rejectable by all operators.
> > >
> > > If we really need checkpoint rejection, I think Piotr's idea goes into
> > > a pretty good direction: Having a "RejectionException" that leads to a
> > > checkpoint abort in the regular way, but doesn't increment the
> > > failure counter.
> > >
> > > *Conclusion*
> > >
> > > Because of the sensitivity of the checkpoint mechanism, and because the
> > > API proposed here suggests a behavior of the system that I think isn't
> > > generally feasible, I am not sure we should go ahead with this without
> > > further clarification.
> > >
> > > Maybe we can approach it like this.
> > >   - See if there are different ways to create this behavior: Empty
> > >     checkpoints, Hybrid Source, etc.
> > >   - If no other way to implement this is possible, go with a
> > >     RejectionException as a temporary workaround.
> > >   - Invest into hybrid batch/streaming execution as the proper
> > >     long-term solution for setups like this.
> > >
> > > Apologies for the long text. This is a pretty critical decision here,
> > > so I wanted to provide all the context.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > >
> > > On Thu, Jun 10, 2021 at 5:32 AM Senhong Liu <senhong...@gmail.com> wrote:
> > >
> > > > Hi Piotrek,
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > 1. Why not ExternallyInducedSourceReader/ExternallyInducedSource?
> > > > a. The
> > > > `org.apache.flink.api.connector.source.ExternallyInducedSourceReader`
> > > > and
> > > > `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource`
> > > > interfaces seem to play the role of the checkpoint coordinator. Once
> > > > one of them is implemented, the source reader might need logic
> > > > similar to what the checkpoint coordinator does. I think it would be
> > > > better to let the checkpoint coordinator play its own role of
> > > > triggering the checkpoint.
> > > > b. The new interface can be implemented not only in the source
> > > > operator, but also in other operators. However, I do not have a solid
> > > > use case for implementing it in a downstream operator so far, so it
> > > > is basically there for future compatibility.
> > > >
> > > > 2. Why not exception?
> > > > Actually, I don't think rejecting a checkpoint is an exception. Just
> > > > like the soft failure I introduced in the FLIP, the rejection, and
> > > > therefore the checkpoint failure, could be acceptable to the user.
> > > > However, the tolerable checkpoint failure number only counts those
> > > > failures that are NOT acceptable to the users, e.g., checkpoint
> > > > expiration.
> > > >
> > > >
> > > >
> > > > On Wed, Jun 9, 2021 at 3:37 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >
> > > > > Hi Senhong,
> > > > >
> > > > > Thanks for the proposal. I have a couple of questions.
> > > > >
> > > > > Have you seen
> > > > > `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource`
> > > > > (for the legacy SourceFunction) and
> > > > > `org.apache.flink.api.connector.source.ExternallyInducedSourceReader`
> > > > > (for FLIP-27) interfaces? They work the other way around, by letting
> > > > > the source trigger/initiate a checkpoint, instead of declining it.
> > > > > Could it be made to work in your use case? If not, can you explain
> > > > > why?
> > > > >
> > > > > Regarding declining/failing the checkpoint (without blocking the
> > > > > barrier waiting for snapshot availability), can't you achieve the
> > > > > same thing by a combination of throwing an exception in, for example,
> > > > > the `org.apache.flink.api.connector.source.SourceReader#snapshotState`
> > > > > call and setting the tolerable checkpoint failure number? [1]
> > > > >
> > > > > Best, Piotrek
> > > > >
> > > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
> > > > >
> > > > > On Wed, Jun 9, 2021 at 9:11 AM Senhong Liu <senhong...@gmail.com> wrote:
> > > > >
> > > > > > Here is some brief context about the new feature.
> > > > > >
> > > > > > 1. Active checkpoint rejection by the operator. On top of the
> > > > > > current checkpoint mechanism, one more preliminary step is added
> > > > > > to let the operator determine whether it is able to take a
> > > > > > snapshot. The preliminary step is a new API provided to
> > > > > > users/developers. The new API will be implemented in the Source
> > > > > > API (the new one based on FLIP-27) for the CDC implementation. It
> > > > > > can also be implemented in other operators if necessary.
> > > > > >
> > > > > > 2. Handling the failure returned from the operator. If the
> > > > > > checkpoint is rejected by the operator, an appropriate failure
> > > > > > reason needs to be returned from the operator as well. In the
> > > > > > current design, two failure reasons are listed: soft failure and
> > > > > > hard failure. The former would be ignored by Flink, and the
> > > > > > latter would be counted as a continuous checkpoint failure
> > > > > > according to the current checkpoint failure manager mechanism.
> > > > > >
> > > > > > 3. To prevent the operator from reporting soft failures
> > > > > > indefinitely, so that no checkpoint can be completed for a long
> > > > > > time, we introduce a new configuration for the tolerable
> > > > > > checkpoint failure timeout, which is a timer that starts with the
> > > > > > checkpoint scheduler. The timer is reset if and only if a
> > > > > > checkpoint completes. Otherwise, it does nothing until the
> > > > > > tolerable timeout is hit. If the timer fires, it triggers the
> > > > > > current checkpoint failover.
> > > > > >
> > > > > > Question:
> > > > > > a. According to the current design, checkpoints might keep
> > > > > > failing for a possibly long time, for example with a large
> > > > > > checkpoint interval. Is there any better idea to make the
> > > > > > checkpoint more likely to succeed? For example, triggering the
> > > > > > checkpoint immediately after the last one is rejected. But that
> > > > > > seems inappropriate because it would increase the overhead.
> > > > > > b. Is there any better idea on handling the soft failure?
> > > > > >