Hi Yun,

thanks for the detailed example. It feels like you and Aljoscha are also not fully aligned yet. For me, it sounded as if Aljoscha would like to avoid sending RPCs to non-source subtasks.
> I think we are still on the same page that we would like to trigger checkpoints periodically until the whole job is finished.
> I think in general we do not have to force the checkpoint to align with subtask finish. For example, one operator might have the lifecycle "take one checkpoint -> emit some records -> take another checkpoint -> emit more records -> finish", and it does not have to wait for one more checkpoint before finishing. The second checkpoint just happens to be the "final" checkpoint of this operator.
> The only exception is a sink operator that must wait for one more checkpoint to commit the last piece of data before finishing; this kind of operator would be dealt with separately and forced to wait for a checkpoint before finishing.

Yes, that sounds good. I was concerned about any "holding back" of barriers in async I/O. I'd just hold back the EOP until all async threads have finished and forward barriers in the normal way. That would then also be my solution for sinks - hold back the EOP (=finish) until the checkpoint is done. My concern here is still that we would need a reliable mechanism to be notified that the checkpoint completed. Maybe we can use the GlobalCommitter?
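To make the "hold back EOP (=finish) until the checkpoint is done" idea a bit more concrete, here is a minimal sketch. The class and method names are simplified placeholders, not Flink's actual operator or sink API, and whether the commit goes through the GlobalCommitter is left open:

    import java.util.concurrent.CompletableFuture;

    class FinalCheckpointAwareSink {

        private boolean endOfInputSeen = false;
        // Id of a snapshot taken after end of input, i.e. the candidate "final" checkpoint.
        private long finalCheckpointId = -1;
        // Completed once such a checkpoint has been confirmed.
        private final CompletableFuture<Void> readyToFinish = new CompletableFuture<>();

        // Called when all input channels have delivered EndOfPartition.
        void endOfInput() {
            endOfInputSeen = true;
            // Do not forward EOP / finish yet; wait for one more confirmed checkpoint.
        }

        // Called when a checkpoint barrier arrives and the snapshot is taken.
        void snapshotState(long checkpointId) {
            if (endOfInputSeen) {
                finalCheckpointId = checkpointId;
            }
            // ... persist the pending data so it can be committed on completion ...
        }

        // Called when the checkpoint coordinator confirms a checkpoint.
        void notifyCheckpointComplete(long checkpointId) {
            // ... commit data up to checkpointId (e.g. via a committer) ...
            if (finalCheckpointId != -1 && checkpointId >= finalCheckpointId) {
                // The last piece of data is committed; now it is safe to finish.
                readyToFinish.complete(null);
            }
        }

        // The task only shuts down / emits EOP downstream once this future completes.
        CompletableFuture<Void> finishFuture() {
            return readyToFinish;
        }
    }

The point is only that the operator defers its shutdown until notifyCheckpointComplete confirms a checkpoint whose snapshot was taken after end of input.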
> In this case, if we do not deal with it specially, based on the current implementation the trigger RPC would just be ignored, and the checkpoint would eventually fail due to a timeout since no task would report its state. But we would be able to remedy this checkpoint: the Source subtask 2 and the Async I/O subtask would report FINISHED status to the JobMaster after we tried to trigger the tasks and before the tasks have reported their snapshots for this checkpoint.
> The CheckpointCoordinator would listen to this notification; when it receives the notification, it would iterate over its pending checkpoints to see whether it has triggered a task but received FINISHED before its snapshot. If so, it would recompute the subtasks to trigger and re-trigger the following tasks.
> Of course this is one possible implementation and we might have other solutions to this problem. Do you think the process would still have some problems?

Here I'm just concerned that we would overload the JM. Especially if it's cascading: A is triggered in A->B->C but finishes, the JM computes B and resends the RPC, but at that time B is also finished. Hence I was thinking of using the TMs instead and only falling back to the JM if the TM has exited.
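To illustrate the concern, here is a rough sketch of how the re-trigger on the JM side could look. The types and method names below are placeholders, not the actual CheckpointCoordinator code:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class ReTriggerSketch {

        // Placeholder view of the execution graph; not the actual JobMaster API.
        interface ExecutionGraphView {
            // Subtasks that currently act as "sources" of the remaining graph.
            List<String> computeSubtasksToTrigger();
            void sendTriggerRpc(String subtask, long checkpointId);
        }

        private final ExecutionGraphView graph;
        private final Set<String> triggeredButNotAcknowledged = new HashSet<>();

        ReTriggerSketch(ExecutionGraphView graph) {
            this.graph = graph;
        }

        void triggerCheckpoint(long checkpointId) {
            for (String subtask : graph.computeSubtasksToTrigger()) {
                triggeredButNotAcknowledged.add(subtask);
                graph.sendTriggerRpc(subtask, checkpointId);
            }
        }

        // Called on the JM when a subtask reports FINISHED.
        void onSubtaskFinished(String subtask, long pendingCheckpointId) {
            if (triggeredButNotAcknowledged.remove(subtask)) {
                // The trigger RPC raced with the FINISHED report: recompute the
                // subtasks to trigger and send another round of RPCs. In A -> B -> C
                // this can cascade: B may already be finished when we re-trigger,
                // which causes yet another recomputation for C, and so on.
                triggerCheckpoint(pendingCheckpointId);
            }
        }
    }

Every FINISHED report that races with a trigger RPC leads to another computeSubtasksToTrigger() round; with a long chain of nearly finished subtasks that can mean many RPC rounds per checkpoint, which is why handling this on the TM side first seems attractive to me.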
On Wed, Jan 6, 2021 at 3:29 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Arvid,
>
> Many thanks for the feedback! I'll try to answer the questions inline:
>
> > I'm also concerned about the notion of a final checkpoint. What happens when this final checkpoint times out (checkpoint timeout > async timeout) or fails for a different reason? I'm currently more inclined to just let checkpoints work until the whole graph is completed (and I thought this was the initial goal of the whole FLIP to begin with).
>
> I think we are still on the same page that we would like to trigger checkpoints periodically until the whole job is finished.
> I think in general we do not have to force the checkpoint to align with subtask finish. For example, one operator might have the lifecycle "take one checkpoint -> emit some records -> take another checkpoint -> emit more records -> finish", and it does not have to wait for one more checkpoint before finishing. The second checkpoint just happens to be the "final" checkpoint of this operator.
> The only exception is a sink operator that must wait for one more checkpoint to commit the last piece of data before finishing; this kind of operator would be dealt with separately and forced to wait for a checkpoint before finishing.
>
> > However, I have the impression that you think mostly in terms of tasks and I mostly think in terms of subtasks. I especially want to have proper support for bounded sources where one partition is much larger than the other partitions (might be in conjunction with unbounded sources such that checkpointing is plausible to begin with). Hence, most of the subtasks are finished with one straggler remaining. In this case, the barriers are now inserted only in the straggling source subtask and potentially in any running downstream subtask.
> > As far as I have understood, this would require barriers to be inserted downstream, leading to similar race conditions.
>
> I might not fully understand the issue, but I'd like to further detail the expected process here:
>
> Source (subtask 0) ---+
>                       |
> Source (subtask 1) ---+--> Async I/O (subtask 0) -> Sink (subtask 0)
>                       |
> Source (subtask 2) ---+
>
> The Async I/O subtask would have three input channels.
>
> Case 1) Suppose source subtasks 0 and 1 are finished and the Async I/O subtask has received EndOfPartition from the corresponding channels. Now we happen to trigger a checkpoint; in the remaining execution graph, source subtask 2 is the "source" of the graph. We would then trigger source subtask 2 to start the checkpoint; source subtask 2 takes its snapshot and emits barriers to the Async I/O subtask. The Async I/O subtask would find that 2/3 of its input channels have received EndOfPartition and that it has received the barrier from the remaining channel; it then knows the barriers are aligned, takes its snapshot, and emits the barrier to the sink subtasks.
>
> Case 2) Suppose the job continues to run and now source subtask 2 is also finished, and we are going to take another checkpoint. We would find that in the remaining execution graph the new "source" is now the Async I/O subtask. We would then trigger this Async I/O subtask instead (this is different from the current implementation). The Async I/O subtask receives the trigger, takes its snapshot and emits the barrier to the following sink subtask. (Of course the Async I/O subtask would need some way to wait until it has received EndOfPartition from all its input channels before taking the snapshot to stay consistent, but I think we can ignore the implementation details for now.)
>
> For the race condition, it might happen if
> a) in case 1, the CheckpointCoordinator triggers source subtask 2, but source subtask 2 reports FINISHED before the trigger RPC reaches the TaskManager it resides on.
> b) in case 2, the CheckpointCoordinator triggers the Async I/O subtask, but the Async I/O subtask reports FINISHED before the trigger RPC reaches the TaskManager it resides on.
>
> In this case, if we do not deal with it specially, based on the current implementation the trigger RPC would just be ignored, and the checkpoint would eventually fail due to a timeout since no task would report its state. But we would be able to remedy this checkpoint: the Source subtask 2 and the Async I/O subtask would report FINISHED status to the JobMaster after we tried to trigger the tasks and before the tasks have reported their snapshots for this checkpoint.
> The CheckpointCoordinator would listen to this notification; when it receives the notification, it would iterate over its pending checkpoints to see whether it has triggered a task but received FINISHED before its snapshot. If so, it would recompute the subtasks to trigger and re-trigger the following tasks.
> Of course this is one possible implementation and we might have other solutions to this problem. Do you think the process would still have some problems?
>
> > However, that would require subtasks to stay alive until they receive the checkpointCompleted callback (which is currently also not guaranteed)
>
> With the above process, I think the tasks would not need to wait for the checkpointCompleted callback? If a task finished, the above process would try to trigger its following tasks.
>
> Best,
> Yun
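To restate case 1 above in code form, a minimal sketch of an alignment check that treats finished channels as aligned; the type and method names are simplified placeholders, not Flink's actual CheckpointBarrierHandler:

    import java.util.BitSet;

    class EndOfPartitionAwareAligner {

        private final int numberOfChannels;
        private final BitSet finishedChannels = new BitSet();
        private final BitSet barrierReceived = new BitSet();

        EndOfPartitionAwareAligner(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        // The channel delivered EndOfPartition; it will never deliver a barrier.
        void onEndOfPartition(int channel) {
            finishedChannels.set(channel);
        }

        // Returns true when the alignment is complete and the snapshot can be taken.
        boolean onBarrier(int channel) {
            barrierReceived.set(channel);
            return isAligned();
        }

        private boolean isAligned() {
            for (int channel = 0; channel < numberOfChannels; channel++) {
                // A channel counts as aligned if it delivered either the barrier or EOF.
                if (!barrierReceived.get(channel) && !finishedChannels.get(channel)) {
                    return false;
                }
            }
            return true;
        }
    }

In Yun's example the Async I/O subtask has three channels; with channels 0 and 1 already finished, the single barrier from source subtask 2 completes the alignment. In case 2 no live channels remain, so the subtask would have to be triggered directly.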
> ------------------ Original Mail ------------------
> Sender: Arvid Heise <ar...@ververica.com>
> Send Date: Wed Jan 6 20:42:56 2021
> Recipients: Aljoscha Krettek <aljos...@apache.org>
> CC: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>> I was actually not thinking about concurrent checkpoints (and actually want to get rid of them once UC is established, since they are addressing the same thing).
>>
>> But your explanation definitely helped me to better understand the race condition.
>>
>> However, I have the impression that you think mostly in terms of tasks and I mostly think in terms of subtasks. I especially want to have proper support for bounded sources where one partition is much larger than the other partitions (might be in conjunction with unbounded sources such that checkpointing is plausible to begin with). Hence, most of the subtasks are finished with one straggler remaining. In this case, the barriers are now inserted only in the straggling source subtask and potentially in any running downstream subtask.
>> As far as I have understood, this would require barriers to be inserted downstream, leading to similar race conditions.
>>
>> I'm also concerned about the notion of a final checkpoint. What happens when this final checkpoint times out (checkpoint timeout > async timeout) or fails for a different reason? I'm currently more inclined to just let checkpoints work until the whole graph is completed (and I thought this was the initial goal of the whole FLIP to begin with). However, that would require subtasks to stay alive until they receive the checkpointCompleted callback (which is currently also not guaranteed)...
>>
>> On Wed, Jan 6, 2021 at 12:17 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> > On 2021/01/06 11:30, Arvid Heise wrote:
>> > > I'm assuming that this is the normal case. In an A->B graph, as soon as A finishes, B still has a couple of input buffers to process. If you add backpressure or longer pipelines into the mix, it's quite likely that a checkpoint may occur with B being the head.
>> >
>> > Ahh, I think I know what you mean. This can happen when the checkpoint coordinator issues concurrent checkpoints without waiting for older ones to finish. My head is mostly operating under the premise that there is at most one concurrent checkpoint.
>> >
>> > In the current code base the race conditions that Yun and I are talking about cannot occur. Checkpoints can only be triggered at sources and they will then travel through the graph. Intermediate operators are never directly triggered from the JobManager/CheckpointCoordinator.
>> >
>> > When sources start to shut down, the JM has to directly inject/trigger checkpoints at the now new "sources" of the graph, which have previously been intermediate operators.
>> >
>> > I want to repeat that I have a suspicion that maybe this is a degenerate case and we never want to allow operators to be doing checkpoints when they are not connected to at least one running source. Which means that we have to find a solution for declined checkpoints and missing sources.
>> >
>> > I'll first show an example where I think we will never have intermediate operators running without the sources being running:
>> >
>> > Source -> Map -> Sink
>> >
>> > Here, when the Source does its final checkpoint and then shuts down, that same final checkpoint would travel downstream ahead of the EOF, which would in turn cause Map and Sink to also shut down. *We can't have the case that Map is still running when we want to take a checkpoint and Source is not running.*
>> >
>> > A similar case is this one:
>> >
>> > Source1 --+
>> >           |-> Map -> Sink
>> > Source2 --+
>> >
>> > Here, if Source1 is finished but Source2 is not, Map is still connected to at least one upstream source that is still running. Again, Map would never be running and doing checkpoints if neither Source1 nor Source2 is online.
>> >
>> > The cases I see where intermediate operators would keep running despite not being connected to any upstream operators are when we purposefully keep an operator online despite all inputs having seen EOF. One example is async I/O, another is what Yun mentioned where a sink might want to wait for another checkpoint to confirm some data. Example:
>> >
>> > Source -> Async I/O -> Sink
>> >
>> > Here, Async I/O will stay online as long as there are some scheduled requests outstanding, even when the Source has shut down. In those cases, the checkpoint coordinator would have to trigger new checkpoints at Async I/O and not Source, because it has become the new "head" of the graph.
>> >
>> > For Async I/O at least, we could say that the operator will wait for all outstanding requests to finish before it allows the final checkpoint and passes the barrier forward.
>> >
>> > Best,
>> > Aljoscha

--
Arvid Heise | Senior Java Developer