Hi Arvid,

Very thanks for the feedbacks! I'll try to answer the questions inline:

> I'm also concerned about the notion of a final checkpoint. What happens
> when this final checkpoint times out (checkpoint timeout > async timeout)
> or fails for a different reason? I'm currently more inclined to just let
> checkpoints work until the whole graph is completed (and thought this was
> the initial goal of the whole FLIP to being with).

I think we are still on the same page that we would like to trigger checkpoint 
periodically until the whole job is finished.
I think in generaly we do not must force the checkpoint aligned with subtask 
finished, namely for example one operator 
might have the lifecycle that "taking one checkpoint -> emit some records -> 
taking another checkpoint -> emit more records -> finish",
and do not need to must have to wait for one more checkpoint before finished. 
The second checkpoint just happens to be the "final" checkpoint of this 
operator. 
The only exception is that for sink operator that must wait for one more 
checkpoint to commit the last piece of data before finished, this kind of 
operators
would be dealt with separately to force them to wait for checkpont before 
finished.

> However, I have the impression that you think mostly in terms of tasks and
> I mostly think in terms of subtasks. I especially want to have proper
> support for bounded sources where one partition is much larger than the
>  other partitions (might be in conjunction with unbounded sources such that
> checkpointing is plausible to begin with). Hence, most of the subtasks are
> finished with one struggler remaining. In this case, the barriers are
> inserted now only in the struggling source subtask and potentially in any
> running downstream subtask.
> As far as I have understood, this would require barriers to be inserted
> downstream leading to similar race conditions.

I might not fully understand the issue, but I'd like to further detail
the expected process here: 

Source (subtask 0) ---+
                                     |
Source (subtask 1) ---+--> Async I/O (subtask 0) -> Sink (subtask 0).
                                    |
Source (subtask 2) ---+


The async I/O subtask would have three input channels. 

case 1) Support source subtask 0 and 1 are finished and the Async I/O would 
received EndOfPartition from the corresponding 
channels. Now we happen to trigger a checkpoint, we in the remaining execution 
graph, the subtask 2 is the "source" of the 
graph. Then we would trigger source subtask 2 to start the checkpoint, source 
subtask 2 takes snapshot and emit barriers to 
Async I/O sutask. Async I/O subtask would found that 2/3 of its input channels 
have received Eof and received barrier from 
the remaining channel, then it knows the barriers are aligned, then it takes 
the snapshot and emit the barrier to the sink subtasks.

case 2) Suppose the job continue to run and now source subtask 2 is also 
finished and now we are going to take another checkpoint, 
then we found that in the remaining execution graph the new "source" now is the 
Async I/O subtask. Then we would trigger this
Async I/O instead (this is different from the current implementation). The 
Async I/O received the trigger and take its snapshot and 
emit barrier to the following sink subtask. (Of couse here the Async I/O 
subtask should have some method to wait till it received EoF
from all the input channels before taking snapshot to keep consistent, but I 
think we could ignore the detail implementations first).

For the race condition, it might happen if 
a) in case 1, the CheckpontCoordinator trigger Source subtask 2, but source 
subtask 2 report finished before the trigger RPC gets into the resided 
TaskManager.
b) in case 2, the CheckpointCoordinator trigger Async I/O, but Async I/O 
subtask report finished before the trigger RPC gets into the resided 
TaskManager.

In this case, if we do not deal with specially, based on the current 
implementation, the trigger RPC would just be ignored, and the checkpoint would 
finally 
failed due to timeout since no tasks would report its state. But we would be 
able to remedy this checkpont: since the Source subtask 2 and the Async I/O
subtask would report FINISHED status to JobMaster after we tries to trigger the 
tasks, and before the task has reported its snapshot for this checkpont. 
The CheckpontCoordinator would listen to the notification, when it received the 
notification, it would iterates its pending checkpoints to see 
if it has trigger this task but received FINISHED before its snapshot. If so, 
it would recompute the subtasks to trigger, and re-trigger the following tasks. 
Of couse this is one possible implementation and we might have other solutions 
to this problem. Do you think the process would still have some problems ?

> However, that would
> require subtasks to stay alive until they receive checkpiontCompleted
> callback (which is currently also not guaranteed)

