Thanks Aljoscha for the answer above.

I am experimenting with savepoints and checkpoints on my end, so that we
built fault tolerant application with exactly once semantics.

I have been able to test various scenarios, but have doubts about one use
case.

My app is running on an emr cluster, and I am trying to test the case when
a emr cluster is terminated. I have read that *state.checkpoints.dir *is
responsible for storing metadata information, and links to data files in
*state.backend.fs.checkpointdir.*

For my application I have configured both
*state.backend.fs.checkpointdir* and *state.checkpoints.dir*

Also I have the following in my main app:

env.enableCheckpointing(CHECKPOINT_TIME_MS)

val CHECKPOINT_LOCATION =
s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"

val backend:RocksDBStateBackend =
  new RocksDBStateBackend(CHECKPOINT_LOCATION)

env.setStateBackend(backend)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)


In the application startup logs I can see *state.backend.fs.checkpointdir*
and *state.checkpoints.dir, *values being loaded. However when the
checkpoint happens I dont see any content in the metadata dir. Is there
something I am missing? Please let me know. I am using flink version 1.3

Thanks,
Vipul



On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Flink does not rely on file system operations to list contents, all
> necessary file paths are stored in the meta data file, as you guessed. This
> is the reason savepoints also work with file systems that "only" have
> read-after-write consistency.
>
> Best,
> Aljoscha
>
>
> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>
> Thanks Stefan for the answers above. These are really helpful.
>
> I have a few followup questions:
>
>    1. I see my savepoints are created in a folder, which has a _metadata
>    file and another file. Looking at the code
>    
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
>    it seems like the metadata file contains tasks states, operator state
>    and master states
>    
> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>    What is the purpose of the other file in the savepoint folder? My guess is
>    it should be a checkpoint file?
>    2. I am planning to use s3 as my state backend, so want to ensure that
>    application restarts are not affected by read-after-write consistency of
>    s3( if I use s3 as a savepoint backend). I am curious how flink restores
>    data from the _metadata file, and the other file? Does the _metadata file
>    contain path to these other files? or would it do a listing on the s3
>    folder?
>
>
> Please let me know,
>
> Thanks,
> Vipul
>
> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I have answered your questions inline:
>>
>>
>>    1. It seems to me that checkpoints can be treated as flink internal
>>    recovery mechanism, and savepoints act more as user-defined recovery
>>    points. Would that be a correct assumption?
>>
>> You could see it that way, but I would describe savepoints more as
>> user-defined *restart* points than *recovery* points. Please take a look at
>> my answers in this thread, because they cover most of your question:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html .
>>
>>
>>    1. While cancelling an application with -s option, it specifies the
>>    savepoint location. Is there a way during application startup to identify
>>    the last know savepoint from a folder by itself, and restart from there.
>>    Since I am saving my savepoints on s3, I want to avoid issues arising from
>>    *ls* command on s3 due to read-after-write consistency of s3.
>>
>> I don’t think that this feature exists, you have to specify the savepoint.
>>
>>
>>    1. Suppose my application has a checkpoint at point t1, and say i
>>    cancel this application sometime in future before the next available
>>    checkpoint( say t1+x). If I start the application without specifying the
>>    savepoint, it will start from the last known checkpoint(at t1), which wont
>>    have the application state saved, since I had cancelled the application.
>>    Would this is a correct assumption?
>>
>> If you restart a canceled application it will not consider checkpoints.
>> They are only considered in recovery on failure. You need to specify a
>> savepoint or externalized checkpoint for restarts to make explicit that you
>> intend to restart a job, and not to run a new instance of the job.
>>
>>
>>    1. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>>    same as manually saving regular savepoints?
>>
>> Not the same, because checkpoints and savepoints are different in certain
>> aspects, but both methods leave you with something that survives job
>> cancelation and can be used to restart from a certain state.
>>
>> Best,
>> Stefan
>>
>>
>
>
> --
> Thanks,
> Vipul
>
>
>


-- 
Thanks,
Vipul

Reply via email to