Hi Piotr,

Thank you for raising your concern. Unfortunately, I do not have a better
idea than interleaving the closing of operators with checkpoints (= multiple
last checkpoints).

However, two ideas on how to improve the overall user experience:
1. If an operator does not rely on notifyCheckpointComplete, we can close
it faster (without waiting for a checkpoint). In general, I'd assume that
almost all non-sinks behave that way.
2. We may increase the checkpointing frequency for the last checkpoints. We
need to avoid overloading checkpoint storages and task managers, but I
assume that the more operators are closed, the lower the checkpointing
interval can be (see the sketch further below).

For 1, I'd propose to add (name TBD):

// new default method on the StreamOperator interface:
default boolean requiresFinalCheckpoint() {
    return true;
}

This means all operators are conservatively (= slowly) closed. For most
operators, we can then define their behavior by overriding it in
AbstractUdfStreamOperator:

// override in AbstractUdfStreamOperator:
@Override
public boolean requiresFinalCheckpoint() {
    return userFunction instanceof CheckpointListener;
}

This idea can be further refined by also adding requiresFinalCheckpoint to
CheckpointListener, to exclude all operators with UDFs that implement
CheckpointListener but do not need it for 2pc:

// refined override in AbstractUdfStreamOperator:
@Override
public boolean requiresFinalCheckpoint() {
    return userFunction instanceof CheckpointListener &&
            ((CheckpointListener) userFunction).requiresFinalCheckpoint();
}
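
Here, the new method on CheckpointListener itself would just be a default
method (again only a sketch; no such method exists on the interface today):

// hypothetical addition to the CheckpointListener interface; defaulting to
// true keeps the conservative behavior for existing implementations
default boolean requiresFinalCheckpoint() {
    return true;
}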

That approach would also work for state backends/snapshot strategies that
require some form of 2pc.

If we can contain it to the @PublicEvolving StreamOperator, it would of
course be better.
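
Coming back to 2, purely as an illustration of how the interval could adapt
(all names are made up; no such hook exists in the checkpoint coordinator
today):

// Illustrative sketch only: scale the checkpointing interval down as more
// operators finish, but never go below a configured minimum so that
// checkpoint storage and task managers are not overloaded.
long adjustedCheckpointInterval(
        long baseInterval, long minInterval, int totalOperators, int finishedOperators) {
    double remainingRatio =
            (totalOperators - finishedOperators) / (double) totalOperators;
    return Math.max(minInterval, (long) (baseInterval * remainingRatio));
}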

Best,

Arvid

On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hey,
>
> I would like to raise a concern about the implementation of the final
> checkpoints taking into account operators/functions that implement the
> two-phase commit (2pc) protocol for exactly-once processing with some
> external state (kept outside of Flink). Primarily exactly-once sinks.
>
> First of all, as I understand it, this is not planned in the first version
> of this FLIP. I'm fine with that, however I would strongly emphasize this
> in every place where we mention the FLIP-147 efforts. This is because I,
> as a user, upon hearing "Flink supports checkpointing with bounded inputs",
> would expect 2pc to work properly and to commit the external side effects
> upon finishing. As it is now, I (as a user) would be surprised by silent
> data loss (of uncommitted trailing data). This is just a remark that we
> need to attach this warning to every blog post/documentation/user mailing
> list response related to "Support Checkpoints After Tasks Finished". I
> would also suggest prioritizing the follow-up work of supporting 2pc.
>
> Secondly, I think we are missing how difficult and problematic 2pc support
> with the final checkpoint will be.
>
> For starters, keep in mind that currently 2pc can be implemented by users
> using both `@Public` APIs as functions and `@PublicEvolving` operators in
> any place in the job graph. It's not limited to only the sinks. For
> example, users could easily implement an `AsyncFunction` (for the
> `AsyncWaitOperator`) that uses 2pc based on the `CheckpointListener`
> interface. I'm not saying it's common, probably just a tiny minority of
> users are doing that (if any at all), but nevertheless that's possible and
> currently (implicitly?) supported in Flink.
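>
> Just to illustrate what such a user function could look like (a rough
> sketch only; external transaction handling is elided and the
> CheckpointListener import path may differ between Flink versions):
>
> import org.apache.flink.api.common.state.CheckpointListener;
> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
>
> import java.util.Collections;
>
> // Rough sketch of a user-defined AsyncFunction participating in 2pc via
> // CheckpointListener.
> public class TwoPhaseCommitAsyncFunction
>         implements AsyncFunction<String, String>, CheckpointListener {
>
>     @Override
>     public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>         // write the record into a pending external transaction, then
>         // complete the result future
>         resultFuture.complete(Collections.singleton(input));
>     }
>
>     @Override
>     public void notifyCheckpointComplete(long checkpointId) {
>         // commit the external transaction(s) covered by this checkpoint
>     }
> }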
>
> The next complication is the support of bounded streams (`BoundedOneInput` or
> `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the
> operators. Currently it works as follows:
> 0. Task receives EndOfPartitionEvent (or source finishes)
> 1. `endOfInput` is called on the first operator in the chain
> 2. We quiesce the processing timers
> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> operator, so no new timers will be triggered
> 3. We wait for the already fired timers to finish executing (spinning
> mailbox loop)
> 4. We are closing the first operator
> 5. We go to the next (second) operator in the chain and repeat the steps 1.
> to 5.
>
> This is because operators can emit data after processing `endOfInput`, from
> timers, async mailbox actions and inside the `close` method itself.
>
> Now the problem is that, to support the final checkpoint with 2pc, we need
> to trigger the `snapshotState` and `notifyCheckpointComplete` calls at the
> very least only after the `endOfInput` call on the operator. Probably the
> best place would be in between steps 3. and 4. However, that means we would
> be forced to wait for steps 1. to 3. to finish, then wait for the next
> checkpoint to trigger AND complete, before finally closing the head
> operator, and only then could we start closing the next operator in the
> chain:
>
> 0. Task receives EndOfPartitionEvent (or source finishes)
> 1. `endOfInput` is called on the first operator in the chain
> 2. We quiesce the processing timers
> (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
> operator, so no new timers will be triggered
> 3. We wait for the already fired timers to finish executing (spinning
> mailbox loop)
> *3b. We wait for one more checkpoint to trigger and for the
> `notifyCheckpointComplete` RPC.*
> 4. We are closing the first operator
> 5. We go to the next (second) operator in the chain and repeat the steps 1.
> to 5.
>
> That means we can close one operator per successful checkpoint. To close
> 10 operators, we would need 10 successful checkpoints.
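>
> In (very rough) pseudocode the shutdown would become the following serial
> loop (helper names are made up, this is not the actual runtime code):
>
> // one full checkpoint round-trip is needed per chained operator
> void closeOperatorsOneByOne(List<StreamOperator<?>> operatorsHeadToTail)
>         throws Exception {
>     for (StreamOperator<?> operator : operatorsHeadToTail) {
>         signalEndOfInput(operator);       // 1. `endOfInput`
>         quiesceAndDrainTimers(operator);  // 2. + 3. quiesce and drain timers
>         waitForCompletedCheckpoint();     // 3b. trigger + `notifyCheckpointComplete`
>         operator.close();                 // 4. close, then go to the next operator
>     }
> }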
>
> I was thinking about different approaches to this problem, and I couldn't
> find any viable ones. All I could think of would break the current
> `@Public` API and/or would be ugly/confusing for the users.
>
> For example, a relatively simple solution would be to introduce a `preClose`
> or `flush` method on the operators, with a contract that after `flush`,
> operators would be forbidden from emitting more records, so that we could
> replace step 4. with this `flush` call and then let a single checkpoint
> finish 2pc for all of the operators inside the chain. But this doesn't
> work: the sheer fact of adding this `flush` method and changing the
> contract would break the current API, and Yun Gao has pointed out to me
> that we either already support, or want to support, operators that emit
> records from within the `notifyCheckpointComplete` call:
>
> > Yun Gao:
> > like with the new sink api there might be writer -> committer -> global
> > committer; the committer would need to wait for the last checkpoint to
> > commit the last piece of data, and after that it also needs to emit the
> > list of transactions that got committed to the global committer to do
> > some finalization logic.
>
> So it wouldn't solve the problem (at least not fully).
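>
> (For reference, the discarded contract would have amounted to roughly the
> following hypothetical method on the operators:)
>
> // hypothetical method, rejected as discussed above: after flush() returns,
> // the operator must not emit any further records, which conflicts with
> // emitting from `notifyCheckpointComplete`
> void flush() throws Exception;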
>
> Does anyone have any better ideas on how to solve this problem?
>
> Piotrek
>
> On Fri, 15 Jan 2021 at 14:57, Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
> > Hi Aljoscha,
> >
> >     I think so, since we do not seem to have any other divergences or new
> > objections now. I'll open the vote then. Many thanks!
> >
> > Best,
> >  Yun
> >
> >
> > ------------------------------------------------------------------
> > From:Aljoscha Krettek <aljos...@apache.org>
> > Send Time:2021 Jan. 15 (Fri.) 21:24
> > To:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> >
> > Thanks for the summary! I think we can now move towards a [VOTE] thread,
> > right?
> >
> > On 2021/01/15 13:43, Yun Gao wrote:
> > >1) For the problem that the "new" root task coincidentally finished
> > >before getting triggered successfully, we have listed two options in
> > >the FLIP-147[1], for the first version, now we are not tend to go with
> > >the first option that JM would re-compute and re-trigger new sources
> > >when it realized some tasks are not triggered successfully. This option
> > >would avoid the complexity of adding new RPC and duplicating task
> > >states, and in the average case it would not cause too much overhead.
> >
> > You wrote "we are *not* tend to go with the first option", but I think
> > you meant to write "we tend to *now* go with the first option", right?
> > That's also how it is in the FLIP, I just wanted to clarify for the
> > mailing list.
> >
> >
>
