Hi Arvid,
Many thanks for the insightful comments! I have added my responses inline, 
below the quoted text:
>> 1) You call the tasks that get the barriers injected leaf nodes, which would 
>> make the sinks the root nodes. That is very similar to how graphs in 
>> relational algebra are labeled. However, I got the feeling that in Flink, we 
>> rather iterate from sources to sink, making the sources root nodes and the 
>> sinks the leaf nodes. However, I have no clue how it's done in similar 
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in 
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root 
>> subtasks (initially all sources). So in the iterative algorithm, whenever 
>> root A finishes, it looks at all connected subtasks B if they have any 
>> upstream task left. If not B becomes a new root. That would require to only 
>> touch a part of the job graph, but would require some callback from 
>> JobManager to CheckpointCoordinator.

I think "leaf nodes" was a bad choice of name on my side. In fact, I think we 
have the same idea: we start from the source nodes and find all the nodes whose 
predecessor nodes have all finished. It would be much better to call these 
nodes (the ones we would trigger) "root nodes". I'll modify the FLIP to change 
the name to "root nodes".
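
To make the idea concrete, here is a minimal sketch in Python. It is not Flink 
code: find_root_nodes, the dict-based graph layout and the task names are all 
made up for illustration. It assumes the job graph is a DAG given as a mapping 
from each task to its downstream tasks.

```python
def find_root_nodes(downstream, finished):
    """Return the unfinished tasks whose upstream tasks have all finished.

    downstream: dict mapping each task to a list of its downstream tasks
    finished:   collection of tasks that are already finished
    """
    finished = set(finished)
    upstream = {}
    nodes = set(downstream)
    # Invert the edges so that each task knows its upstream tasks.
    for node, targets in downstream.items():
        for target in targets:
            upstream.setdefault(target, set()).add(node)
            nodes.add(target)
    # A source has no upstream tasks, so it is a root as long as it runs.
    return {
        node
        for node in nodes
        if node not in finished and upstream.get(node, set()) <= finished
    }


# Hypothetical job graph: two sources feed one map task, followed by a sink.
job_graph = {
    "source_a": ["map"],
    "source_b": ["map"],
    "map": ["sink"],
    "sink": [],
}
roots_at_start = find_root_nodes(job_graph, set())
roots_after_sources = find_root_nodes(job_graph, {"source_a", "source_b"})
```

Initially both sources are roots; once both have finished, the map task becomes 
the only root.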
>> 2b) We also need to be careful for out-of-sync updates: if the root is about 
>> to finish, we could send the barrier to it from CheckpointCoordinator, but 
>> at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint tries to trigger a task but finds that the task is 
no longer there, it can further check whether the task has finished. If so, it 
should re-check the task's descendants to see whether there are new "root 
nodes" to trigger.
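
A rough sketch of that fallback, again in Python and again with made-up names 
(trigger_roots, try_trigger and is_finished are not Flink APIs). try_trigger 
stands for sending the barrier to a task and reporting whether the task was 
still there, and is_finished stands for asking whether the task has finished. 
What to do with a task that is neither running nor finished is my assumption; 
here the sketch simply raises.

```python
def trigger_roots(roots, downstream, upstream, known_finished,
                  try_trigger, is_finished):
    """Trigger the given roots, falling back to descendants of finished ones.

    Returns the set of tasks that were actually triggered.
    """
    known_finished = set(known_finished)
    triggered = set()
    pending = list(roots)
    while pending:
        task = pending.pop()
        if task in triggered or task in known_finished:
            continue
        if try_trigger(task):
            triggered.add(task)
        elif is_finished(task):
            # The task finished before the trigger arrived: re-check its
            # descendants, which may have become new roots.
            known_finished.add(task)
            for child in downstream.get(task, ()):
                if upstream.get(child, set()) <= known_finished:
                    pending.append(child)
        else:
            # Assumption: treat this as a failure of the checkpoint.
            raise RuntimeError(f"task {task} is neither running nor finished")
    return triggered


# Hypothetical race: source_a finishes just before its trigger arrives.
downstream = {"source_a": ["map"], "source_b": ["map"], "map": ["sink"]}
upstream = {"map": {"source_a", "source_b"}, "sink": {"map"}}
really_finished = {"source_a", "source_b"}
triggered = trigger_roots(
    roots=["source_a"],
    downstream=downstream,
    upstream=upstream,
    known_finished={"source_b"},
    try_trigger=lambda task: task not in really_finished,
    is_finished=lambda task: task in really_finished,
)
```

In this example the trigger for source_a fails, so the map task is found as the 
new root and triggered instead.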
>> 3) An implied change is that checkpoints are not aborted anymore at 
>> EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment fails the current checkpoint on 
EndOfPartition, and we would change that behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
>> ambiguous: What happens when an unaligned checkpoint is started and then one 
>> input channel contains the EndOfPartition event? From the written 
>> description, it sounds to me like, we move back to an aligned checkpoint for 
>> the whole receiving task. However, that is neither easily possible nor 
>> necessary. Imho it would be enough to also store the EndOfPartition in the 
>> channel state.

Many thanks for the suggestions on this issue; in fact I was stuck on it for 
some time. Previously, one implementation issue for me was that EndOfPartition 
does not seem able to overtake the preceding buffers as easily as 
CheckpointBarrier does; otherwise it might start destroying the input channels 
once all EndOfPartitions are received.
Therefore, although we could also persist the channels with EndOfPartition:
1. Start persisting the channels when CheckpointUnaligner receives the barrier 
(if not all preceding tasks are finished) or receives the trigger (if all 
preceding tasks are finished).
2. Persisting actually stops when onBuffer receives EndOfPartition.
After the last channel has stopped persisting, CheckpointUnaligner still needs 
to wait until all the previous buffers are processed before completing the 
allBarriersReceivedFuture. Therefore it would not be able to speed up the 
checkpoint in this case.
After some rethinking today, I now think we might insert some additional 
virtual events into receivedBuffer when EndOfPartition is received, and allow 
these virtual events to overtake the previous buffers. I'll double-check 
whether this is feasible; please let me know if there are other solutions to 
this issue :).
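
Just to illustrate what I mean by "overtaking", here is a toy model in Python. 
It is only a sketch of the idea, not a design: InputChannelQueue and the event 
names are invented, and it assumes the virtual event is placed at the head of 
the queue while the real EndOfPartition keeps its position behind the data 
buffers.

```python
from collections import deque


class InputChannelQueue:
    """Toy model of the received-buffer queue of one input channel."""

    def __init__(self):
        self._queue = deque()
        # Number of overtaking (virtual) events currently at the head.
        self._num_priority = 0

    def add_buffer(self, data):
        self._queue.append(("buffer", data))

    def add_end_of_partition(self):
        # The real EndOfPartition stays in order behind the data buffers,
        # so the channel is not torn down before the buffers are processed.
        self._queue.append(("end_of_partition", None))
        # The virtual event jumps ahead of all pending data buffers.
        self._queue.insert(self._num_priority, ("virtual_end_of_partition", None))
        self._num_priority += 1

    def poll(self):
        """Return the next element, or None if the queue is empty."""
        if not self._queue:
            return None
        item = self._queue.popleft()
        if self._num_priority > 0:
            # Virtual events sit at the head, so this one was a virtual event.
            self._num_priority -= 1
        return item


channel = InputChannelQueue()
channel.add_buffer("record-1")
channel.add_buffer("record-2")
channel.add_end_of_partition()
polled = [channel.poll() for _ in range(4)]
```

With this ordering the reader sees the virtual event first, then the two data 
buffers, and only then the real EndOfPartition.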
>> 5) I'd expand the recovery section a bit. It would be the first time that we 
>> recover an incomplete DAG. Afaik the subtasks are deployed before the state 
>> is recovered, so at some point, the subtasks either need to be removed again 
>> or maybe we could even avoid them being created in the first place.
I also agree that in the end we should not "restart" the finished tasks in any 
way. Not starting them in the first place seems better. We should be able to 
bookkeep additional information in the checkpoint metadata about which 
operators are fully finished, and the scheduler could restore the status of the 
tasks when restoring from a previous checkpoint. It would also require some 
modifications on the task side to support input channels that are already 
finished at startup.
But in the first version, I think we might simplify this by still restarting 
all the tasks but letting the finished sources exit directly. The new Source 
API would terminate directly since there are no pending splits, and the legacy 
sources would be handled specially by skipping execution if the source operator 
was fully finished before. We would be able to move to the final solution 
gradually in the next steps.
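
As a small sketch of the bookkeeping for the first version (Python, with 
hypothetical names: CheckpointMeta, finished_operators and plan_startup do not 
exist in Flink), the checkpoint metadata would carry the set of fully finished 
operators, and on restore every task is still started but the finished ones are 
told to exit directly:

```python
from dataclasses import dataclass, field


@dataclass
class CheckpointMeta:
    """Toy checkpoint metadata with the extra 'fully finished' bookkeeping."""
    checkpoint_id: int
    finished_operators: set = field(default_factory=set)


def plan_startup(meta, operators):
    """Decide for each operator what to do when restoring from `meta`."""
    return {
        op: "exit_directly" if op in meta.finished_operators else "restore_and_run"
        for op in operators
    }


meta = CheckpointMeta(checkpoint_id=5, finished_operators={"bounded_source"})
plan = plan_startup(meta, ["bounded_source", "unbounded_source", "sink"])
```

Here only bounded_source would exit directly; the other two operators restore 
their state and run as usual.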

Best,
Yun


------------------------------------------------------------------
From:Arvid Heise <ar...@ververica.com>
Send Time:2020 Oct. 12 (Mon.) 15:38
To:Yun Gao <yungao...@aliyun.com>
Cc:Flink Dev <d...@flink.apache.org>; User-Flink <user@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing 
issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit 
closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would 
make the sinks the root nodes. That is very similar to how graphs in relational 
algebra are labeled. However, I got the feeling that in Flink, we rather 
iterate from sources to sink, making the sources root nodes and the sinks the 
leaf nodes. However, I have no clue how it's done in similar cases, so please 
take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in 
CheckpointCoordinator. Let's assume that we inject the barrier at all root 
subtasks (initially all sources). So in the iterative algorithm, whenever root 
A finishes, it looks at all connected subtasks B if they have any upstream task 
left. If not B becomes a new root. That would require to only touch a part of 
the job graph, but would require some callback from JobManager to 
CheckpointCoordinator. 
2b) We also need to be careful for out-of-sync updates: if the root is about to 
finish, we could send the barrier to it from CheckpointCoordinator, but at the 
time it arrives, the subtask is finished already.
3) An implied change is that checkpoints are not aborted anymore at 
EndOfPartition, which is good, but might be explicitly added.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
ambiguous: What happens when an unaligned checkpoint is started and then one 
input channel contains the EndOfPartition event? From the written description, 
it sounds to me like, we move back to an aligned checkpoint for the whole 
receiving task. However, that is neither easily possible nor necessary. Imho it 
would be enough to also store the EndOfPartition in the channel state.
5) I'd expand the recovery section a bit. It would be the first time that we 
recover an incomplete DAG. Afaik the subtasks are deployed before the state is 
recovered, so at some point, the subtasks either need to be removed again or 
maybe we could even avoid them being created in the first place.

[1] https://issues.apache.org/jira/browse/FLINK-2491
On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi, devs & users

Very sorry for the broken formatting; I am resending the discussion below.

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that Flink 
currently does not support checkpoints after some tasks have finished, which 
causes problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won't be 
replayed before being committed to external systems in streaming mode. If 
sources are bounded and checkpoints are disabled after some tasks have 
finished, the data sent after the last checkpoint can never be committed. This 
issue has already been reported several times on the user ML [2][3][4] and was 
further brought up when working on FLIP-143: Unified Sink API [5].
2. Jobs with both bounded and unbounded sources might have to replay a large 
number of records after failover, because no periodic checkpoints are taken 
after the bounded sources have finished.

Therefore, we propose to also support checkpoints after some tasks have 
finished. You can find more details in FLIP-147 [6].

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

-- 
Arvid Heise | Senior Java Developer
