Hi Till,
Thanks a lot for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time 
> (finished, running, scheduled), we might allocate more resources than we 
> actually need to run the remaining job. From a scheduling perspective it 
> would be easier if we already know that certain subtasks don't need to be 
> rescheduled. I believe this can be an optimization, though.
> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
> that you want to record operators in the CompletedCheckpoint which are fully 
> finished. How will this information be used for constructing a recovered 
> ExecutionGraph? Why wouldn't the same principle work for the task level?

I think the first two issues are related. The main reason is that with 
externalized checkpoints, a checkpoint might be taken from one job and used in 
another job, but we do not have a unique ID to match tasks across jobs. 
Furthermore, users may also change the parallelism of a JobVertex, or even 
modify the graph structure by adding/removing operators or changing the 
chaining relationship between operators. 
On the other hand, Flink already provides custom UIDs for operators, which 
makes the operator a stable unit for recovery. The current checkpoints are 
also organized per operator to support rescaling and job upgrades. 
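
For reference, this is the existing DataStream API feature I mean (just a 
minimal, self-contained example, not part of the proposal):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // The explicit uid gives the operator a stable identity, so its
                // state in a checkpoint/savepoint can be mapped back to it even
                // after rescaling or modifying the job graph.
                .uid("to-upper")
                .print();
        env.execute("uid-example");
    }
}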
When restarting from a checkpoint with finished operators, we could start only 
the tasks that contain operators that are not fully finished (namely, operators 
for which some subtasks were still running when the checkpoint was taken). Then, 
during the execution of a single task, we would only initialize/open/run/close 
the operators that are not fully finished. The scheduler should be able to 
compute whether a task contains not-fully-finished operators from the current 
JobGraph and the operator finish states restored from the checkpoint.
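
To make that scheduler-side check a bit more concrete, a rough sketch could 
look like the following (class and method names are hypothetical, only meant 
to illustrate the idea, not the actual scheduler API):

import java.util.List;
import java.util.Set;

// Hypothetical sketch: the scheduler checks whether a task (an operator chain)
// still contains operators that are not recorded as fully finished in the
// restored checkpoint. The operator IDs here stand for the stable IDs derived
// from the user-specified uids.
public final class FinishedOperatorCheck {

    public static boolean taskNeedsScheduling(
            List<String> operatorIdsInChain, Set<String> fullyFinishedOperators) {
        for (String operatorId : operatorIdsInChain) {
            // Any not-fully-finished operator means the task still has work to do.
            if (!fullyFinishedOperators.contains(operatorId)) {
                return true;
            }
        }
        // All operators in the chain are fully finished: the task can be skipped
        // (or started and immediately closed in the first version).
        return false;
    }
}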

> 3) How will checkpointing work together with fully bounded jobs and FLIP-1 
> (fine grained recovery)?
I think it should be compatible with fully bounded jobs and FLIP-1, since it 
could be viewed as a completion of the current checkpoint mechanism. Concretely:
1. Batch jobs (with blocking execution mode) should not be affected, since 
checkpoints are not enabled in that case.
2. Bounded jobs running in pipelined mode would, with this modification, also 
be able to take checkpoints while they are finishing. As discussed in the FLIP, 
this should not affect the behavior after restore for almost all jobs.
3. Region failover and more fine-grained recovery should also not be affected: 
as before, after a failover the failover policy (full/region/fine-grained) 
decides which tasks to restart, and the checkpoint only decides what state is 
restored for those tasks (see the sketch after this list). The only difference 
with this modification is that these tasks might now be restored from a 
checkpoint taken after some tasks have finished. Since the previously finished 
tasks would always be skipped (either not started at all or run as an empty 
execution), and the behavior of the previously running tasks stays unchanged, 
the overall behavior should not be affected.
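
A small illustrative sketch of the separation mentioned in 3. (again with 
hypothetical names, not the actual failover/state-assignment code):

import java.util.Collections;
import java.util.List;
import java.util.Set;

// The failover policy decides *which* tasks to restart, while the restored
// checkpoint only decides *what* state each restarted task gets back.
public final class RestartSketch {

    public static void restartAfterFailure(
            Set<String> tasksToRestart,            // chosen by full/region/fine-grained policy
            Set<String> fullyFinishedOperators) {  // recorded in the restored checkpoint
        for (String task : tasksToRestart) {
            for (String operatorId : operatorsOf(task)) {
                if (fullyFinishedOperators.contains(operatorId)) {
                    // Previously finished operator: no state to restore, it is
                    // skipped / runs an empty execution.
                    continue;
                }
                // Previously running operator: its state is restored exactly as today.
                restoreOperatorState(task, operatorId);
            }
        }
    }

    // Placeholders standing in for the actual job graph / state assignment logic.
    private static List<String> operatorsOf(String task) { return Collections.emptyList(); }
    private static void restoreOperatorState(String task, String operatorId) { }
}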


Best,
Yun


------------------------------------------------------------------
From:Till Rohrmann <trohrm...@apache.org>
Send Time:2020 Oct. 13 (Tue.) 17:25
To:Yun Gao <yungao...@aliyun.com>
Cc:Arvid Heise <ar...@ververica.com>; Flink Dev <d...@flink.apache.org>; 
User-Flink <user@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time 
(finished, running, scheduled), we might allocate more resources than we 
actually need to run the remaining job. From a scheduling perspective it would 
be easier if we already know that certain subtasks don't need to be 
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
that you want to record operators in the CompletedCheckpoint which are fully 
finished. How will this information be used for constructing a recovered 
ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 
(fine grained recovery)?

Cheers,
Till
On Tue, Oct 13, 2020 at 9:30 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Arvid,
Thanks a lot for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely 
>>> thought it through. But in general, I'm currently at the point where I 
>>> think that we also need non-checkpoint related events in unaligned 
>>> checkpoints. So just keep that in mind, that we might converge anyhow at 
>>> this point.
I also agree that it would be better to keep the unaligned checkpoint behavior 
on EndOfPartition; I will double-check this issue again. 

>>> In general, what is helping in this case is to remember that there no 
>>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we 
>>> can completely ignore the problem on how to store and restore output 
>>> buffers of a completed task (also important for the next point).
Exactly, we should not need to persist the output buffers for the completed 
tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the 
>>> MVP/first version, it's completely fine to start and immediately stop. A 
>>> tad better would be even to not even start the procession loop. 
I also agree with this part. We would keep optimizing the implementation after 
the first version. 

Best,
Yun   
