Thanks Yun for the detailed explanation.
A small supplementary note on the sink case: maybe we could use an
`OperatorCoordinator` to avoid sending the element to the downstream
operator.
But I agree that we cannot prevent users from emitting records in
`notifyCheckpointComplete`.
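
To make that idea a bit more concrete, a rough sketch could look like the
following (class and method names here are made up; this does not implement
the actual OperatorCoordinator interface, it only illustrates the flow):

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.BitSet;

// Coordinator-style component on the JobManager side: the sink subtasks
// report "commit done" (e.g. via an OperatorEvent) instead of emitting a
// record to a downstream operator, and the coordinator writes the final
// marker once all subtasks have reported.
public class SuccessMarkerCoordinator {

    private final BitSet finishedSubtasks = new BitSet();
    private final int parallelism;
    private final Path outputDir;

    public SuccessMarkerCoordinator(int parallelism, Path outputDir) {
        this.parallelism = parallelism;
        this.outputDir = outputDir;
    }

    // Called when a subtask reports that its transactions for the final
    // checkpoint have been committed.
    public synchronized void handleSubtaskFinished(int subtaskIndex) throws Exception {
        finishedSubtasks.set(subtaskIndex);
        if (finishedSubtasks.cardinality() == parallelism) {
            // All subtasks committed: write the _SUCCESS marker centrally.
            Files.createFile(outputDir.resolve("_SUCCESS"));
        }
    }
}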

Best,
Guowei


On Tue, Feb 16, 2021 at 2:06 PM Yun Gao <yungao...@aliyun.com.invalid>
wrote:

> Hi all,
>
> I'd like to first detail the issue with emitting records in
> notifyCheckpointComplete for context. A concrete example is a sink that
> wants to write some metadata after all the transactions are committed
> (like writing a marker file _SUCCESS to the output directory). This case
> is currently supported via the two-level committers of the new sink API:
> when endOfInput() is received, the Committer waits for another checkpoint
> to commit all the pending transactions and emits the list of files to the
> GlobalCommitter. The GlobalCommitter then waits for yet another checkpoint
> to write the metadata with 2pc. (Although sometimes 2pc is not needed for
> writing the metadata, that would only be an optimization and would still
> require the Committer to commit before notifying the GlobalCommitter.
> Another note is that the GlobalCommitter was also added for some other
> cases, e.g. sinks that want a committer with dop = 1 (a parallelism of
> one), like the IcebergSink.)
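>
> To make the sink case a bit more concrete, the finalization step could look
> roughly like the following sketch (this does not reproduce the exact
> GlobalCommitter interface of the new sink API; the method signature and the
> paths are assumptions for illustration only):
>
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> import java.util.List;
>
> // Stand-in for the global committer side of the new sink API: it receives
> // the files that the Committer has already committed and writes the
> // job-level metadata plus the _SUCCESS marker.
> public class SuccessMarkerGlobalCommitter {
>
>     private final Path outputDir = Paths.get("/tmp/output"); // assumed location
>
>     // Plays the role of GlobalCommitter#commit: invoked after the Committer
>     // has committed its transactions and forwarded the resulting file list.
>     public void commit(List<String> committedFiles) throws IOException {
>         Files.write(outputDir.resolve("metadata"), committedFiles);
>         Files.createFile(outputDir.resolve("_SUCCESS"));
>     }
> }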
>
> However, the more general issue to me is that currently we do not forbid
> users from emitting records in notifyCheckpointComplete at the API level.
> The sink case could be viewed as a special case, but beyond it, users
> could logically also implement their own cases that emit records in
> notifyCheckpointComplete.
>
> Best,
> Yun
>
>  ------------------Original Mail ------------------
> Sender:Arvid Heise <ar...@apache.org>
> Send Date:Fri Feb 12 20:46:04 2021
> Recipients:dev <dev@flink.apache.org>
> CC:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> Hi Piotr,
>
> Thank you for raising your concern. Unfortunately, I do not have a better
> idea than doing closing of operators intermittently with checkpoints (=
> multiple last checkpoints).
>
> However, two ideas on how to improve the overall user experience:
> 1. If an operator is not relying on notifyCheckpointComplete, we can close
> it faster (without waiting for a checkpoint). In general, I'd assume that
> almost all non-sinks behave that way.
> 2. We may increase the checkpointing frequency for the last checkpoints. We
> need to avoid overloading checkpoint storages and task managers, but I
> assume the more operators are closed, the lower the checkpointing interval
> can be.
>
> For 1, I'd propose to add (name TBD):
>
> default boolean StreamOperator#requiresFinalCheckpoint() {
>   return true;
> }
>
> This means all operators are conservatively (=slowly) closed. For most
> operators, we can then define their behavior by overriding in
> AbstractUdfStreamOperator
>
> @Override
> boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
>   return userFunction instanceof CheckpointListener;
> }
>
> This idea can be further refined in also adding requiresFinalCheckpoint to
> CheckpointListener to exclude all operators with UDFs that implement
> CheckpointListener but do not need it for 2pc.
>
> @Override
> boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
>   return userFunction instanceof CheckpointListener &&
>     ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> }
>
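> To make that refinement concrete, the hook could be added roughly like this
> (a hypothetical addition, not an existing method on CheckpointListener;
> only the relevant existing callback is shown next to it):
>
> public interface CheckpointListener {
>
>     // existing callback
>     void notifyCheckpointComplete(long checkpointId) throws Exception;
>
>     // hypothetical new hook: the conservative default keeps today's
>     // behavior, and UDFs that do not need a final checkpoint for their
>     // 2pc can override this and return false
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>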
> That approach would also work for statebackends/snapshot strategies that
> require some 2pc.
>
> If we can contain it to the @PublicEvolving StreamOperator, it would be
> better of course.
>
> Best,
> Arvid
>
> On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski wrote:
>
> > Hey,
> >
> > I would like to raise a concern about the implementation of the final
> > checkpoints taking into account operators/functions that implement the
> > two-phase commit (2pc) protocol for exactly-once processing with some
> > external state (kept outside of Flink), primarily exactly-once sinks.
> >
> > First of all, as I understand it, this is not planned in the first version
> > of this FLIP. I'm fine with that; however, I would strongly emphasize this
> > in every place we mention the FLIP-147 efforts. This is because I, as a
> > user, upon hearing "Flink supports checkpointing with bounded inputs",
> > would expect 2pc to work properly and to commit the external side effects
> > upon finishing. As it is now, I (as a user) would be surprised by a silent
> > data loss (of uncommitted trailing data). This is just a remark that we
> > need to attach this warning to every blog post/documentation/user mailing
> > list response related to "Support Checkpoints After Tasks Finished". Also,
> > I would suggest prioritizing the follow-up of supporting 2pc.
> >
> > Secondly, I think we are missing how difficult and problematic 2pc
> > support with the final checkpoint will be.
> >
> > For starters, keep in mind that currently 2pc can be implemented by users
> > using both `@Public` APIs as functions and `@PublicEvolving` operators in
> > any place in the job graph. It's not limited to only the sinks. For
> > example, users could easily implement an `AsyncFunction` (for
> > `AsyncWaitOperator`) that is using 2pc based on the `CheckpointListener`
> > interface. I'm not saying it's common, probably just a tiny minority of
> > users are doing that (if any at all), but nevertheless that's possible and
> > currently (implicitly?) supported in Flink.
> >
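> > As an illustration only, such a function could look like the sketch below
> > (the class name and the pending-transaction handling are made up, and
> > import paths may differ slightly between Flink versions):
> >
> > import java.util.ArrayList;
> > import java.util.Collections;
> > import java.util.List;
> >
> > import org.apache.flink.api.common.state.CheckpointListener;
> > import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> > import org.apache.flink.streaming.api.functions.async.ResultFuture;
> >
> > // A user function, not a sink, that pre-commits in asyncInvoke and
> > // commits in notifyCheckpointComplete, i.e. 2pc outside of a sink.
> > public class TwoPhaseCommitAsyncFunction
> >         implements AsyncFunction<String, String>, CheckpointListener {
> >
> >     private final List<String> pendingTransactions = new ArrayList<>();
> >
> >     @Override
> >     public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
> >         // "Pre-commit": stage the write in some external system.
> >         pendingTransactions.add(input);
> >         resultFuture.complete(Collections.singleton(input));
> >     }
> >
> >     @Override
> >     public void notifyCheckpointComplete(long checkpointId) {
> >         // "Commit": finalize the staged transactions once the checkpoint
> >         // covering them has completed.
> >         pendingTransactions.clear();
> >     }
> > }
> >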
> > The next complication is the support of bounded streams (`BoundedOneInput`
> > or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of
> > the operators. Currently it works as follows:
> > 0. Task receives EndOfPartitionEvent (or source finishes)
> > 1. `endOfInput` is called on the first operator in the chain
> > 2. We quiesce the processing timers
> > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> > operator, so no new timers will be triggered
> > 3. We wait for the already fired timers to finish executing (spinning
> > mailbox loop)
> > 4. We are closing the first operator
> > 5. We go to the next (second) operator in the chain and repeat the steps
> > 1. to 5.
> >
> > This is because operators can emit data after processing `endOfInput`,
> > from timers, async mailbox actions and inside the `close` method itself.
> >
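> > Just to make the timer case concrete, here is a trivial sketch of a
> > function that only emits from its timer callback, i.e. possibly after the
> > input has already ended (names are illustrative, not from the FLIP):
> >
> > import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > import org.apache.flink.util.Collector;
> >
> > public class EmitOnTimerFunction extends KeyedProcessFunction<String, Long, Long> {
> >
> >     @Override
> >     public void processElement(Long value, Context ctx, Collector<Long> out) {
> >         // No output here; just schedule a timer a bit into the future.
> >         ctx.timerService().registerProcessingTimeTimer(
> >                 ctx.timerService().currentProcessingTime() + 1000L);
> >     }
> >
> >     @Override
> >     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
> >         // The record is only emitted when the timer fires, which can be
> >         // after the upstream input has already finished.
> >         out.collect(timestamp);
> >     }
> > }
> >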
> > Now the problem is: to support the final checkpoint with 2pc, we need to
> > trigger the `snapshotState` and `notifyCheckpointComplete` calls at the
> > very least only after the `endOfInput` call on the operator. Probably the
> > best place would be in between steps 3. and 4. However, that means we
> > would be forced to wait for steps 1. to 3. to finish, then wait for a next
> > checkpoint to trigger AND complete, before finally closing the head
> > operator, and only then we can start closing the next operator in the
> > chain:
> >
> > 0. Task receives EndOfPartitionEvent (or source finishes)
> > 1. `endOfInput` is called on the first operator in the chain
> > 2. We quiesce the processing timers
> > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> > operator, so no new timers will be triggered
> > 3. We wait for the already fired timers to finish executing (spinning
> > mailbox loop)
> > *3b. We wait for one more checkpoint to trigger and for the
> > `notifyCheckpointComplete` RPC.*
> > 4. We are closing the first operator
> > 5. We go to the next (second) operator in the chain and repeat the steps
> > 1. to 5.
> >
> > That means we can close one operator per successful checkpoint. To close
> > 10 operators, we would need 10 successful checkpoints.
> >
> > I was thinking about different approaches to this problem, and I couldn't
> > find any viable ones. All I could think of would break the current
> > `@Public` API and/or would be ugly/confusing for the users.
> >
> > For example, a relatively simple solution (introducing a `preClose` or
> > `flush` method on the operators, with a contract that after `flush`
> > operators would be forbidden from emitting more records, so that we could
> > replace step 4. with this `flush` call and then have a single checkpoint
> > finish 2pc for all of the operators inside the chain) doesn't work. The
> > sheer fact of adding this `flush` method and changing the contract would
> > break the current API, and Yun Gao has pointed out to me that we either
> > already support, or want to support, operators that emit records from
> > within the `notifyCheckpointComplete` call:
> >
> > > Yun Gao:
> > > Like with the new sink API there might be writer -> committer -> global
> > > committer; the committer would need to wait for the last checkpoint to
> > > commit the last piece of data, and after that it also needs to emit the
> > > list of transactions that got committed to the global committer to do
> > > some finalization logic.
> >
> > So it wouldn't solve the problem (at least not fully).
> >
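> > For completeness, the hypothetical hook I had in mind would look roughly
> > like the following. This is not an existing API; it is only meant to make
> > the rejected idea concrete:
> >
> > public interface StreamOperator<OUT> {
> >
> >     // ... existing methods ...
> >
> >     // Hypothetical: called after endOfInput and after all timers have
> >     // fired; afterwards the operator must not emit any more records, so
> >     // that a single final checkpoint could finish 2pc for the whole chain.
> >     default void flush() throws Exception {}
> > }
> >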
> > I don't know if anyone has any better ideas how to solve this problem?
> >
> > Piotrek
> >
> > On Fri, 15 Jan 2021 at 14:57, Yun Gao wrote:
> >
> > > Hi Aljoscha,
> > >
> > > I think so, since we do not seem to have any other divergences or new
> > > objections now. I'll open the vote then. Thanks!
> > >
> > > Best,
> > > Yun
> > >
> > > ------------------------------------------------------------------
> > > From: Aljoscha Krettek
> > > Send Time: 2021 Jan. 15 (Fri.) 21:24
> > > To: dev
> > > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> > >
> > > Thanks for the summary! I think we can now move towards a [VOTE] thread,
> > > right?
> > >
> > > On 2021/01/15 13:43, Yun Gao wrote:
> > > >1) For the problem that the "new" root task coincidently finished
> > > >before getting triggered successfully, we have listed two options in
> > > >the FLIP-147[1], for the first version, now we are not tend to go with
> > > >the first option that JM would re-compute and re-trigger new sources
> > > >when it realized some tasks are not triggered successfully. This option
> > > >would avoid the complexity of adding new PRC and duplicating task
> > > >states, and in average case it would not cause too much overhead.
> > >
> > > You wrote "we are *not* tend to go with the first option", but I think
> > > you meant to write "we tend to *now* go with the first option", right?
> > > That's also how it is in the FLIP, I just wanted to clarify for the
> > > mailing list.
