Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-06 Yun Gao
Hi Arvid, many thanks for the feedback! I'll try to answer the questions inline: > I'm also concerned about the notion of a final checkpoint. What happens when this final checkpoint times out (checkpoint timeout > async timeout) or fails for a different reason? I'm currently more inclined to...

2021-01-06 Arvid Heise
Hi Yun, thanks for the detailed example. It feels like Aljoscha and you are also not fully aligned yet. To me, it sounded as if Aljoscha would like to avoid sending RPCs to non-source subtasks. I think we are still on the same page that we would like to trigger checkpoints periodically until the...

2021-01-06 Yun Gao
Hi Arvid, many thanks for the deep thoughts! > If this somehow works, we would not need to change much in the checkpoint coordinator. He would always inject into sources. We could also ignore the race conditions as long as the TM lives. Checkpointing times are also no worse than with the l...

2021-01-07 Yun Gao
Hi Roman, many thanks for the feedback! I'll try to answer the issues inline: > 1. Option 1 is said to be not preferable because it wastes resources and adds complexity (new event). > However, the resources would be wasted for a relatively short time until the job finishes completely.

2021-01-10 Khachatryan Roman
Thanks a lot for your answers, Yun. > In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED; the CheckpointCoordinator would choose B as the new "source" to trigger the checkpoint via RPC. For task B, if it received the checkpoint trigger, it would kno...
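A minimal sketch of the trigger rule described in the quoted example, assuming a static job graph given as a map from each task to its upstream tasks, plus the set of tasks already reported FINISHED. `tasks_to_trigger` is a hypothetical helper, not CheckpointCoordinator code. A still-running task is triggered via RPC once all of its upstream tasks have finished, so in A -> B -> C the trigger moves from A to B after A finishes.

```python
from typing import Dict, List, Set


def tasks_to_trigger(upstream: Dict[str, List[str]], finished: Set[str]) -> Set[str]:
    """Return the running tasks whose upstream tasks have all finished.

    Original sources have no upstream tasks, so they qualify while running.
    (Hypothetical helper modeling the rule discussed in the thread.)
    """
    return {
        task
        for task, inputs in upstream.items()
        if task not in finished and all(u in finished for u in inputs)
    }


# Job graph A -> B -> C, expressed as task -> list of upstream tasks.
graph = {"A": [], "B": ["A"], "C": ["B"]}
print(tasks_to_trigger(graph, set()))  # {'A'}
print(tasks_to_trigger(graph, {"A"}))  # {'B'}
```

With two inputs, a task only becomes a new "source" when both upstream tasks have finished; until then the still-running upstream task keeps receiving the trigger.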

2021-01-11 Yun Gao
Hi Roman, many thanks for the feedback! > Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY). > But I wonder how significantly this waiting f...
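A minimal sketch of the "decline" option quoted above. It assumes the task tracks, for each input channel, whether EndOfPartition has arrived. `handle_rpc_trigger` and the returned strings are hypothetical, and CHECKPOINT_DECLINED_TASK_NOT_READY appears only as a label, not as a call into Flink.

```python
from typing import Dict

ACKNOWLEDGED = "ACKNOWLEDGED"
# Decline reason named in the thread; here it is just a string label.
DECLINED_TASK_NOT_READY = "CHECKPOINT_DECLINED_TASK_NOT_READY"


def handle_rpc_trigger(eop_received: Dict[str, bool]) -> str:
    """Answer an RPC checkpoint trigger sent to a non-source task.

    eop_received maps each input channel id to whether EndOfPartition
    has already been received on that channel (hypothetical model).
    """
    if not all(eop_received.values()):
        # The coordinator treats this task as a new "source", but EndOfPartition
        # has not arrived on every channel yet, so in-flight records may remain:
        # decline instead of waiting.
        return DECLINED_TASK_NOT_READY
    # Every input is drained, so the task can start the checkpoint itself.
    return ACKNOWLEDGED


print(handle_rpc_trigger({"ch-0": True, "ch-1": False}))  # CHECKPOINT_DECLINED_TASK_NOT_READY
print(handle_rpc_trigger({"ch-0": True, "ch-1": True}))   # ACKNOWLEDGED
```

Declining shifts the cost to a later checkpoint attempt rather than blocking the task, which is the trade-off the quoted question ("how significantly this waiting...") is probing.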

2021-01-11 Khachatryan Roman
Hi Yun, > b) With unaligned checkpoints enabled, the slower cases might happen if the downstream task processes very slowly. I think UC will be the common case with multiple sources, each with DoP > 1. IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes...

2021-01-11 Yun Gao
Hi Roman, many thanks for the feedback and suggestions! > I think UC will be the common case with multiple sources each with DoP > 1. > IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes. Yes, waiting for Eo...

2021-01-13 Yun Gao
Hi all, I updated the FLIP [1] to reflect the major points discussed in the ML thread: 1) For the "new" root tasks that finished before receiving the trigger message, we previously proposed letting the JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that it might...

2021-01-14 Yun Gao
Hi all, we had some offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd like to post some points we discussed: 1) For the problem that the "new" root task coincidentally finished before getting triggered successfully, we have listed two options in FLIP-147 [1]; for the fi...

2021-02-24 Yun Gao
Hi Till, Guowei, many thanks for initiating the discussion and for the deep thoughts! For notifyCheckpointComplete, I also agree we could try to avoid emitting new records in notifyCheckpointComplete by using the OperatorCoordinator for the new sink API. Besides, the Hive sink might also need some modi...

2021-02-24 Piotr Nowojski
Thanks for the responses, Guowei and Yun. Could you elaborate more/remind me: what does it mean to replace emitting results from `notifyCheckpointComplete` with the `OperatorCoordinator` approach? About the discussion in FLINK-21133 and how it relates to FLIP-147: you are right, Yun Gao, that in cas...

2021-02-24 Kezhu Wang
Hi all, thanks for driving this, and especially Piotr for reactivating this thread. First, for `notifyCheckpointComplete`, I have a strong preference towards "shut down the dataflow pipeline with one checkpoint in total", so I lean toward the option of dropping "send records" from `notifyCheckpointComplete` for ne...

2021-06-09 Yun Gao
Hi all, many thanks for the warm discussions! Regarding the change in the operator lifecycle, I also agree with adding the flush/drain/stopAndFlush/finish method. For the duplication between this method and `endInput` for one-input operators, after some offline discussion with Dawid I now also thi...
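A sketch of the lifecycle under discussion, assuming a one-input operator that buffers records. The thread lists flush/drain/stopAndFlush/finish as candidate names and this sketch simply picks `finish`; the class and method names are hypothetical Python stand-ins, not the Flink operator API. All remaining records are emitted in `finish()`, so `close()` only releases resources and a final checkpoint can be taken between the two.

```python
from typing import Callable, List


class BufferingOperator:
    """Hypothetical one-input operator illustrating the proposed order:
    process -> end_input -> finish (flush) -> final checkpoint -> close.
    """

    def __init__(self, emit: Callable[[str], None]) -> None:
        self.emit = emit
        self.buffer: List[str] = []
        self.closed = False

    def process_element(self, record: str) -> None:
        self.buffer.append(record)

    def end_input(self) -> None:
        # For a one-input operator this signals the same thing as finish():
        # no more records will arrive. This is the duplication discussed.
        pass

    def finish(self) -> None:
        # Flush everything still buffered; afterwards nothing is emitted.
        for record in self.buffer:
            self.emit(record)
        self.buffer.clear()

    def close(self) -> None:
        # Only releases resources and emits no records, so a final checkpoint
        # taken between finish() and close() covers the complete output.
        self.closed = True


out: List[str] = []
op = BufferingOperator(out.append)
op.process_element("a")
op.process_element("b")
op.end_input()
op.finish()
# (a final checkpoint would be taken at this point)
op.close()
print(out)  # ['a', 'b']
```

For a one-input operator `end_input()` and `finish()` are called back to back, which is why the message considers whether both are needed.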

2021-06-10 Arvid Heise
The whole operator API is only for advanced users and is not marked Public(Evolving). Users have to accept that things change, and we have to use that freedom, which we don't have in many other parts of the system. The change needs to be very clear in the change notes, though. I also don't expect many...

Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-02-25 Yun Gao
Hi all, many thanks for the discussions! A. Regarding how to avoid emitting records in notifyCheckpointComplete: currently the structure of a new sink is writer -> committer -> global committer, and the parallelism of the global committer must be one. By design it would be used in several cases:...

2021-02-25 Kezhu Wang
...would not need to wait for other slow operators to exit. Best, Yun -- Original Mail -- Sender: Kezhu Wang; Send Date: Thu Feb 25 15:11:53 2021; Recipients: Flink Dev, Piotr Nowojski <piotr.nowoj...@gmail.com>; CC: Guowei Ma, jingsongl...@gmail.com <jingsongl...

2021-02-27 Till Rohrmann
> ...notifyCheckpointComplete, stop-with-savepoint --drain could be done with one savepoint, and for the normal exit, the operator would not need to wait for other slow operators to exit. > Best, Yun > -- Original Mail -- Sender: Ke...

2021-02-28 Kezhu Wang
> ...with one savepoint, and for the normal exit, the operator would not need to wait for other slow operators to exit. > Best, Yun > -- Original Mail -- Sender: Kezhu Wang; Send Date: Thu Feb 25 15:11:53 2021; Recipients: Fli...

2021-02-28 Till Rohrmann
> ...the operator, like in Kezhu's proposal the close() happens at last, but it seems close() might also emit records (so now the operators are closed with op1's close() -> op2's endOfInput() -> op2's close() -> op3's endOfInp...
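The interleaved close order quoted above can be modeled with a toy operator chain, assuming any operator may still emit a record from `close()`. `ChainedOp` and `close_chain` are hypothetical names, not Flink classes. Because op1's `close()` can push records into op2 and op3, each downstream operator's `endOfInput()` is only called after its upstream operator's `close()` has returned.

```python
from typing import List, Optional


class ChainedOp:
    """Hypothetical chained operator that logs lifecycle calls and may emit in close()."""

    def __init__(self, name: str, log: List[str], emit_on_close: bool = False) -> None:
        self.name = name
        self.log = log
        self.emit_on_close = emit_on_close
        self.next: Optional["ChainedOp"] = None

    def process(self, record: str) -> None:
        self.log.append(f"{self.name}.process({record})")
        if self.next is not None:
            self.next.process(record)

    def end_of_input(self) -> None:
        self.log.append(f"{self.name}.endOfInput()")

    def close(self) -> None:
        self.log.append(f"{self.name}.close()")
        if self.emit_on_close and self.next is not None:
            # A record emitted during close() still reaches downstream operators
            # that have not yet seen endOfInput().
            self.next.process(f"flushed-by-{self.name}")


def close_chain(ops: List[ChainedOp]) -> None:
    """op1.close() -> op2.endOfInput() -> op2.close() -> op3.endOfInput() -> ..."""
    ops[0].close()
    for op in ops[1:]:
        op.end_of_input()
        op.close()


log: List[str] = []
ops = [ChainedOp("op1", log, emit_on_close=True), ChainedOp("op2", log), ChainedOp("op3", log)]
ops[0].next, ops[1].next = ops[1], ops[2]
close_chain(ops)
for line in log:
    print(line)
```

Calling `endOfInput()` on every operator up front would let op1's close-time record arrive at op2 after op2 was told its input had ended, which is the problem raised in the message.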

2021-02-28 Kezhu Wang
> ...happens at last, but it seems close() might also emit records (so now the operators are closed with op1's close() -> op2's endOfInput() -> op2's close() -> op3's endOfInput -> ...)? > And on the other side, as Kezhu has also p...

2021-02-28 Yun Gao
> From: Kezhu Wang; Send Time: 2021 Mar. 1 (Mon.) 01:26; To: Till Rohrmann; Cc: Piotr Nowojski, Guowei Ma, dev, Yun Gao, jingsongl...@gmail.com; Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished > In "stop-with-savepoint --drain", MAX_WATERMARK is not an is...

2021-02-28 Yun Gao
> ...of the operator, like in Kezhu's proposal the close() happens at last, but it seems close() might also emit records (so now the operators are closed with op1's close() -> op2's endOfInput() -> op2's close() -> op3's endOfInput -> ...

2021-03-02 Piotr Nowojski
> ...when the job finishes again, it would re-emit the MAX_WATERMARK? > Best, Yun > From: Kezhu Wang; Send Time: 2021 Mar. 1 (Mon.) 01:26; To: Till Rohrmann; Cc: Piotr Nowojski, Guowei Ma...

2021-03-03 Kezhu Wang
> ...Yun > From: Kezhu Wang; Send Time: 2021 Mar. 1 (Mon.) 01:26; To: Till Rohrmann; Cc: Piotr Nowojski, Guowei Ma <guowei@gmail.com>, dev, Yun Gao <yungao...@aliyun.com>, jin...

2021-03-04 Piotr Nowojski
> ...g that EndOfPartitionEvent is that: 1. The EndOfPartitionEvent is currently emitted in Task instead of StreamTask, so we would need some refactoring here. 2. Currently the InputGate/InputChannel would be released after the downstream tasks have received EndOfPartitionEvent from all the input channels, thi...

2021-03-04 Yun Gao
> From: Piotr Nowojski; Send Time: 2021 Mar. 4 (Thu.) 17:16; To: Kezhu Wang; Cc: dev, Yun Gao, jingsongl...@gmail.com, Guowei Ma, Till Rohrmann; Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished > Hi Kezhu, what do you mean by "end-flushing"? I was sugges...

2021-03-04 Kezhu Wang
> ...Task, so we would need some refactoring here. 2. Currently the InputGate/InputChannel would be released after the downstream tasks have received EndOfPartitionEvent from all the input channels; this would make the following checkpoint unable to...

2021-03-04 Piotr Nowojski
> ...`EndOfPartitionEvent` from `Task` to `StreamTask`. This may have some interference with BatchTask or the network IO stack. > * Or introducing a stream-task-level `EndOfUserRecordsEvent` (from PR#14831, @Yun @Piotr) > * Or special...
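A toy model of the `EndOfUserRecordsEvent` option listed above. It assumes the event only means "no more user records" while the channel stays open for checkpoint barriers, whereas `EndOfPartitionEvent` lets the channel be released. The enum and class names are hypothetical and do not reflect how Flink's network stack is actually implemented.

```python
from enum import Enum, auto


class Event(Enum):
    RECORD = auto()
    END_OF_USER_RECORDS = auto()  # hypothetical: no more user data, channel stays open
    BARRIER = auto()
    END_OF_PARTITION = auto()     # channel may be released afterwards


class InputChannelModel:
    """Toy model of one input channel under the EndOfUserRecordsEvent idea."""

    def __init__(self) -> None:
        self.user_records_done = False
        self.released = False
        self.barriers_seen = 0

    def on_event(self, event: Event) -> None:
        if self.released:
            raise RuntimeError("channel already released")
        if event is Event.RECORD:
            if self.user_records_done:
                raise RuntimeError("record after EndOfUserRecords")
        elif event is Event.END_OF_USER_RECORDS:
            self.user_records_done = True
        elif event is Event.BARRIER:
            # Still accepted after END_OF_USER_RECORDS, so a final checkpoint
            # can pass through even though upstream has no more data.
            self.barriers_seen += 1
        elif event is Event.END_OF_PARTITION:
            self.released = True


ch = InputChannelModel()
for e in [Event.RECORD, Event.END_OF_USER_RECORDS, Event.BARRIER, Event.END_OF_PARTITION]:
    ch.on_event(e)
print(ch.barriers_seen, ch.released)  # 1 True
```

Splitting "end of data" from "release the channel" is what lets a barrier arrive after the last record. With only `EndOfPartitionEvent`, the channel would already be gone, which is the problem described in the preceding messages.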

2021-03-04 Yun Gao
> From: Piotr Nowojski; Send Time: 2021 Mar. 4 (Thu.) 22:56; To: Kezhu Wang; Cc: Till Rohrmann, Guowei Ma, dev, Yun Gao, jingsongl...@gmail.com; Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished > Hi Yun and Kezhu...

2021-03-29 Till Rohrmann
> ...ctly. Regarding the order, I would still lean toward supporting the ordered case, since the sinks' implementations seem to depend on this functionality. > Best, Yun > From: Piotr Nowojski...