[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268724#comment-16268724 ]
ASF GitHub Bot commented on FLINK-7873: --------------------------------------- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153492082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- @StefanRRichter Thanks very much for reviewing my code and for the detailed explanation of your view; I am glad that my thinking is similar to yours in some places. There are two things I would like to explain:

1. About the 1:1 relationship between the remote handle and the local handle: I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, each backend generates an `IncrementalKeyedStateHandle` for every incremental checkpoint, but `IncrementalKeyedStateHandle` is a composite handle: it contains a collection of sub state handles that store the data (meta, sst, and misc files). In this case, the sub state handles are the smallest storage units, each of them has a 1:1 relationship with a local state handle, and `IncrementalKeyedStateHandle` therefore has a 1:N relationship with local state handles. (Currently, `CheckpointStateOutputStream.closeAndGet()` returns a remote handle, which I view as the smallest storage unit.) For incremental checkpoints this can indeed be optimized: we could provide a fast path that puts the cache entry into the checkpoint cache directly, so the data would not need to be written locally while it is transmitted to the remote end. I did not do that because I wanted a unified mechanism that meets the requirements of all backends without changing the backend code too much.

2. The local handle does not have to be a local file; it can also be stored in memory, on another storage medium, or even be just a mock (which may apply to the `CopyOnWriteStateTableSnapshot` problem described above), as long as it inherits `CachedStateHandle` and implements the corresponding classes.

IMO, mapping local state to a checkpoint id can also work, but I have some minor questions about that: 1. Can we provide a unified local-state mechanism that meets all current state backend requirements (with RocksDB optimized, of course)? 2. Since local state would be mapped by checkpoint id, the key-range detection needs to be performed locally again, which is a bit repetitive; can this be avoided by the planned work on the JM?

Although I have expressed my ideas, I think you are more experienced than I am in this area, and your design is probably better than mine. So if you have any planned issues, I would like to close this PR and work on those instead; even though this PR shares some ideas with yours, it does not seem to be the base version you expected. For now, we will still use this version of the local checkpoint in production (it still needs to address some problems from your comments), because Flink 1.4 does not have this feature and we need it badly (our state is very large). Once 1.5 is released, we will switch to the community version.

> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB).
> What's worse, if the job performs recovery again and again, it can eat up all of the network bandwidth and do significant harm to the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote storage only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It uses a simple TTL-like method to manage the disposal of cache entries: we dispose an entry if it has not been touched for some time X, and every touch resets its TTL. This way, all the work is done by the TaskManager and is transparent to the JobManager. The only problem is that we may dispose an entry that is still useful; in that case we have to fall back to reading the remote data, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
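The local-first read with remote fallback described in the PR comment can be sketched in Java as follows. This is only an illustrative sketch under stated assumptions: `CachedStreamStateHandle`, `InMemoryStateHandle`, and this minimal `StreamStateHandle` interface are simplified stand-ins I made up for this example, not the PR's or Flink's actual classes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Minimal stand-in for a state handle that can open its data as a stream. */
interface StreamStateHandle {
    InputStream openInputStream() throws IOException;
}

/** Toy handle backed by a byte array; a null array simulates an evicted or
 *  missing local cache entry. */
class InMemoryStateHandle implements StreamStateHandle {
    private final byte[] data;
    InMemoryStateHandle(byte[] data) { this.data = data; }
    public InputStream openInputStream() throws IOException {
        if (data == null) {
            throw new IOException("local cache entry is gone");
        }
        return new ByteArrayInputStream(data);
    }
}

/** Sketch of the idea behind the PR's cached handle: pair a remote handle
 *  with its 1:1 local copy, read locally first, fall back to remote. */
class CachedStreamStateHandle implements StreamStateHandle {
    private final StreamStateHandle remote;
    private final StreamStateHandle local; // may be null if never cached

    CachedStreamStateHandle(StreamStateHandle remote, StreamStateHandle local) {
        this.remote = remote;
        this.local = local;
    }

    public InputStream openInputStream() throws IOException {
        if (local != null) {
            try {
                return local.openInputStream(); // fast path: no network I/O
            } catch (IOException e) {
                // fall through: local copy missing or unreadable
            }
        }
        return remote.openInputStream(); // slow path: read from remote DFS
    }
}
```

Under this scheme a composite handle such as `IncrementalKeyedStateHandle` would simply hold N such cached handles, one per sub state handle, which is the 1:N relationship the comment describes.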
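The TTL-style disposal the issue description proposes could look roughly like this. A hypothetical sketch only: the class and method names are illustrative, not the PR's API, and the clock is passed in explicitly so the eviction logic is deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch of a TaskManager-side checkpoint cache: an entry is
 *  disposed if it has not been touched for ttlMillis, and every touch
 *  resets its TTL, as the issue description proposes. */
class CheckpointCache<K, V> {
    private static final class Entry<V> {
        final V value;
        long lastTouched;
        Entry(V value, long now) { this.value = value; this.lastTouched = now; }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    CheckpointCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(K key, V value, long now) {
        entries.put(key, new Entry<>(value, now));
    }

    /** Returns the cached value and resets its TTL, or null on a miss
     *  (the caller then falls back to reading from the remote DFS). */
    V touch(K key, long now) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        e.lastTouched = now;
        return e.value;
    }

    /** Disposes every expired entry; a real cache would run this periodically. */
    void expire(long now) {
        Iterator<Map.Entry<K, Entry<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue().lastTouched > ttlMillis) {
                it.remove();
            }
        }
    }

    int size() { return entries.size(); }
}
```

A production version would use `System.currentTimeMillis()` and a scheduled cleanup task instead of an explicit `now` parameter; the trade-off the issue mentions (a still-useful entry may be disposed, forcing a remote read) is visible here as a `touch` miss after `expire`.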