Hi Arvid,

Many thanks for the insightful comments! I have added my responses to each issue below the corresponding quote:

>> 1) You call the tasks that get the barriers injected leaf nodes, which would
>> make the sinks the root nodes. That is very similar to how graphs in
>> relational algebra are labeled. However, I got the feeling that in Flink, we
>> rather iterate from sources to sink, making the sources root nodes and the
>> sinks the leaf nodes. However, I have no clue how it's done in similar
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root
>> subtasks (initially all sources). So in the iterative algorithm, whenever
>> root A finishes, it looks at all connected subtasks B if they have any
>> upstream task left. If not B becomes a new root. That would require to only
>> touch a part of the job graph, but would require some callback from
>> JobManager to CheckpointCoordinator.
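
To make the iterative search in 2) a bit more concrete, here is a minimal Java sketch. It is only an illustration under assumptions: `RootTracker`, `onTaskFinished` and `currentRoots` are hypothetical names and not existing Flink APIs, tasks are identified by plain strings, and the "callback from JobManager" is modeled as a direct method call.

```java
import java.util.*;

/** Hypothetical sketch of iterative root tracking; not a Flink API. */
class RootTracker {
    // task -> directly connected downstream tasks
    private final Map<String, List<String>> downstream = new HashMap<>();
    // task -> number of upstream tasks that have not finished yet
    private final Map<String, Integer> unfinishedUpstream = new HashMap<>();
    private final Set<String> finished = new HashSet<>();
    // tasks that would currently get the barrier injected
    private final Set<String> roots = new TreeSet<>();

    /** Each edge is a two-element array {upstream, downstream}. */
    RootTracker(Collection<String> tasks, List<String[]> edges) {
        for (String t : tasks) {
            downstream.put(t, new ArrayList<>());
            unfinishedUpstream.put(t, 0);
        }
        for (String[] e : edges) {
            downstream.get(e[0]).add(e[1]);
            unfinishedUpstream.merge(e[1], 1, Integer::sum);
        }
        // Initially the roots are the tasks without any upstream, i.e. the sources.
        for (String t : tasks) {
            if (unfinishedUpstream.get(t) == 0) {
                roots.add(t);
            }
        }
    }

    /** Assumed callback when a task finishes; returns the newly promoted roots. */
    List<String> onTaskFinished(String task) {
        List<String> promoted = new ArrayList<>();
        if (!finished.add(task)) {
            return promoted; // duplicate notification, nothing to do
        }
        roots.remove(task);
        // Only the directly connected downstream tasks are touched.
        for (String b : downstream.get(task)) {
            int left = unfinishedUpstream.merge(b, -1, Integer::sum);
            if (left == 0 && !finished.contains(b)) {
                roots.add(b);
                promoted.add(b);
            }
        }
        return promoted;
    }

    Set<String> currentRoots() {
        return Collections.unmodifiableSet(roots);
    }
}
```

For a job with two sources `s1` and `s2` feeding `map`, which feeds `sink`, the roots start as `s1` and `s2`; `map` only becomes a root once both sources have reported that they finished.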

I think "leaf nodes" was a poor choice of name on my side. In fact we have the same idea: we start from the source nodes and find all the nodes whose preceding nodes have all finished. It would be much better to call these nodes (the ones we would trigger) "root nodes". I'll update the FLIP to use the name "root nodes".

>> 2b) We also need to be careful for out-of-sync updates: if the root is about
>> to finish, we could send the barrier to it from CheckpointCoordinator, but
>> at the time it arrives, the subtask is finished already.

Exactly. When a checkpoint tries to trigger a task but finds that the task is no longer there, we can further check whether the task has finished. If so, we should re-check its descendants to see whether there are new "root nodes" to trigger.

>> 3) An implied change is that checkpoints are not aborted anymore at
>> EndOfPartition, which is good, but might be explicitly added.

Yes, currently barrier alignment fails the ongoing checkpoint on EndOfPartition, and we would change this behavior.

>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit
>> ambiguous: What happens when an unaligned checkpoint is started and then one
>> input channel contains the EndOfPartition event? From the written
>> description, it sounds to me like, we move back to an aligned checkpoint for
>> the whole receiving task. However, that is neither easily possible nor
>> necessary. Imho it would be enough to also store the EndOfPartition in the
>> channel state.

Many thanks for the suggestions on this issue; in fact I was stuck on it for some time. The implementation detail that troubled me is that EndOfPartition does not seem able to overtake the preceding buffers as easily as CheckpointBarrier does, since otherwise the task might start destroying the input channels once all EndOfPartitions are received. Therefore, although we could also persist the channels with EndOfPartition:

1. Start persisting the channels when CheckpointUnaligner receives a barrier (if not all preceding tasks are finished) or receives the trigger (if all preceding tasks are finished).
2. Persisting actually stops when onBuffer receives EndOfPartition.

After the last channel has stopped persisting, CheckpointUnaligner still needs to wait until all the preceding buffers are processed before completing the allBarriersReceivedFuture. Therefore it would not be able to accelerate the checkpoint in this case.

After some rethinking today, I now think we might insert some additional virtual events into receivedBuffer when EndOfPartition is received and allow these virtual events to overtake the preceding buffers. I'll double check whether this is feasible; please let me know if there are other solutions to this issue :).

>> 5) I'd expand the recovery section a bit. It would be the first time that we
>> recover an incomplete DAG. Afaik the subtasks are deployed before the state
>> is recovered, so at some point, the subtasks either need to be removed again
>> or maybe we could even avoid them being created in the first place.

I also agree that in the end we should not restart the finished tasks. Not starting them in the first place seems better. We should be able to bookkeep additional information in the checkpoint metadata about which operators are fully finished, and the scheduler could restore the status of the tasks when restoring from a previous checkpoint. This would also require some modifications on the task side to support input channels that are already finished at startup.

But for the first version, I think we might simplify this by still restarting all the tasks and letting the finished sources exit directly? Sources using the new Source API would terminate directly since there are no pending splits, and the legacy sources would be handled specially by skipping execution if the source operator had fully finished before.
We would be able to turn to the final solution gradually in the next steps.

Best,
Yun

------------------------------------------------------------------
From: Arvid Heise <ar...@ververica.com>
Send Time: 2020 Oct. 12 (Mon.) 15:38
To: Yun Gao <yungao...@aliyun.com>
Cc: Flink Dev <d...@flink.apache.org>; User-Flink <user@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.

2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.

2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.

3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.

4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.

5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.

[1] https://issues.apache.org/jira/browse/FLINK-2491

On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi, devs & users

Very sorry for the broken formatting; I am resending the discussion as follows.

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:

1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before being committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has already been reported several times on the user ML [2][3][4] and was further brought up when working on FLIP-143: Unified Sink API [5].
2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover, because no periodic checkpoints are taken after the bounded sources have finished.

Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147 [6].

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany