Hi all, 

I'd like to first detail the issue with emitting records in 
notifyCheckpointComplete for context. For specific usage, 
an example would be for sink, it might want to write some metadata after all 
the transactions are committed
(like write a marker file _SUCCESS to the output directory). This case is 
currently supported via the two level
committers of the new sink API: when received endOfInput(), the Committer wait 
for another checkpoint to 
commits all the pending transactions and emit the list of files to the 
GlobalCommitter. The GlobalCommitter 
would wait for another checkpoint to also write the metadata with 2pc (Although 
sometimes 2pc is not needed 
for writing metadata, it should be only an optimization and still requires the 
Committer do commit before 
notifying the global Committer. Also another note is GlobalCommitter is also 
added for some other cases 
like some sinks want an commiter with dop = 1, like IceBergSink).

However, a more general issue to me is that currently we do not limit users to 
not emit records in 
notifyCheckpointComplete in the API level. The sink case could be viewed as a 
special case, but in addition 
to this one, logically users could also implement their own cases that emits 
records in notifyCheckpointComplete.

Best,
Yun

 ------------------Original Mail ------------------
Sender:Arvid Heise <ar...@apache.org>
Send Date:Fri Feb 12 20:46:04 2021
Recipients:dev <dev@flink.apache.org>
CC:Yun Gao <yungao...@aliyun.com>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Piotr,



Thank you for raising your concern. Unfortunately, I do not have a better

idea than doing closing of operators intermittently with checkpoints (=

multiple last checkpoints).



However, two ideas on how to improve the overall user experience:

1. If an operator is not relying on notifyCheckpointComplete, we can close

it faster (without waiting for a checkpoint). In general, I'd assume that

almost all non-sinks behave that way.

2. We may increase the checkpointing frequency for the last checkpoints. We

need to avoid overloading checkpoint storages and task managers, but I

assume the more operators are closed, the lower the checkpointing interval

can be.



For 1, I'd propose to add (name TBD):



default boolean StreamOperator#requiresFinalCheckpoint() {

 return true;

}



This means all operators are conservatively (=slowly) closed. For most

operators, we can then define their behavior by overriding in

AbstractUdfStreamOperator



@Override

boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {

 return userFunction instanceof CheckpointListener;

}



This idea can be further refined in also adding requiresFinalCheckpoint to

CheckpointListener to exclude all operators with UDFs that implement

CheckpointListener but do not need it for 2pc.



@Override

boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {

 return userFunction instanceof CheckpointListener &&

 ((CheckpointListener) userFunction).requiresFinalCheckpoint();

}



That approach would also work for statebackends/snapshot strategies that

require some 2pc.



If we can contain it to the @PublicEvolving StreamOperator, it would be

better of course.



Best,



Arvid



On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski 

wrote:



> Hey,

>

> I would like to raise a concern about implementation of the final

> checkpoints taking into account operators/functions that are implementing

> two phase commit (2pc) protocol for exactly-once processing with some

> external state (kept outside of the Flink). Primarily exactly-once sinks.

>

> First of all, as I understand it, this is not planned in the first version

> of this FLIP. I'm fine with that, however I would strongly emphasize this

> in every place we will be mentioning FLIP-147 efforts. This is because me,

> as a user, upon hearing "Flink supports checkpointing with bounded inputs"

> I would expect 2pc to work properly and to commit the external side effects

> upon finishing. As it is now, I (as a user) would be surprised with a

> silent data loss (of not committed trailing data). This is just a remark,

> that we need to attach this warning to every blog post/documentation/user

> mailing list response related to "Support Checkpoints After Tasks

> Finished". Also I would suggest to prioritize the follow up of supporting

> 2pc.

>

> Secondly, I think we are missing how difficult and problematic will be 2pc

> support with the final checkpoint.

>

> For starters, keep in mind that currently 2pc can be implemented by users

> using both `@Public` APIs as functions and `@PublicEvolving` operators in

> any place in the job graph. It's not limited to only the sinks. For

> example users could easily implement the `AsynFunction` (for

> `AsyncWaitOperator`) that is using 2pc based on the `CheckpointListener`

> interface. I'm not saying it's common, probably just a tiny minority of

> users are doing that (if any at all), but nevertheless that's possible and

