Hi Yun,

The termination phase I drafted depends on the de facto behavior that
`notifyCheckpointComplete` is currently ignored after `close`. Thus, any
external materialization inside `close` (including `endInput`) or
`notifyCheckpointComplete` is either not guaranteed or not committed.
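
A minimal sketch of that behavior in plain Java. `ClosableOperatorSketch` and
its methods are hypothetical stand-ins, not Flink classes. The only thing
assumed is what is stated above: notifications arriving after `close` are
dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator doing two-phase commit; not a Flink class.
class ClosableOperatorSketch {
    private final List<Long> pending = new ArrayList<>();   // pre-committed, awaiting notification
    private final List<Long> committed = new ArrayList<>(); // externally materialized
    private boolean closed = false;

    // Pre-commit: data up to this checkpoint now waits for the commit signal.
    void snapshotState(long checkpointId) {
        pending.add(checkpointId);
    }

    void close() {
        closed = true;
    }

    // Assumption taken from the text: notifications after close() are ignored.
    void notifyCheckpointComplete(long checkpointId) {
        if (closed) {
            return;
        }
        if (pending.remove(Long.valueOf(checkpointId))) {
            committed.add(checkpointId);
        }
    }

    List<Long> committed() { return committed; }

    List<Long> pending() { return pending; }

    public static void main(String[] args) {
        ClosableOperatorSketch op = new ClosableOperatorSketch();
        op.snapshotState(1L);
        op.notifyCheckpointComplete(1L); // committed
        op.snapshotState(2L);
        op.close();
        op.notifyCheckpointComplete(2L); // dropped, checkpoint 2 stays pending
        System.out.println("committed=" + op.committed() + " pending=" + op.pending());
    }
}
```

Running `main` prints `committed=[1] pending=[2]`: whatever was pre-committed
for the last checkpoint before `close` is never committed.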

I tried to emphasize non-breaking changes and to postpone migration to
later releases for this termination phase. But now I am somewhat worried
that it will cause long-term maintenance hell.

Personally, I think allowing checkpoints after `close` (@Till proposed this
in FLINK-21133) could be the minimal change. But there are concerns:
* It will possibly break some existing code in a silent way. (@Piotr
already pointed this out in FLINK-21133.)
* I think the name `close` is kind of misleading. (@Piotr suggested
renaming it to `finish` in FLINK-21133 so that code does not break silently.)
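
To make the trade-off concrete, here is a rough sketch of a `finish` hook
with a no-op default. Everything in it is hypothetical: `SketchOperator` is
not Flink's `StreamOperator`, and passing the output as a `List` is a
simplification. It assumes a termination phase in which the final checkpoint
is taken after `finish` and before `close`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface; NOT Flink's StreamOperator.
interface SketchOperator {
    // New hook with a no-op default, so existing implementations still compile.
    default void finish(List<String> out) {}

    // Existing hook; after migration it should only release resources.
    void close(List<String> out);
}

// Legacy operator: still flushes its buffer in close().
class LegacyOperator implements SketchOperator {
    @Override
    public void close(List<String> out) {
        out.add("legacy-buffered");
    }
}

// Migrated operator: flushes in finish(), close() only cleans up.
class MigratedOperator implements SketchOperator {
    @Override
    public void finish(List<String> out) {
        out.add("migrated-buffered");
    }

    @Override
    public void close(List<String> out) {}
}

class TerminationSketch {
    // Returns the records emitted before the final checkpoint,
    // i.e. the ones a two-phase-commit sink could still commit.
    static List<String> committedRecords(SketchOperator op) {
        List<String> emitted = new ArrayList<>();
        op.finish(emitted);
        // The final checkpoint covers everything emitted so far.
        List<String> committed = new ArrayList<>(emitted);
        // close() runs only after the final checkpoint has completed.
        op.close(emitted);
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(committedRecords(new LegacyOperator()));
        System.out.println(committedRecords(new MigratedOperator()));
    }
}
```

This prints `[]` and then `[migrated-buffered]`. The legacy operator still
compiles and runs, but what it emits in `close` misses the final checkpoint,
which is the kind of silent change in behavior the concerns above are about.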

> stop-with-savepoint --drain would wait for a specific savepoint

For stop-with-savepoint, the checkpoint barrier has already been created or
received at that point.


Best,
Kezhu Wang

On February 25, 2021 at 21:02:30, Yun Gao (yungao...@aliyun.com) wrote:

Hi all,

Many thanks for the discussions!



A. Regarding how to avoid emitting records in notifyCheckpointComplete:

Currently the structure of a new sink is writer -> committer -> global
committer, and the parallelism of the global committer must be one. By
design it would be used in several cases:
1. writer -> committer: for normal sinks that write pending data in the
writer and commit pending data in the committer.
2. writer -> global committer: for sinks that require the committer's
parallelism to be 1, like the Iceberg sink, which relies on optimistic
locking to commit data and hopes to reduce conflicts.
3. writer -> committer -> global committer: for sinks that also need to
write metadata (like a _SUCCESS file or a record added to the Hive
metastore) in the global committer.

The third case would cause the cascade commit problem. To overcome this
problem, we would like to:
1. Previously the global committer also supported writing metadata with
2pc. Now we disable this mode and always rely on the property that writing
metadata is repeatable. With this limitation there should be no need for
cascade commit, but the committer would still emit records in
notifyCheckpointComplete.
2. We further move the global committer in case 3 to the operator
coordinator. Then the committer would not need to emit records, but would
communicate with its operator coordinator instead.

One core issue with using the OperatorCoordinator is how to keep the
communication between the operator and the OperatorCoordinator
exactly-once. Since messages always flow from the operator to the
OperatorCoordinator in this case, we would only need to bookkeep, in the
state of the operator, the messages sent between the time the
OperatorCoordinator takes its snapshot and the time the operator takes its
snapshot.
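
The bookkeeping could look roughly like the sketch below. It is plain Java
and does not use Flink's OperatorCoordinator API. All class and method names
are hypothetical. In particular `onCoordinatorSnapshot()` is an assumed
signal telling the operator that the coordinator has taken its snapshot, and
a shared `List` stands in for the message channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator side of the channel; not Flink's OperatorCoordinator API.
class BookkeepingOperatorSketch {
    private final List<String> coordinatorInbox; // stands in for the message channel
    private final List<String> sentSinceCoordinatorSnapshot = new ArrayList<>();
    private boolean coordinatorSnapshotTaken = false;

    BookkeepingOperatorSketch(List<String> coordinatorInbox) {
        this.coordinatorInbox = coordinatorInbox;
    }

    // Assumed signal: the coordinator has snapshotted for the current checkpoint.
    void onCoordinatorSnapshot() {
        coordinatorSnapshotTaken = true;
        sentSinceCoordinatorSnapshot.clear();
    }

    void send(String message) {
        coordinatorInbox.add(message);
        if (coordinatorSnapshotTaken) {
            sentSinceCoordinatorSnapshot.add(message);
        }
    }

    // Operator snapshot: persist the messages the coordinator snapshot missed.
    List<String> snapshotState() {
        List<String> state = new ArrayList<>(sentSinceCoordinatorSnapshot);
        sentSinceCoordinatorSnapshot.clear();
        coordinatorSnapshotTaken = false;
        return state;
    }

    // Recovery: resend the bookkept messages to the restored coordinator.
    void restoreState(List<String> state) {
        for (String message : state) {
            coordinatorInbox.add(message);
        }
    }
}

class BookkeepingDemo {
    // Simulates one checkpoint followed by a failure and returns what the
    // coordinator holds right after recovery.
    static List<String> coordinatorViewAfterRecovery() {
        List<String> inbox = new ArrayList<>();
        BookkeepingOperatorSketch op = new BookkeepingOperatorSketch(inbox);
        op.send("a");
        List<String> coordinatorSnapshot = new ArrayList<>(inbox); // holds only "a"
        op.onCoordinatorSnapshot();
        op.send("b");                                        // missed by the coordinator snapshot
        List<String> operatorSnapshot = op.snapshotState();  // holds "b"
        op.send("c");                                        // after the checkpoint, not part of it

        // Failure: both sides restart from their snapshots.
        List<String> restoredInbox = new ArrayList<>(coordinatorSnapshot);
        BookkeepingOperatorSketch restored = new BookkeepingOperatorSketch(restoredInbox);
        restored.restoreState(operatorSnapshot);
        return restoredInbox;
    }

    public static void main(String[] args) {
        System.out.println(coordinatorViewAfterRecovery());
    }
}
```

The demo prints `[a, b]`: message `b` is neither lost nor duplicated, and `c`
is left out because it was sent after the operator snapshot and would be
produced again when the operator resumes from the checkpoint.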

How to achieve this change in detail still needs some thought; it
currently seems we would have to make some modifications to the current
new sink API.



B. Regarding the stop-with-savepoint --drain

Many thanks @Piotr for the further explanation; I now realize I had
misunderstood the semantics of stop-with-savepoint --drain. I now think the
problem is that we should also include the records produced in
`endOfInput()` and `close()` in the last savepoint, am I correct? If so, it
seems we still have some undetermined options for the lifecycle of the
operator: in Kezhu's proposal close() happens last, but it seems close()
might also emit records (so the operators are currently closed in the order
op1's close() -> op2's endOfInput() -> op2's close() -> op3's endOfInput()
-> ...)?

On the other side, as Kezhu has also proposed, perhaps we could have
stop-with-savepoint --drain and normal exit follow the same process, with
the slight difference that stop-with-savepoint --drain would wait for a
specific savepoint, while in normal exit the operator might wait for an
arbitrary checkpoint. If we could avoid emitting records in
notifyCheckpointComplete, stop-with-savepoint --drain could be done with
one savepoint, and for normal exit the operator would not need to wait for
other slow operators to exit.
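
The difference between the two exit paths can be sketched as a single
predicate. `ExitConditionSketch` and its parameters are hypothetical. The
sketch assumes checkpoint ids increase monotonically, and it reads
"arbitrary checkpoint" as "any checkpoint triggered after the operator
finished".

```java
import java.util.OptionalLong;

// Hypothetical helper; names and parameters are illustrative only.
class ExitConditionSketch {
    /**
     * @param completedCheckpointId id of the checkpoint or savepoint that just completed
     * @param lastIdBeforeFinish    id of the last checkpoint triggered before the operator finished
     * @param drainSavepointId      present only for stop-with-savepoint --drain
     */
    static boolean mayExit(
            long completedCheckpointId, long lastIdBeforeFinish, OptionalLong drainSavepointId) {
        if (drainSavepointId.isPresent()) {
            // stop-with-savepoint --drain: wait for that specific savepoint.
            return completedCheckpointId == drainSavepointId.getAsLong();
        }
        // Normal exit: any checkpoint triggered after the operator finished will do.
        return completedCheckpointId > lastIdBeforeFinish;
    }

    public static void main(String[] args) {
        System.out.println(mayExit(5, 4, OptionalLong.empty())); // true
        System.out.println(mayExit(4, 4, OptionalLong.empty())); // false
        System.out.println(mayExit(5, 4, OptionalLong.of(7)));   // false
        System.out.println(mayExit(7, 4, OptionalLong.of(7)));   // true
    }
}
```

Both paths share the same shape and differ only in which completed
checkpoint releases the operator.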

Best,
Yun



------------------Original Mail ------------------
*Sender:*Kezhu Wang <kez...@gmail.com>
*Send Date:*Thu Feb 25 15:11:53 2021
*Recipients:*Flink Dev <dev@flink.apache.org>, Piotr Nowojski <
piotr.nowoj...@gmail.com>
*CC:*Guowei Ma <guowei....@gmail.com>, jingsongl...@gmail.com <
jingsongl...@gmail.com>
*Subject:*Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
Finished

> Hi all, thanks for driving this, and especially to Piotr for reactivating
> this thread.
>
>
>
> First, for `notifyCheckpointComplete`, I have a strong preference for
> "shutting down the dataflow pipeline with one checkpoint in total", so I
> lean towards the option of dropping "send records" from
> `notifyCheckpointComplete` for next-level committing in the pipeline, if
> we ever support it. Without this, we are unable to stop a pipeline
> manually with all results materialized.
>
>
>
> Second, for unifying the shutdown of `stop-with-savepoint --drain` and
> FLIP-147, I drafted the following phase based on the proposals emerging
> (mainly from Piotr and Till) in this thread and FLINK-21133.
>
>
>
> ```java
> // StreamTask termination phase (pseudocode)
> finish();  // call StreamOperator.finish in chaining order
> advanceToEndOfEventTime();  // no-op for non-source inputs; could this also
>                             // be done in the input processor or in finish?
> if (there-is-a-pending-terminate-savepoint) {
>     triggerBarrierForDownStream();
>     waitCheckpointComplete();
> } else if (require-two-phase-commit-in-shutdown) {
>     waitFinalCheckpoint();
>     waitCheckpointComplete();
> }
> // TODO: Ignore/cancel all checkpoints from this point on (or hint this to
> // the coordinator in the final checkpoint response?).
> close();
> ```
>
>
>
> The branches of the `if` could reside in different code paths, but the
> effect should be similar to the above. The divergence reflects the fact
> that "stop-with-savepoint --drain" has already been triggered, while we
> need to wait for the final checkpoint when the input is exhausted
> naturally.
>
>
>
> With carefully chosen default functions, we will not break existing
>
> interfaces.
>
> * `StreamOperator.finish`
>
> * `RichFunction.finish`
>
>
>
> For existing code:
> 1. "stop-with-savepoint" with no migration: it should behave as before.
> 2. "require-two-phase-commit-in-shutdown" evaluates to false and no
> migration: it is the same as before.
> 3. "require-two-phase-commit-in-shutdown" evaluates to true and no
> migration: records emitted in `close` will be lost.
>
>
>
> For case#3:
>
> A. No external materialization in `StreamOperator.close`: Nothing to lose.
>
> B. External materialization only in `StreamOperator.close`: I don't think
>
> Flink ever claimed that there is
>
> any guarantee for this situation.
>
> C. External materialization in `notifyCheckpointComplete` from
> `StreamOperator.close`: because `notifyCheckpointComplete` was ignored
> after the operator closed (FLINK-16383), there will be no external
> materialization to lose.
>
>
>
> Then, we could recommend that users migrate any "end-of-stream flushing"
> from `close` to `finish`.
>
>
>
> Best,
>
> Kezhu Wang
>
>
>
>
>
> On February 24, 2021 at 23:47:36, Piotr Nowojski (piotr.nowoj...@gmail.com
> )
>
> wrote:
>
>
>
> Thanks for the responses Guowei and Yun,
>
>
>
> Could you elaborate more/remind me, what does it mean to replace emitting
>
> results from the `notifyCheckpointComplete` with `OperatorCoordinator`
>
> approach?
>
>
>
> About the discussion in FLINK-21133 and how it relates to FLIP-147. You are
>
> right Yun Gao, that in case of `stop-with-savepoint --drain` the whole job
>
> finishes, while in FLIP-147 some of the sources can work for arbitrarily
>
> long time after others have already finished. However from the runtime
>
> perspective, especially how to close a Task and its operators, there is no
>
> difference between those two cases. In both cases, we need to end input,
>
> shut down processing time timers and commit external side effects (two
>
> phase commit support) before exiting the task. That's how the discussion
>
> about the behaviour of "stop-with-savepoint" was connected with FLIP-147.
>
>
>
> Currently on master, "stop-with-savepoint --drain" drains/flushes buffered
>
> records and deals correctly with timers, but all of that happens AFTER
>
> savepoint was completed. So any records flushed from the operators during
>
> endOfInput/close/shutting down processing timers are never committed to
>
> external systems. This is exactly the same problem as the "two phase
>
> commit" problem of FLIP-147, that should have the same solution and it
>
> should be solved at the same time. For example if we go with the *very
>
> undesirable* "one closed operator per one completed checkpoint/savepoint",
>
> in both cases CheckpointCoordinator, Scheduler and Task would need to keep
>
> the task alive and keep triggering checkpoints for that task, until all
>
> operators in the operator chain are closed (one closed operator per one
>
> completed checkpoint).
>
>
>
> Piotrek
>
>
>
>
>
>
>
> śr., 24 lut 2021 o 11:30 Yun Gao napisał(a):
>
>
>
> > Hi Till, Guowei,
>
> >
>
> > Many thanks for initiating the discussion and the deep thoughts!
>
> >
>
> > For notifyCheckpointComplete, I also agree we could try to avoid
> > emitting new records in notifyCheckpointComplete by using the
> > OperatorCoordinator for the new sink API. Besides, the Hive sink might
> > also need some modification, since it also emits records in
> > notifyCheckpointComplete.
>
> >
>
> > For unifying the process of stopping with savepoint and finishing
> > because all records are processed, I also agree that unifying would
> > always be better if we could achieve it, but I have not fully caught up
> > with the implementation: based on the discussion in FLINK-21133, my
> > understanding is that for stopping with savepoint, we now want to first
> > stop the source, then trigger a savepoint, and after the source receives
> > notifyCheckpointComplete, the source would start emitting
> > EndOfPartitionEvent to finish the job, am I correct?
>
> >
>
> > For normal finish, a difference to me might be that if we have multiple
> > sources, we could not guarantee when the sources finish. We might have
> > one source run for 1 minute and another one run for 1 hour. To unify
> > this with the stop-with-savepoint process, we might need to hold the
> > fast source until all the sources are finished? A coordinator would be
> > introduced to count the number of running sources and trigger the final
> > savepoint / checkpoint. For the extreme case where we have both bounded
> > and unbounded sources, we might only count how many bounded sources are
> > remaining? And once all the bounded sources are finished we would
> > trigger the special checkpoint. After all the bounded parts of the graph
> > are finished, the remaining part could still checkpoint and commit data
> > with FLIP-147.
>
> >
>
> > Best,
>
> > Yun
>
> >
>
> >
>
> >
>
> >
>
> > ------------------Original Mail ------------------
>
> > Sender:Guowei Ma
>
> > Send Date:Wed Feb 24 17:35:36 2021
>
> > Recipients:dev
>
> > CC:Arvid Heise
>
> > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> > Hi, Till
>
> >
>
> > Thank you very much for your careful consideration
>
> >
>
> >
>
> >
>
> > *1. Emit records in `NotifyCheckpointComplete`.*
>
> >
>
> > Sorry for the misunderstanding caused by my wording. I just want to say
> > that the current interface does not prevent users from doing it. From
> > the perspective of the new sink API, we might not need to depend on
> > emitting records in `NotifyCheckpointComplete`, for example by using
> > `OperatorCoordinator` instead.
>
> >
>
> >
>
> >
>
> >
>
> >
>
> > *2. What does the FLIP-147 guarantee?*
> >
> > I think initially this FLIP wants to achieve two targets:
> > 1. Tasks/Operators exit correctly (as you mentioned, the lifecycle of a
> > Task/StreamTask/StreamOperator).
> > 2. Continue to trigger checkpoints after some tasks finish, for mixed
> > jobs.
>
> >
>
> >
>
> >
>
> > I think the first thing is related to the discussion in FLINK-21133. If I
>
> >
>
> > understand correctly, in addition to supporting the tasks / operators to
>
> >
>
> > exit correctly, now we also want to unify the process of the tasks and
>
> >
>
> > operators for savepoint / finish.
>
> >
>
> > I think the second thing is orthogonal to the FLINK-21133 because there
>
> are
>
> >
>
> > topologies that have both the bounded and unbounded input.
>
> >
>
> >
>
> >
>
> > *3. How to unify the operator exit process of FLIP-147 with
>
> >
>
> > stop-with-savepoint?*
>
> >
>
> > I am not very sure about how to do it yet. But if I understand the
>
> >
>
> > discussion in the jira correctly it needs to introduce some logic into
>
> >
>
> > `CheckpointCoordinator`, which is responsible for triggering “the unified
>
> >
>
> > operator exit process”. Am I correct?
>
> >
>
> >
>
> >
>
> > Best,
>
> >
>
> > Guowei
>
> >
>
> >
>
> >
>
> >
>
> >
>
> > On Tue, Feb 23, 2021 at 5:10 PM Till Rohrmann wrote:
>
> >
>
> >
>
> >
>
> > > Thanks for the explanation Yun and Guowei. I have to admit that I do
>
> not
>
> >
>
> > > fully understand why this is strictly required but I think that we are
>
> >
>
> > > > touching two very important aspects which might have far-reaching
>
> >
>
> > > consequences for how Flink works:
>
> >
>
> > >
>
> >
>
> > > 1) Do we want to allow that multiple checkpoints are required to
>
> >
>
> > > materialize results?
>
> >
>
> > > 2) Do we want to allow to emit records in notifyCheckpointComplete?
>
> >
>
> > >
>
> >
>
> > > For 1) I am not sure whether this has been discussed within the
>
> community
>
> >
>
> > > sufficiently. Requiring multiple checkpoints to materialize a result
>
> >
>
> > > because of multi level committers has the consequence that we increase
>
> > the
>
> >
>
> > > latency from checkpoint interval to #levels * checkpoint interval.
>
> >
>
> > > Moreover, having to drain the pipeline in multiple steps, would break
>
> the
>
> >
>
> > > stop-with-savepoint --drain because which savepoint do you report to
>
> the
>
> >
>
> > > user?
>
> >
>
> > >
>
> >
>
> > > For 2) allowing to send records after the final
>
> notifyCheckpointComplete
>
> >
>
> > > will effectively mean that we need to shut down a topology in multiple
>
> >
>
> > > steps (in the worst case one operator per checkpoint). This would be a
>
> >
>
> > > strong argument for not allowing this to me. The fact that users can
>
> send
>
> >
>
> > > records after the notifyCheckpointComplete is more by accident than by
>
> >
>
> > > design. I think we should make this a very deliberate decision and in
>
> > doubt
>
> >
>
> > > I would be in favour of a more restrictive model unless there is a very
>
> >
>
> > > good reason why this should be supported.
>
> >
>
> > >
>
> >
>
> > > Taking also the discussion in FLINK-21133 [1] into account, it seems to
>
> > me
>
> >
>
> > > that we haven't really understood what kind of guarantees we want to
>
> give
>
> >
>
> > > to our users and how the final checkpoint should exactly work. I
>
> > understand
>
> >
>
> > > that this is not included in the first scope of FLIP-147 but I think
>
> this
>
> >
>
> > > is so important that we should figure this out asap. Also because the
>
> > exact
>
> >
>
> > > shut down behaviour will have to be aligned with the lifecycle of a
>
> >
>
> > > Task/StreamTask/StreamOperator. And last but not least because other
>
> >
>
> > > features such as the new sink API start building upon a shut down model
>
> >
>
> > > which has not been fully understood/agreed upon.
>
> >
>
> > >
>
> >
>
> > > [1] https://issues.apache.org/jira/browse/FLINK-21133
>
> >
>
> > >
>
> >
>
> > > Cheers,
>
> >
>
> > > Till
>
> >
>
> > >
>
> >
>
> > > On Tue, Feb 16, 2021 at 9:45 AM Guowei Ma wrote:
>
> >
>
> > >
>
> >
>
> > > > Thanks Yun for the detailed explanation.
>
> >
>
> > > > A simple supplementary explanation about the sink case: Maybe we
>
> could
>
> >
>
> > > use
>
> >
>
> > > > `OperatorCoordinator` to avoid sending the element to the downstream
>
> >
>
> > > > operator.
>
> >
>
> > > > But I agree we could not limit the users not to emit records in the
>
> >
>
> > > > > `notifyCheckpointComplete`.
>
> >
>
> > > >
>
> >
>
> > > > Best,
>
> >
>
> > > > Guowei
>
> >
>
> > > >
>
> >
>
> > > >
>
> >
>
> > > > On Tue, Feb 16, 2021 at 2:06 PM Yun Gao
>
> >
>
> > > > wrote:
>
> >
>
> > > >
>
> >
>
> > > > > Hi all,
>
> >
>
> > > > >
>
> >
>
> > > > > I'd like to first detail the issue with emitting records in
>
> >
>
> > > > > notifyCheckpointComplete for context. For specific usage,
>
> >
>
> > > > > an example would be for sink, it might want to write some metadata
>
> >
>
> > > after
>
> >
>
> > > > > all the transactions are committed
>
> >
>
> > > > > (like write a marker file _SUCCESS to the output directory). This
>
> > case
>
> >
>
> > > is
>
> >
>
> > > > > currently supported via the two level
>
> >
>
> > > > > committers of the new sink API: when received endOfInput(), the
>
> >
>
> > > Committer
>
> >
>
> > > > > wait for another checkpoint to
>
> >
>
> > > > > commits all the pending transactions and emit the list of files to
>
> > the
>
> >
>
> > > > > GlobalCommitter. The GlobalCommitter
>
> >
>
> > > > > would wait for another checkpoint to also write the metadata with
>
> 2pc
>
> >
>
> > > > > (Although sometimes 2pc is not needed
>
> >
>
> > > > > for writing metadata, it should be only an optimization and still
>
> >
>
> > > > requires
>
> >
>
> > > > > the Committer do commit before
>
> >
>
> > > > > notifying the global Committer. Also another note is
>
> GlobalCommitter
>
> > is
>
> >
>
> > > > > also added for some other cases
>
> >
>
> > > > > > like some sinks wanting a committer with dop = 1, such as the Iceberg sink).
>
> >
>
> > > > >
>
> >
>
> > > > > However, a more general issue to me is that currently we do not
>
> limit
>
> >
>
> > > > > users to not emit records in
>
> >
>
> > > > > notifyCheckpointComplete in the API level. The sink case could be
>
> >
>
> > > viewed
>
> >
>
> > > > > as a special case, but in addition
>
> >
>
> > > > > to this one, logically users could also implement their own cases
>
> > that
>
> >
>
> > > > > emits records in notifyCheckpointComplete.
>
> >
>
> > > > >
>
> >
>
> > > > > Best,
>
> >
>
> > > > > Yun
>
> >
>
> > > > >
>
> >
>
> > > > > ------------------Original Mail ------------------
>
> >
>
> > > > > Sender:Arvid Heise
>
> >
>
> > > > > Send Date:Fri Feb 12 20:46:04 2021
>
> >
>
> > > > > Recipients:dev
>
> >
>
> > > > > CC:Yun Gao
>
> >
>
> > > > > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
>
> >
>
> > > Finished
>
> >
>
> > > > > Hi Piotr,
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > Thank you for raising your concern. Unfortunately, I do not have a
>
> >
>
> > > better
>
> >
>
> > > > >
>
> >
>
> > > > > idea than doing closing of operators intermittently with
>
> checkpoints
>
> > (=
>
> >
>
> > > > >
>
> >
>
> > > > > multiple last checkpoints).
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > However, two ideas on how to improve the overall user experience:
>
> >
>
> > > > >
>
> >
>
> > > > > 1. If an operator is not relying on notifyCheckpointComplete, we
>
> can
>
> >
>
> > > > close
>
> >
>
> > > > >
>
> >
>
> > > > > it faster (without waiting for a checkpoint). In general, I'd
>
> assume
>
> >
>
> > > that
>
> >
>
> > > > >
>
> >
>
> > > > > almost all non-sinks behave that way.
>
> >
>
> > > > >
>
> >
>
> > > > > 2. We may increase the checkpointing frequency for the last
>
> >
>
> > > checkpoints.
>
> >
>
> > > > We
>
> >
>
> > > > >
>
> >
>
> > > > > need to avoid overloading checkpoint storages and task managers,
>
> but
>
> > I
>
> >
>
> > > > >
>
> >
>
> > > > > assume the more operators are closed, the lower the checkpointing
>
> >
>
> > > > interval
>
> >
>
> > > > >
>
> >
>
> > > > > can be.
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > For 1, I'd propose to add (name TBD):
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > default boolean StreamOperator#requiresFinalCheckpoint() {
>
> >
>
> > > > >
>
> >
>
> > > > > return true;
>
> >
>
> > > > >
>
> >
>
> > > > > }
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > This means all operators are conservatively (=slowly) closed. For
>
> > most
>
> >
>
> > > > >
>
> >
>
> > > > > operators, we can then define their behavior by overriding in
>
> >
>
> > > > >
>
> >
>
> > > > > AbstractUdfStreamOperator
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > @Override
>
> >
>
> > > > >
>
> >
>
> > > > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
>
> >
>
> > > > >
>
> >
>
> > > > > return userFunction instanceof CheckpointListener;
>
> >
>
> > > > >
>
> >
>
> > > > > }
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > This idea can be further refined in also adding
>
> > requiresFinalCheckpoint
>
> >
>
> > > > to
>
> >
>
> > > > >
>
> >
>
> > > > > CheckpointListener to exclude all operators with UDFs that
>
> implement
>
> >
>
> > > > >
>
> >
>
> > > > > CheckpointListener but do not need it for 2pc.
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > @Override
>
> >
>
> > > > >
>
> >
>
> > > > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
>
> >
>
> > > > >
>
> >
>
> > > > > return userFunction instanceof CheckpointListener &&
>
> >
>
> > > > >
>
> >
>
> > > > > ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>
> >
>
> > > > >
>
> >
>
> > > > > }
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > That approach would also work for statebackends/snapshot strategies
>
> >
>
> > > that
>
> >
>
> > > > >
>
> >
>
> > > > > require some 2pc.
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > If we can contain it to the @PublicEvolving StreamOperator, it
>
> would
>
> > be
>
> >
>
> > > > >
>
> >
>
> > > > > better of course.
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > Best,
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > Arvid
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski
>
> >
>
> > > > >
>
> >
>
> > > > > wrote:
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > Hey,
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > I would like to raise a concern about implementation of the final
>
> >
>
> > > > >
>
> >
>
> > > > > > checkpoints taking into account operators/functions that are
>
> >
>
> > > > implementing
>
> >
>
> > > > >
>
> >
>
> > > > > > two phase commit (2pc) protocol for exactly-once processing with
>
> > some
>
> >
>
> > > > >
>
> >
>
> > > > > > external state (kept outside of the Flink). Primarily
>
> exactly-once
>
> >
>
> > > > sinks.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > First of all, as I understand it, this is not planned in the
>
> first
>
> >
>
> > > > > version
>
> >
>
> > > > >
>
> >
>
> > > > > > of this FLIP. I'm fine with that, however I would strongly
>
> > emphasize
>
> >
>
> > > > this
>
> >
>
> > > > >
>
> >
>
> > > > > > in every place we will be mentioning FLIP-147 efforts. This is
>
> >
>
> > > because
>
> >
>
> > > > > me,
>
> >
>
> > > > >
>
> >
>
> > > > > > as a user, upon hearing "Flink supports checkpointing with
>
> bounded
>
> >
>
> > > > > inputs"
>
> >
>
> > > > >
>
> >
>
> > > > > > I would expect 2pc to work properly and to commit the external
>
> side
>
> >
>
> > > > > effects
>
> >
>
> > > > >
>
> >
>
> > > > > > upon finishing. As it is now, I (as a user) would be surprised
>
> > with a
>
> >
>
> > > > >
>
> >
>
> > > > > > silent data loss (of not committed trailing data). This is just a
>
> >
>
> > > > remark,
>
> >
>
> > > > >
>
> >
>
> > > > > > that we need to attach this warning to every blog
>
> >
>
> > > > post/documentation/user
>
> >
>
> > > > >
>
> >
>
> > > > > > mailing list response related to "Support Checkpoints After Tasks
>
> >
>
> > > > >
>
> >
>
> > > > > > Finished". Also I would suggest to prioritize the follow up of
>
> >
>
> > > > supporting
>
> >
>
> > > > >
>
> >
>
> > > > > > 2pc.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > Secondly, I think we are missing how difficult and problematic
>
> will
>
> >
>
> > > be
>
> >
>
> > > > > 2pc
>
> >
>
> > > > >
>
> >
>
> > > > > > support with the final checkpoint.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > For starters, keep in mind that currently 2pc can be implemented
>
> by
>
> >
>
> > > > users
>
> >
>
> > > > >
>
> >
>
> > > > > > using both `@Public` APIs as functions and `@PublicEvolving`
>
> >
>
> > > operators
>
> >
>
> > > > in
>
> >
>
> > > > >
>
> >
>
> > > > > > any place in the job graph. It's not limited to only the sinks.
>
> For
>
> >
>
> > > > >
>
> >
>
> > > > > > > example users could easily implement the `AsyncFunction` (for
>
> >
>
> > > > >
>
> >
>
> > > > > > `AsyncWaitOperator`) that is using 2pc based on the
>
> >
>
> > > > `CheckpointListener`
>
> >
>
> > > > >
>
> >
>
> > > > > > interface. I'm not saying it's common, probably just a tiny
>
> > minority
>
> >
>
> > > of
>
> >
>
> > > > >
>
> >
>
> > > > > > users are doing that (if any at all), but nevertheless that's
>
> >
>
> > > possible
>
> >
>
> > > > > and
>
> >
>
> > > > >
>
> >
>
> > > > > > currently (implicitly?) supported in Flink.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > Next complication is the support of bounded streams
>
> >
>
> > > (`BoundedOneInput`
>
> >
>
> > > > or
>
> >
>
> > > > >
>
> >
>
> > > > > > `BoundedMultiInput` interfaces) and the closing/shutdown
>
> procedure
>
> > of
>
> >
>
> > > > the
>
> >
>
> > > > >
>
> >
>
> > > > > > operators. Currently it works as follows:
>
> >
>
> > > > >
>
> >
>
> > > > > > 0. Task receives EndOfPartitionEvent (or source finishes)
>
> >
>
> > > > >
>
> >
>
> > > > > > 1. `endOfInput` is called on the first operator in the chain
>
> >
>
> > > > >
>
> >
>
> > > > > > 2. We quiesce the processing timers
>
> >
>
> > > > >
>
> >
>
> > > > > > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for
>
> > the
>
> >
>
> > > > > first
>
> >
>
> > > > >
>
> >
>
> > > > > > operator, so no new timers will be triggered
>
> >
>
> > > > >
>
> >
>
> > > > > > 3. We wait for the already fired timers to finish executing
>
> > (spinning
>
> >
>
> > > > >
>
> >
>
> > > > > > mailbox loop)
>
> >
>
> > > > >
>
> >
>
> > > > > > 4. We are closing the first operator
>
> >
>
> > > > >
>
> >
>
> > > > > > 5. We go to the next (second) operator in the chain and repeat
>
> the
>
> >
>
> > > > steps
>
> >
>
> > > > > 1.
>
> >
>
> > > > >
>
> >
>
> > > > > > to 5.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > This is because operators can emit data after processing
>
> >
>
> > > `endOfInput`,
>
> >
>
> > > > > from
>
> >
>
> > > > >
>
> >
>
> > > > > > timers, async mailbox actions and inside the `close` method
>
> itself.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > Now the problem is to support the final checkpoint with 2pc, we
>
> > need
>
> >
>
> > > > >
>
> >
>
> > > > > > trigger `snapshotState` and `notifyCheckpointComplete` call at
>
> the
>
> >
>
> > > very
>
> >
>
> > > > >
>
> >
>
> > > > > > least only after `endOfInput` call on the operator. Probably the
>
> > best
>
> >
>
> > > > > place
>
> >
>
> > > > >
>
> >
>
> > > > > > would be in between steps 3. and 4. However that means, we would
>
> be
>
> >
>
> > > > > forced
>
> >
>
> > > > >
>
> >
>
> > > > > > to wait for steps 1. to 3. to finish, then wait for a next
>
> > checkpoint
>
> >
>
> > > > to
>
> >
>
> > > > >
>
> >
>
> > > > > > trigger AND complete, before finally closing the head operator,
>
> and
>
> >
>
> > > > only
>
> >
>
> > > > >
>
> >
>
> > > > > > then we can start closing the next operator in the chain:
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > 0. Task receives EndOfPartitionEvent (or source finishes)
>
> >
>
> > > > >
>
> >
>
> > > > > > 1. `endOfInput` is called on the first operator in the chain
>
> >
>
> > > > >
>
> >
>
> > > > > > 2. We quiesce the processing timers
>
> >
>
> > > > >
>
> >
>
> > > > > > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for
>
> > the
>
> >
>
> > > > > first
>
> >
>
> > > > >
>
> >
>
> > > > > > operator, so no new timers will be triggered
>
> >
>
> > > > >
>
> >
>
> > > > > > 3. We wait for the already fired timers to finish executing
>
> > (spinning
>
> >
>
> > > > >
>
> >
>
> > > > > > mailbox loop)
>
> >
>
> > > > >
>
> >
>
> > > > > > *3b. We wait for one more checkpoint to trigger and for the
>
> >
>
> > > > >
>
> >
>
> > > > > > `notifyCheckpointComplete` RPC.*
>
> >
>
> > > > >
>
> >
>
> > > > > > 4. We are closing the first operator
>
> >
>
> > > > >
>
> >
>
> > > > > > 5. We go to the next (second) operator in the chain and repeat
>
> the
>
> >
>
> > > > steps
>
> >
>
> > > > > 1.
>
> >
>
> > > > >
>
> >
>
> > > > > > to 5.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > That means, we can close one operator per successful checkpoint.
>
> To
>
> >
>
> > > > close
>
> >
>
> > > > >
>
> >
>
> > > > > > 10 operators, we would need 10 successful checkpoints.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > I was thinking about different approaches to this problem, and I
>
> >
>
> > > > couldn't
>
> >
>
> > > > >
>
> >
>
> > > > > > find any viable ones. All I could think of would break the
>
> current
>
> >
>
> > > > >
>
> >
>
> > > > > > `@Public` API and/or would be ugly/confusing for the users.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > For example a relatively simple solution, to introduce a
>
> `preClose`
>
> >
>
> > > or
>
> >
>
> > > > >
>
> >
>
> > > > > > `flush` method to the operators, with a contract that after
>
> >
>
> > > > >
>
> >
>
> > > > > > `flush`, operators would be forbidden from emitting more records,
>
> > so
>
> >
>
> > > > that
>
> >
>
> > > > >
>
> >
>
> > > > > > we can replace step 4. with this `flush` call, and then having a
>
> >
>
> > > single
>
> >
>
> > > > >
>
> >
>
> > > > > > checkpoint to finish 2pc for all of the operators inside the
>
> chain,
>
> >
>
> > > > > doesn't
>
> >
>
> > > > >
>
> >
>
> > > > > > work. Sheer fact of adding this `flush` method and changing the
>
> >
>
> > > > contract
>
> >
>
> > > > >
>
> >
>
> > > > > > would break the current API and Yun Gao has pointed out to me,
>
> that
>
> >
>
> > > we
>
> >
>
> > > > >
>
> >
>
> > > > > > either already support, or want to support operators that are
>
> >
>
> > > emitting
>
> >
>
> > > > >
>
> >
>
> > > > > > records from within the `notifyCheckpointComplete` call:
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > Yun Gao:
>
> >
>
> > > > >
>
> >
>
> > > > > > > like with the new sink api there might be writer -> committer
>
> ->
>
> >
>
> > > > global
>
> >
>
> > > > >
>
> >
>
> > > > > > committer, the committer would need to wait for the last
>
> checkpoint
>
> >
>
> > > to
>
> >
>
> > > > >
>
> >
>
> > > > > > commit
>
> >
>
> > > > >
>
> >
>
> > > > > > > the last piece of data, and after that it also need to emit the
>
> >
>
> > > list
>
> >
>
> > > > of
>
> >
>
> > > > >
>
> >
>
> > > > > > transactions get committed to global committer to do some
>
> >
>
> > > finalization
>
> >
>
> > > > >
>
> >
>
> > > > > > logic.
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > So it wouldn't solve the problem (at least not fully).
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > I don't know if anyone has any better ideas how to solve this
>
> >
>
> > > problem?
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > Piotrek
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > pt., 15 sty 2021 o 14:57 Yun Gao
>
> >
>
> > > > >
>
> >
>
> > > > > > napisał(a):
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > Hi Aljoscha,
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > I think so since we seems to do not have other divergence and
>
> new
>
> >
>
> > > > >
>
> >
>
> > > > > > > objections now. I'll open the vote then. Very thanks!
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > Best,
>
> >
>
> > > > >
>
> >
>
> > > > > > > Yun
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> > ------------------------------------------------------------------
>
> >
>
> > > > >
>
> >
>
> > > > > > > From:Aljoscha Krettek
>
> >
>
> > > > >
>
> >
>
> > > > > > > Send Time:2021 Jan. 15 (Fri.) 21:24
>
> >
>
> > > > >
>
> >
>
> > > > > > > To:dev
>
> >
>
> > > > >
>
> >
>
> > > > > > > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
>
> >
>
> > > > > Finished
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > Thanks for the summary! I think we can now move towards a
>
> [VOTE]
>
> >
>
> > > > > thread,
>
> >
>
> > > > >
>
> >
>
> > > > > > > right?
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > On 2021/01/15 13:43, Yun Gao wrote:
>
> >
>
> > > > >
>
> >
>
> > > > > > > >1) For the problem that the "new" root task coincidently
>
> > finished
>
> >
>
> > > > >
>
> >
>
> > > > > > > >before getting triggered successfully, we have listed two
>
> > options
>
> >
>
> > > in
>
> >
>
> > > > >
>
> >
>
> > > > > > > >the FLIP-147[1], for the first version, now we are not tend to
>
> > go
>
> >
>
> > > > with
>
> >
>
> > > > >
>
> >
>
> > > > > > > >the first option that JM would re-compute and re-trigger new
>
> >
>
> > > sources
>
> >
>
> > > > >
>
> >
>
> > > > > > > >when it realized some tasks are not triggered successfully.
>
> This
>
> >
>
> > > > > option
>
> >
>
> > > > >
>
> >
>
> > > > > > > > >would avoid the complexity of adding new RPC and duplicating
>
> > task
>
> >
>
> > > > >
>
> >
>
> > > > > > > >states, and in average case it would not cause too much
>
> > overhead.
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > > You wrote "we are *not* tend to go with the first option", but
>
> I
>
> >
>
> > > > think
>
> >
>
> > > > >
>
> >
>
> > > > > > > > you meant to write "we tend to *now* go with the first option",
>
> >
>
> > > > right?
>
> >
>
> > > > >
>
> >
>
> > > > > > > That's also how it is in the FLIP, I just wanted to clarify for
>
> > the
>
> >
>
> > > > >
>
> >
>
> > > > > > > mailing list.
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > > >
>
> >
>
> > > > >
>
> >
>
> > > > >
>
> >
>
> > > >
>
> >
>
> > >
>
> >
>
> >
>
>