With the above process, I think the task would do not need to wait for 
receiving the checkpontCompleted callback? If it finished, the above process 
would try to trigger its following tasks. 

Best,
Yun








 ------------------Original Mail ------------------
Sender:Arvid Heise <ar...@ververica.com>
Send Date:Wed Jan 6 20:42:56 2021
Recipients:Aljoscha Krettek <aljos...@apache.org>
CC:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
I was actually not thinking about concurrent checkpoints (and actually want
to get rid of them once UC is established, since they are addressing the
same thing).

But your explanation definitely helped me to better understand the race
condition.

However, I have the impression that you think mostly in terms of tasks and
I mostly think in terms of subtasks. I especially want to have proper
support for bounded sources where one partition is much larger than the
other partitions (might be in conjunction with unbounded sources such that
checkpointing is plausible to begin with). Hence, most of the subtasks are
finished with one struggler remaining. In this case, the barriers are
inserted now only in the struggling source subtask and potentially in any
running downstream subtask.
As far as I have understood, this would require barriers to be inserted
downstream leading to similar race conditions.

I'm also concerned about the notion of a final checkpoint. What happens
when this final checkpoint times out (checkpoint timeout > async timeout)
or fails for a different reason? I'm currently more inclined to just let
checkpoints work until the whole graph is completed (and thought this was
the initial goal of the whole FLIP to being with). However, that would
require subtasks to stay alive until they receive checkpiontCompleted
callback (which is currently also not guaranteed)...

On Wed, Jan 6, 2021 at 12:17 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> On 2021/01/06 11:30, Arvid Heise wrote:
> >I'm assuming that this is the normal case. In a A->B graph, as soon as A
> >finishes, B still has a couple of input buffers to process. If you add
> >backpressure or longer pipelines into the mix, it's quite likely that a
> >checkpoint may occur with B being the head.
>
> Ahh, I think I know what you mean. This can happen when the checkpoint
> coordinator issues concurrent checkpoint without waiting for older ones
> to finish. My head is mostly operating under the premise that there is
> at most one concurrent checkpoint.
>
> In the current code base the race conditions that Yun and I are talking
> about cannot occur. Checkpoints can only be triggered at sources and
> they will then travel through the graph. Intermediate operators are
> never directly triggered from the JobManager/CheckpointCoordinator.
>
> When source start to shut down, the JM has to directly inject/trigger
> checkpoints at the now new "sources" of the graph, which have previously
> been intermediate operators.
>
> I want to repeat that I have a suspicion that maybe this is a degenerate
> case and we never want to allow operators to be doing checkpoints when
> they are not connected to at least one running source.  Which means that
> we have to find a solution for declined checkpoints, missing sources.
>
> I'll first show an example where I think we will never have intermediate
> operators running without the sources being running:
>
> Source -> Map -> Sink
>
> Here, when the Source does its final checkpoint and then shuts down,
> that same final checkpoint would travel downstream ahead of the EOF,
> which would in turn cause Map and Sink to also shut down. *We can't have
> the case that Map is still running when we want to take a checkpoint and
> Source is not running*.
>
> A similar case is this one:
>
> Source1 --+
>            |->Map -> Sink
> Source2 --+
>
> Here, if Source1 is finished but Source2 is not, Map is still connected
> to at least one upstream source that is still running. Again. Map would
> never be running and doing checkpoints if neither of Source1 or Source2
> are online.
>
> The cases I see where intermediate operators would keep running despite
> not being connected to any upstream operators are when we purposefully
> keep an operator online despite all inputs having seen EOF. One example
> is async I/O, another is what Yun mentioned where a sink might want to
> wait for another checkpoint to confirm some data. Example:
>
> Source -> Async I/O -> Sink
>
> Here, Async I/O will stay online as long as there are some scheduled
> requests outstanding, even when the Source has shut down. In those
> cases, the checkpoint coordinator would have to trigger new checkpoints
> at Async I/O and not Source, because it has become the new "head" of the
> graph.
>
> For Async I/O at least, we could say that the operator will wait for all
> outstanding requests to finish before it allows the final checkpoint and
> passes the barrier forward.
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to