Thanks Aljoscha for the answer above. I am experimenting with savepoints and checkpoints on my end, so that we can build a fault-tolerant application with exactly-once semantics.
I have been able to test various scenarios, but have doubts about one use case. My app is running on an EMR cluster, and I am trying to test the case where the EMR cluster is terminated. I have read that *state.checkpoints.dir* is responsible for storing the checkpoint metadata, which links to the data files in *state.backend.fs.checkpointdir*. For my application I have configured both *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.

I also have the following in my main app:

    env.enableCheckpointing(CHECKPOINT_TIME_MS)

    val CHECKPOINT_LOCATION =
      s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
    val backend: RocksDBStateBackend = new RocksDBStateBackend(CHECKPOINT_LOCATION)

    env.setStateBackend(backend)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
    env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)

In the application startup logs I can see the *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values being loaded. However, when a checkpoint happens I don't see any content in the metadata dir. Is there something I am missing? Please let me know. I am using Flink version 1.3.

Thanks,
Vipul
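A minimal sketch of the configuration above with externalized checkpoints switched on, in case that is the missing piece: as far as I can tell, in Flink 1.3 checkpoint metadata is only written out to *state.checkpoints.dir* for externalized checkpoints; for plain checkpoints it is held by the JobManager. The S3 path and interval below are placeholders, not the real values from the app above:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 60s (placeholder interval).
    env.enableCheckpointing(60000L)

    // RocksDB keeps its checkpoint data files under this URI
    // (the state.backend.fs.checkpointdir role).
    env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints/rocksdb"))

    // Without this call, no metadata appears under state.checkpoints.dir.
    // RETAIN_ON_CANCELLATION keeps the externalized checkpoint around after
    // a manual cancel, so it can be used for an explicit restart.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)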
On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Flink does not rely on file system operations to list contents; all
> necessary file paths are stored in the metadata file, as you guessed. This
> is the reason savepoints also work with file systems that "only" have
> read-after-write consistency.
>
> Best,
> Aljoscha
>
> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>
> Thanks Stefan for the answers above. These are really helpful.
>
> I have a few follow-up questions:
>
> 1. I see my savepoints are created in a folder, which has a _metadata
> file and another file. Looking at the code
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
> it seems like the metadata file contains task states, operator state,
> and master states
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
> What is the purpose of the other file in the savepoint folder? My guess is
> that it is a checkpoint file?
> 2. I am planning to use s3 as my state backend, so I want to ensure that
> application restarts are not affected by the read-after-write consistency
> of s3 (if I use s3 as a savepoint backend). I am curious how Flink
> restores data from the _metadata file and the other file. Does the
> _metadata file contain the paths to these other files, or would it do a
> listing on the s3 folder?
>
> Please let me know,
>
> Thanks,
> Vipul
>
> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I have answered your questions inline:
>>
>> 1. It seems to me that checkpoints can be treated as Flink's internal
>> recovery mechanism, and savepoints act more as user-defined recovery
>> points. Would that be a correct assumption?
>>
>> You could see it that way, but I would describe savepoints more as
>> user-defined *restart* points than *recovery* points. Please take a look
>> at my answers in this thread, because they cover most of your question:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>
>> 2. While cancelling an application with the -s option, it specifies the
>> savepoint location. Is there a way during application startup to identify
>> the last known savepoint from a folder by itself, and restart from there?
>> Since I am saving my savepoints on s3, I want to avoid issues arising
>> from an *ls* command on s3 due to the read-after-write consistency of s3.
>>
>> I don't think that this feature exists; you have to specify the
>> savepoint. (A CLI sketch of this cancel/restart flow appears after the
>> thread below.)
>>
>> 3. Suppose my application has a checkpoint at point t1, and say I cancel
>> this application sometime in the future before the next available
>> checkpoint (say t1+x). If I start the application without specifying the
>> savepoint, it will start from the last known checkpoint (at t1), which
>> won't have the application state saved, since I had cancelled the
>> application. Would this be a correct assumption?
>>
>> If you restart a cancelled application, it will not consider checkpoints.
>> They are only considered during recovery on failure. You need to specify
>> a savepoint or externalized checkpoint for restarts to make explicit that
>> you intend to restart a job, and not to run a new instance of the job.
>>
>> 4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>> the same as manually saving regular savepoints?
>>
>> Not the same, because checkpoints and savepoints differ in certain
>> aspects, but both methods leave you with something that survives job
>> cancellation and can be used to restart from a certain state.
>>
>> Best,
>> Stefan
>>
>
> --
> Thanks,
> Vipul

--
Thanks,
Vipul
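To make the "explicit restart" point from Stefan's reply concrete, a hedged sketch of the cancel-and-resume flow with the Flink 1.3 CLI; the savepoint directory, job ID, and jar name are placeholders:

    # Cancel the running job and trigger a savepoint in one step;
    # the command prints the path of the savepoint it wrote.
    flink cancel -s s3://my-bucket/savepoints <job-id>

    # Restart explicitly from that savepoint (path taken from the output above).
    flink run -s s3://my-bucket/savepoints/savepoint-<id> my-app.jar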