Hi Piotr,

Thank you for raising your concern. Unfortunately, I do not have a better idea than closing the operators intermittently with checkpoints (= multiple last checkpoints).
However, two ideas on how to improve the overall user experience:

1. If an operator is not relying on notifyCheckpointComplete, we can close it faster (without waiting for a checkpoint). In general, I'd assume that almost all non-sinks behave that way.
2. We may increase the checkpointing frequency for the last checkpoints. We need to avoid overloading checkpoint storages and task managers, but I assume the more operators are closed, the lower the checkpointing interval can be.

For 1, I'd propose to add (name TBD):

default boolean StreamOperator#requiresFinalCheckpoint() {
    return true;
}

This means all operators are conservatively (= slowly) closed. For most operators, we can then define their behavior by overriding in AbstractUdfStreamOperator:

@Override
boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
    return userFunction instanceof CheckpointListener;
}

This idea can be further refined by also adding requiresFinalCheckpoint to CheckpointListener, to exclude all operators with UDFs that implement CheckpointListener but do not need it for 2pc:

@Override
boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
    return userFunction instanceof CheckpointListener &&
        ((CheckpointListener) userFunction).requiresFinalCheckpoint();
}

That approach would also work for state backends/snapshot strategies that require some 2pc. If we can contain it to the @PublicEvolving StreamOperator, it would be better of course.

Best,

Arvid

On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hey,
>
> I would like to raise a concern about the implementation of the final
> checkpoints taking into account operators/functions that are implementing
> the two phase commit (2pc) protocol for exactly-once processing with some
> external state (kept outside of Flink). Primarily exactly-once sinks.
>
> First of all, as I understand it, this is not planned in the first version
> of this FLIP.
> I'm fine with that, however I would strongly emphasize this in every place
> we will be mentioning FLIP-147 efforts. This is because me, as a user, upon
> hearing "Flink supports checkpointing with bounded inputs" I would expect
> 2pc to work properly and to commit the external side effects upon
> finishing. As it is now, I (as a user) would be surprised with a silent
> data loss (of not committed trailing data). This is just a remark, that we
> need to attach this warning to every blog post/documentation/user mailing
> list response related to "Support Checkpoints After Tasks Finished". Also I
> would suggest to prioritize the follow-up of supporting 2pc.
>
> Secondly, I think we are missing how difficult and problematic 2pc support
> with the final checkpoint will be.
>
> For starters, keep in mind that currently 2pc can be implemented by users
> using both `@Public` APIs as functions and `@PublicEvolving` operators in
> any place in the job graph. It's not limited to only the sinks. For
> example, users could easily implement an `AsyncFunction` (for
> `AsyncWaitOperator`) that is using 2pc based on the `CheckpointListener`
> interface. I'm not saying it's common, probably just a tiny minority of
> users are doing that (if any at all), but nevertheless that's possible and
> currently (implicitly?) supported in Flink.
>
> The next complication is the support of bounded streams (`BoundedOneInput`
> or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of
> the operators. Currently it works as follows:
>
> 0. Task receives EndOfPartitionEvent (or source finishes)
> 1. `endOfInput` is called on the first operator in the chain
> 2. We quiesce the processing timers
> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> operator, so no new timers will be triggered
> 3. We wait for the already fired timers to finish executing (spinning
> mailbox loop)
> 4. We are closing the first operator
> 5. We go to the next (second) operator in the chain and repeat the steps
> 1. to 5.
>
> This is because operators can emit data after processing `endOfInput`:
> from timers, async mailbox actions, and inside the `close` method itself.
>
> Now the problem is, to support the final checkpoint with 2pc, we need to
> trigger the `snapshotState` and `notifyCheckpointComplete` calls at the
> very least only after the `endOfInput` call on the operator. Probably the
> best place would be in between steps 3. and 4. However that means we would
> be forced to wait for steps 1. to 3. to finish, then wait for a next
> checkpoint to trigger AND complete, before finally closing the head
> operator, and only then we can start closing the next operator in the
> chain:
>
> 0. Task receives EndOfPartitionEvent (or source finishes)
> 1. `endOfInput` is called on the first operator in the chain
> 2. We quiesce the processing timers
> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> operator, so no new timers will be triggered
> 3. We wait for the already fired timers to finish executing (spinning
> mailbox loop)
> *3b. We wait for one more checkpoint to trigger and for the
> `notifyCheckpointComplete` RPC.*
> 4. We are closing the first operator
> 5. We go to the next (second) operator in the chain and repeat the steps
> 1. to 5.
>
> That means we can close one operator per successful checkpoint. To close
> 10 operators, we would need 10 successful checkpoints.
>
> I was thinking about different approaches to this problem, and I couldn't
> find any viable ones. All I could think of would break the current
> `@Public` API and/or would be ugly/confusing for the users.
>
> For example, a relatively simple solution, to introduce a `preClose` or
> `flush` method to the operators, with a contract that after `flush`
> operators would be forbidden from emitting more records, so that we can
> replace step 4.
> with this `flush` call, and then having a single checkpoint to finish 2pc
> for all of the operators inside the chain, doesn't work. The sheer fact of
> adding this `flush` method and changing the contract would break the
> current API, and Yun Gao has pointed out to me that we either already
> support, or want to support, operators that are emitting records from
> within the `notifyCheckpointComplete` call:
>
> > Yun Gao:
> > like with the new sink api there might be writer -> committer -> global
> > committer, the committer would need to wait for the last checkpoint to
> > commit the last piece of data, and after that it also needs to emit the
> > list of transactions that got committed to the global committer to do
> > some finalization logic.
>
> So it wouldn't solve the problem (at least not fully).
>
> I don't know if anyone has any better ideas how to solve this problem?
>
> Piotrek
>
> pt., 15 sty 2021 o 14:57 Yun Gao <yungao...@aliyun.com.invalid>
> napisał(a):
>
> > Hi Aljoscha,
> >
> > I think so, since it seems we do not have other divergences and new
> > objections now. I'll open the vote then. Very thanks!
> >
> > Best,
> > Yun
> >
> > ------------------------------------------------------------------
> > From: Aljoscha Krettek <aljos...@apache.org>
> > Send Time: 2021 Jan. 15 (Fri.) 21:24
> > To: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> >
> > Thanks for the summary! I think we can now move towards a [VOTE] thread,
> > right?
> >
> > On 2021/01/15 13:43, Yun Gao wrote:
> > >1) For the problem that the "new" root task coincidently finished
> > >before getting triggered successfully, we have listed two options in
> > >the FLIP-147[1], for the first version, now we are not tend to go with
> > >the first option that JM would re-compute and re-trigger new sources
> > >when it realized some tasks are not triggered successfully.
> > >This option would avoid the complexity of adding a new RPC and
> > >duplicating task states, and in the average case it would not cause
> > >too much overhead.
> >
> > You wrote "we are *not* tend to go with the first option", but I think
> > you meant to write "we tend to *now* go with the first option", right?
> > That's also how it is in the FLIP, I just wanted to clarify for the
> > mailing list.
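Arvid's `requiresFinalCheckpoint()` idea from the top of the thread can be sketched as a small, self-contained example. Note this is only an illustration of the proposed mechanism under simplifying assumptions: `StreamOperator` and `CheckpointListener` below are minimal stand-ins, not Flink's real interfaces, and the method name is the one marked "TBD" in the mail.

```java
// Simplified stand-in for Flink's CheckpointListener. The refinement from
// the mail: UDFs that implement CheckpointListener but do not need the
// final checkpoint for 2pc can opt out by overriding this.
interface CheckpointListener {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

// Simplified stand-in for Flink's StreamOperator. Conservative default:
// every operator waits for a final checkpoint, i.e. is closed "slowly".
interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

class AbstractUdfStreamOperator implements StreamOperator {
    private final Object userFunction;

    AbstractUdfStreamOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public boolean requiresFinalCheckpoint() {
        // Wait for a final checkpoint only if the wrapped UDF listens for
        // checkpoint completion AND declares that it needs the last one.
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}
```

Under this sketch, an operator wrapping a plain function (no `CheckpointListener`) could be closed immediately without waiting for a checkpoint, while a 2pc-style function keeps the conservative behavior.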