Hi Yun,

4) Yes, the interaction is not trivial, and I have not completely thought
it through either. In general, though, I'm currently at the point where I
think that we also need non-checkpoint-related events in unaligned
checkpoints. So just keep in mind that we might converge at this point
anyhow.

What helps in this case is to remember that no unaligned checkpoint
barrier is ever going to overtake EndOfPartition. So we can completely
ignore the problem of how to store and restore the output buffers of a
completed task (also important for the next point).
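
To illustrate the invariant, here is a minimal Python sketch. It is a toy
model, not Flink code: the `Channel` class, the event names, and the choice
to simply not enqueue a barrier once the partition has ended are all
assumptions made for illustration. A barrier may jump ahead of queued data,
but never ahead of an EndOfPartition that is already queued.

```python
from collections import deque

DATA, BARRIER, END_OF_PARTITION = "data", "barrier", "end_of_partition"


class Channel:
    """Toy model of one output queue (hypothetical, not a Flink class)."""

    def __init__(self):
        self.queue = deque()

    def add(self, kind, payload=None):
        self.queue.append((kind, payload))

    def add_unaligned_barrier(self, checkpoint_id):
        """Put a barrier at the head of the queue, unless the partition ended.

        Returns the overtaken data payloads (the in-flight data that would
        have to be persisted), or None if EndOfPartition is already queued
        and the barrier is therefore not enqueued at all (an assumption of
        this sketch).
        """
        if any(kind == END_OF_PARTITION for kind, _ in self.queue):
            # Completed task: nothing is overtaken, nothing to persist.
            return None
        overtaken = [payload for kind, payload in self.queue if kind == DATA]
        self.queue.appendleft((BARRIER, checkpoint_id))
        return overtaken
```

In this model a completed task never contributes overtaken output buffers,
which is why storing and restoring them does not come up.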

5) I think we are on the same page, and I completely agree that for the
MVP/first version it's fine to start the finished tasks and immediately
stop them. A tad better would be to not even start the processing loop.
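
As a rough sketch of the two options (plain Python; `run_task`,
`finished_in_checkpoint` and `skip_loop` are hypothetical names, not Flink
APIs): the MVP deploys every task and lets a finished one stop right away,
while the slightly better variant checks the restored flag before the
processing loop is entered at all.

```python
def run_task(records, finished_in_checkpoint, skip_loop=True):
    """Toy task runner; returns (loop_started, processed_records).

    finished_in_checkpoint: hypothetical flag restored from the checkpoint
        metadata, saying the task had already finished.
    skip_loop: True models "do not even start the processing loop",
        False models "start and immediately stop".
    """
    processed = []
    if finished_in_checkpoint and skip_loop:
        # Better variant: the loop is never entered.
        return False, processed

    loop_started = True
    for record in records:
        if finished_in_checkpoint:
            # MVP variant: the loop started, but exits before any record.
            break
        processed.append(record)
    return loop_started, processed
```

Both variants process no records for a finished task; they differ only in
whether the loop is set up first.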

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Arvid,
>
> Many thanks for the insightful comments! I added my responses to each
> issue below the quotes:
>
> >> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the sinks the root nodes. That is very similar to how graphs
> in relational algebra are labeled. However, I got the feeling that in
> Flink, we rather iterate from sources to sink, making the sources root
> nodes and the sinks the leaf nodes. However, I have no clue how it's done
> in similar cases, so please take that hint cautiously.
>
> >> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
>
>
> I think I chose a bad name with "leaf nodes". In fact, I think we have
> the same idea: we start from the source nodes to find all the nodes whose
> preceding nodes are all finished. It would be much better to call these
> nodes (which we would trigger) "root nodes". I'll modify the FLIP to
> change the name to "root nodes".
>
> >> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
>
> Exactly. When the checkpoint triggers a task but finds that the task is
> not there, it may then further check whether the task has finished. If so,
> it should re-check the task's descendants to see if there are new "root
> nodes" to trigger.
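
The iterative root search from 2) and the re-check from 2b) can be sketched
as follows. This is a minimal Python model under stated assumptions, not the
FLIP's implementation: the graph is given as plain dicts, all function names
are hypothetical, and a barrier that cannot be delivered is taken to mean
that the task has finished in the meantime.

```python
def find_roots(upstream, finished):
    """Return the running subtasks whose upstream subtasks are all finished.

    upstream: dict mapping each subtask to the set of its upstream subtasks.
    finished: set of subtasks that have already finished.
    Sources have no upstream, so running sources are always roots.
    """
    return {
        task
        for task, ups in upstream.items()
        if task not in finished and ups <= finished
    }


def on_task_finished(task, upstream, downstream, finished):
    """Callback when `task` finishes; returns the newly promoted roots.

    Only the direct downstream subtasks of `task` are inspected, so only
    a part of the job graph is touched.
    """
    finished.add(task)
    return {
        b
        for b in downstream.get(task, ())
        if b not in finished and upstream[b] <= finished
    }


def trigger(task, upstream, downstream, finished, send_barrier):
    """Trigger `task`; return the list of subtasks that got the barrier.

    send_barrier(task) returns False if the task is no longer there. In
    that case the task is treated as finished (assumption of this sketch)
    and its descendants are re-checked for new roots.
    """
    if send_barrier(task):
        return [task]
    triggered = []
    for b in sorted(on_task_finished(task, upstream, downstream, finished)):
        triggered += trigger(b, upstream, downstream, finished, send_barrier)
    return triggered
```

For a job `s1, s2 -> map -> sink`, the roots are initially the two sources;
once both have finished, `map` becomes the only root, and if `map` is gone
by the time the barrier arrives, `sink` is triggered instead.
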
>
> >> 3) An implied change is that checkpoints are not aborted anymore at
> EndOfPartition, which is good, but might be explicitly added.
>
> Yes, currently barrier alignment would fail the current checkpoint on
> EndOfPartition, and we would modify the behavior.
>
> >> 4) The interaction between unaligned checkpoint and EndOfPartition is a
> bit ambiguous: What happens when an unaligned checkpoint is started and
> then one input channel contains the EndOfPartition event? From the written
> description, it sounds to me like, we move back to an aligned checkpoint
> for the whole receiving task. However, that is neither easily possible nor
> necessary. Imho it would be enough to also store the EndOfPartition in the
> channel state.
>
>
> Many thanks for the suggestions on this issue; in fact I was stuck on it
> for some time. Previously, one implementation detail that troubled me was
> that EndOfPartition does not seem to be able to overtake the previous
> buffers as easily as CheckpointBarrier does; otherwise it might start
> destroying the input channels once all EndOfPartitions are received.
>
> Therefore, although we could also persist the channels with
> EndOfPartition:
>
> 1. Start persisting the channels when CheckpointUnaligner receives the
> barrier (if not all preceding tasks are finished) or receives the trigger
> (if all preceding tasks are finished).
>
> 2. The persisting actually stops when onBuffer receives EndOfPartition.
>
> After the last channel stops persisting, CheckpointUnaligner still needs
> to wait until all the previous buffers are processed before completing the
> allBarriersReceivedFuture. Therefore it would not be able to accelerate the
> checkpoint in this case.
>
> After some rethinking today, I currently think we might insert some
> additional virtual events into receivedBuffer when EndOfPartition is
> received and allow these virtual events to overtake the previous buffers.
> I'll try to double-check whether this is feasible; please let me know if
> there are other solutions to this issue :).
>
> >> 5) I'd expand the recovery section a bit. It would be the first time
> that we recover an incomplete DAG. Afaik the subtasks are deployed before
> the state is recovered, so at some point, the subtasks either need to be
> removed again or maybe we could even avoid them being created in the first
> place.
>
> I also agree that ultimately we should not restart the finished tasks in
> any way. Not starting them in the first place seems better. We should be
> able to bookkeep additional information in the checkpoint metadata about
> which operators are fully finished, and the scheduler could restore the
> status of the tasks when restoring from previous checkpoints. It would
> also require some modifications on the task side to support input channels
> that are already finished at startup.
>
> But in the first version, I think we might simplify this issue by still
> restarting all the tasks but letting the finished sources exit directly.
> Sources using the new Source API would terminate directly since there are
> no pending splits, and the legacy sources would be handled specially by
> skipping execution if the source operator was fully finished before. We
> would be able to move to the final solution gradually in the next steps.
>
>
> Best,
>
> Yun
>
> ------------------------------------------------------------------
> From:Arvid Heise <ar...@ververica.com>
> Send Time:2020 Oct. 12 (Mon.) 15:38
> To:Yun Gao <yungao...@aliyun.com>
> Cc:Flink Dev <d...@flink.apache.org>; User-Flink <user@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Hi Yun,
>
> Thank you for starting the discussion. This will solve one of the
> long-standing issues [1] that confuse users. I'm also a big fan of option
> 3. It is also a bit closer to Chandy-Lamport again.
>
> A couple of comments:
>
> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the sinks the root nodes. That is very similar to how graphs in
> relational algebra are labeled. However, I got the feeling that in Flink,
> we rather iterate from sources to sink, making the sources root nodes and
> the sinks the leaf nodes. However, I have no clue how it's done in similar
> cases, so please take that hint cautiously.
> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
> 3) An implied change is that checkpoints are not aborted anymore at
> EndOfPartition, which is good, but might be explicitly added.
> 4) The interaction between unaligned checkpoint and EndOfPartition is a
> bit ambiguous: What happens when an unaligned checkpoint is started and
> then one input channel contains the EndOfPartition event? From the
> written description, it sounds to me like, we move back to an aligned
> checkpoint for the whole receiving task. However, that is neither easily
> possible nor necessary. Imho it would be enough to also store the
> EndOfPartition in the channel state.
> 5) I'd expand the recovery section a bit. It would be the first time that
> we recover an incomplete DAG. Afaik the subtasks are deployed before the
> state is recovered, so at some point, the subtasks either need to be
> removed again or maybe we could even avoid them being created in the first
> place.
>
> [1] https://issues.apache.org/jira/browse/FLINK-2491
>
> On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi, devs & users
>
> Very sorry for the broken formatting; I am resending the discussion below.
>
>
> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for
> processing bounded and unbounded data in both streaming and blocking modes.
> However, one long-standing problem for the streaming mode is that currently
> Flink does not support checkpoints after some tasks have finished, which
> causes some problems for bounded or mixed jobs:
>
> 1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be
>    replayed before being committed to external systems in streaming mode. If
>    sources are bounded and checkpoints are disabled after some tasks are
>    finished, the data sent after the last checkpoint can never be committed.
>    This issue has already been reported several times on the user ML
>    [2][3][4] and was further brought up when working on FLIP-143: Unified
>    Sink API [5].
> 2. Jobs with both bounded and unbounded sources might have to replay a large
>    amount of records after failover because no periodic checkpoints are
>    taken after the bounded sources have finished.
>
>
> Therefore, we propose to also support checkpoints after some tasks have
> finished. You can find more details in FLIP-147 [6].
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4]
> https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
>
>