> currently (implicitly?) supported in Flink.

>

> Next complication is the support of bounded streams (`BoundedOneInput` or

> `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the

> operators. Currently it works as follows:

> 0. Task receives EndOfPartitionEvent (or source finishes)

> 1. `endOfInput` is called on the first operator in the chain

> 2. We quiesce the processing timers

> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first

> operator, so no new timers will be triggered

> 3. We wait for the already fired timers to finish executing (spinning

> mailbox loop)

> 4. We are closing the first operator

> 5. We go to the next (second) operator in the chain and repeat the steps 1.

> to 5.

>

> This is because operators can emit data after processing `endOfInput`, from

> timers, async mailbox actions and inside the `close` method itself.

>

> Now the problem is to support the final checkpoint with 2pc, we need

> trigger `snapshotState` and `notifyCheckpointComplete` call at the very

> least only after `endOfInput` call on the operator. Probably the best place

> would be in between steps 3. and 4. However that means, we would be forced

> to wait for steps 1. to 3. to finish, then wait for a next checkpoint to

> trigger AND complete, before finally closing the head operator, and only

> then we can start closing the next operator in the chain:

>

> 0. Task receives EndOfPartitionEvent (or source finishes)

> 1. `endOfInput` is called on the first operator in the chain

> 2. We quiesce the processing timers

> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first

> operator, so no new timers will be triggered

> 3. We wait for the already fired timers to finish executing (spinning

> mailbox loop)

> *3b. We wait for one more checkpoint to trigger and for the

> `notifyCheckpointComplete` RPC.*

> 4. We are closing the first operator

> 5. We go to the next (second) operator in the chain and repeat the steps 1.

> to 5.

>

> That means, we can close one operator per successful checkpoint. To close

> 10 operators, we would need 10 successful checkpoints.

>

> I was thinking about different approaches to this problem, and I couldn't

> find any viable ones. All I could think of would break the current

> `@Public` API and/or would be ugly/confusing for the users.

>

> For example a relatively simple solution, to introduce a `preClose` or

> `flush` method to the operators, with a contract that after

> `flush`, operators would be forbidden from emitting more records, so that

> we can replace step 4. with this `flush` call, and then having a single

> checkpoint to finish 2pc for all of the operators inside the chain, doesn't

> work. Sheer fact of adding this `flush` method and changing the contract

> would break the current API and Yun Gao has pointed out to me, that we

> either already support, or want to support operators that are emitting

> records from within the `notifyCheckpointComplete` call:

>

> > Yun Gao:

> > like with the new sink api there might be writer -> committer -> global

> committer, the committer would need to wait for the last checkpoint to

> commit

> > the last piece of data, and after that it also need to emit the list of

> transactions get committed to global committer to do some finalization

> logic.

>

> So it wouldn't solve the problem (at least not fully).

>

> I don't know if anyone has any better ideas how to solve this problem?

>

> Piotrek

>

> pt., 15 sty 2021 o 14:57 Yun Gao 

> napisaƂ(a):

>

> > Hi Aljoscha,

> >

> > I think so since we seems to do not have other divergence and new

> > objections now. I'll open the vote then. Very thanks!

> >

> > Best,

> > Yun

> >

> >

> > ------------------------------------------------------------------

> > From:Aljoscha Krettek 

> > Send Time:2021 Jan. 15 (Fri.) 21:24

> > To:dev 

> > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

> >

> > Thanks for the summary! I think we can now move towards a [VOTE] thread,

> > right?

> >

> > On 2021/01/15 13:43, Yun Gao wrote:

> > >1) For the problem that the "new" root task coincidently finished

> > >before getting triggered successfully, we have listed two options in

> > >the FLIP-147[1], for the first version, now we are not tend to go with

> > >the first option that JM would re-compute and re-trigger new sources

> > >when it realized some tasks are not triggered successfully. This option

> > >would avoid the complexity of adding new PRC and duplicating task

> > >states, and in average case it would not cause too much overhead.

> >

> > You wrote "we are *not* tend to go with the first option", but I think

> > you meant wo write "we tend to *now* go with the first option", right?

> > That's also how it is in the FLIP, I just wanted to clarify for the

> > mailing list.

> >

> >

>

Reply via email to