[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16233608#comment-16233608 ]

Stefan Richter edited comment on FLINK-7873 at 11/1/17 3:36 AM:
----------------------------------------------------------------

The first proposal of this JIRA had some issues. The current proposal is 
actually similar to something we are already working on, in its simplest 
incarnation.

While the suggested approach works, it still misses some important points. For 
example, recovery needs to know the checkpoint ID and that the key-group 
assignment did not change, so that we can identify the correct local state to 
recover.
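
To illustrate this point, here is a minimal sketch (not actual Flink code; all 
class and field names are hypothetical) of why both the checkpoint ID and the 
unchanged key-group range are needed before local state can be used:

    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch: local state is only usable if it belongs to the
    // checkpoint we restore from AND covers exactly the key-group range
    // assigned to the recovering subtask; otherwise fall back to remote state.
    final class LocalStateLookup {

        // Hypothetical descriptor of a locally kept snapshot.
        static final class LocalSnapshot {
            final long checkpointId;
            final int keyGroupStart; // inclusive
            final int keyGroupEnd;   // inclusive
            final String localPath;

            LocalSnapshot(long checkpointId, int keyGroupStart, int keyGroupEnd, String localPath) {
                this.checkpointId = checkpointId;
                this.keyGroupStart = keyGroupStart;
                this.keyGroupEnd = keyGroupEnd;
                this.localPath = localPath;
            }
        }

        static Optional<LocalSnapshot> findUsableLocalState(
                Map<Long, LocalSnapshot> localSnapshots,
                long restoreCheckpointId,
                int assignedKeyGroupStart,
                int assignedKeyGroupEnd) {

            LocalSnapshot candidate = localSnapshots.get(restoreCheckpointId);
            if (candidate != null
                    && candidate.keyGroupStart == assignedKeyGroupStart
                    && candidate.keyGroupEnd == assignedKeyGroupEnd) {
                return Optional.of(candidate);
            }
            // wrong checkpoint or changed key-group assignment: use remote state
            return Optional.empty();
        }
    }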

I would also make cleanup depend not on a TTL but on notifications about newly 
completed checkpoints, and always keep the latest confirmed checkpoint until a 
newer checkpoint is confirmed.
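
A minimal sketch (hypothetical, not the actual implementation) of cleanup driven 
by checkpoint-complete notifications instead of a TTL, keeping only the latest 
confirmed checkpoint and anything newer:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Hypothetical store of local checkpoint copies on a TaskManager.
    final class LocalCheckpointStore {

        // checkpoint ID -> local directory that holds this checkpoint's state
        private final ConcurrentSkipListMap<Long, String> localCheckpoints =
                new ConcurrentSkipListMap<>();

        void register(long checkpointId, String localDir) {
            localCheckpoints.put(checkpointId, localDir);
        }

        // Invoked when a checkpoint-complete notification arrives: keep the newly
        // confirmed checkpoint (and anything newer), discard everything older.
        void onCheckpointConfirmed(long confirmedCheckpointId) {
            Iterator<Map.Entry<Long, String>> stale =
                    localCheckpoints.headMap(confirmedCheckpointId, false).entrySet().iterator();
            while (stale.hasNext()) {
                discardLocalDir(stale.next().getValue());
                stale.remove();
            }
        }

        private static void discardLocalDir(String dir) {
            // placeholder for recursive deletion of the local directory
            System.out.println("Discarding stale local checkpoint at " + dir);
        }
    }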

There are still some problems, e.g. cleanup after crashes, that must be handled 
correctly. We also want to release local state when a job is canceled or 
stopped by the user. Another point that we want to address in the full 
implementation is, at some point, reporting local state to the job manager, so 
that we can make scheduling decisions based on state locality.

Then we also need proper ways to configure this local checkpointing, because in 
some cases it introduces the additional overhead of writing the data to local 
disk (e.g. RocksDB full checkpoints). For other types, like RocksDB incremental 
checkpoints, it comes basically for free because a local copy already exists as 
part of an intermediate step. In the future there might also be other recovery 
modes that don't work from local disk, e.g. recovering memory state backends 
from in-memory objects (if the TM JVM did not crash) or supporting 
network-mounted "local" disks for TMs.
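
As a purely hypothetical configuration sketch (the option name and values are 
invented, not actual Flink configuration keys), such a setting could be an 
opt-in mode that reflects the cost trade-off described above:

    import java.util.Properties;

    // Invented option values, for illustration only.
    enum LocalCheckpointMode {
        DISABLED,        // never keep a local copy, always restore from remote storage
        ENABLE_IF_FREE,  // only where a local copy already exists as an intermediate step
        ENABLE_ALWAYS    // also pay the extra local write, e.g. for full snapshots
    }

    final class LocalCheckpointConfig {
        // Hypothetical key, not a real Flink configuration option.
        static final String MODE_KEY = "taskmanager.local-checkpoints.mode";

        static LocalCheckpointMode parse(Properties props) {
            return LocalCheckpointMode.valueOf(
                    props.getProperty(MODE_KEY, LocalCheckpointMode.DISABLED.name()));
        }
    }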

First steps towards the local recovery implementation are starting in this PR: 
https://github.com/apache/flink/pull/4745, and there is more to come soon.


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> Why I introduce this:
>     The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 
> 1 TB). What's worse, if a job recovers again and again, it can eat up all the 
> network bandwidth and badly hurt the cluster. So I propose that we cache the 
> checkpoint data locally and read it from the local cache whenever we can, 
> falling back to remote reads only if the local read fails. The advantage is 
> that if an execution is assigned to the same TaskManager as before, it can 
> save a lot of bandwidth and recover faster.
> Solution:
>     The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal: we dispose of an 
> entry if it was not touched for some time X, and whenever we touch an entry we 
> reset its TTL (a sketch of this scheme follows below). In this way, all the 
> work is done by the TaskManager and it is transparent to the JobManager. The 
> only problem is that we may dispose of an entry that is still useful; in that 
> case we have to read the data from remote storage after all, but users can 
> avoid this by setting a proper TTL value according to the checkpoint interval 
> and other factors.
> Can someone give me some advice? I would appreciate it very much~
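
A minimal sketch (hypothetical names, not the proposed implementation) of the 
TTL-style cache management described in the issue text above: each access 
refreshes an entry's deadline, and a periodic sweep disposes of entries that 
were idle longer than the configured time:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical TaskManager-side cache of checkpoint state handles.
    final class CheckpointCache {

        private final long ttlMillis;
        // state handle ID -> timestamp of the last access
        private final Map<String, Long> lastTouched = new ConcurrentHashMap<>();

        CheckpointCache(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        // Reading (or writing) a cached handle resets its TTL.
        void touch(String stateHandleId) {
            lastTouched.put(stateHandleId, System.currentTimeMillis());
        }

        // Periodic sweep: dispose of every entry that was idle longer than the TTL.
        void sweep() {
            long now = System.currentTimeMillis();
            lastTouched.entrySet().removeIf(e -> now - e.getValue() > ttlMillis);
        }

        boolean isCachedLocally(String stateHandleId) {
            return lastTouched.containsKey(stateHandleId);
        }
    }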



