OK, I think I get the point. But another question arises: how do the task managers merge their RocksDB snapshots onto a single global path?
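(For reference, a minimal sketch of the setup discussed in the replies below, using the Flink 1.4 Java API; the hdfs:// checkpoint path and the local storage path are hypothetical. Each task manager keeps its working RocksDB files on local disk and, at checkpoint time, copies its own snapshot files into the shared checkpoint directory, so nothing is merged into one file.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data must go to a path every task manager can reach;
        // hdfs:///flink/checkpoints is a hypothetical shared location.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // The live RocksDB instances stay on node-local disk; this is a
        // hypothetical local working directory on each task manager.
        backend.setDbStoragePath("/tmp/flink-rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // snapshot the state every 60 seconds

        // ... define sources, transformations, sinks, then env.execute(...);
    }
}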
2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
>
> The path you give to the constructor must be a path on some distributed
> filesystem, otherwise the data will be lost when the local machine crashes,
> as you correctly mentioned.
>
> RocksDB will keep files in a local directory (you can specify this using
> setDbStoragePath()) and when checkpointing will write to the checkpoint
> directory that you specified in the constructor.
>
> Best,
> Aljoscha
>
>
>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>
>> I still do not understand the relationship between the rocksdb backend
>> and the filesystem (here I refer to any filesystem implementation,
>> including local, hdfs, s3).
>>
>> For example, what does it mean when I specify the path for the rocksdb
>> backend like this?
>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>
>> Does each task manager save its states to /data1/flinkapp on its own
>> machine? That seems to make no sense, because when one of the machines
>> crashes, the job manager could not access the states on the dead machine.
>> Or does each task manager create its rocksdb instance on a temporary
>> path and send snapshots to the job manager, which in turn saves them
>> under /data1/flinkapp on the job manager's machine?
>>
>> Could you give a data flow example?
>>
>> And another question: when I turn off checkpointing (which is also the
>> default configuration), what happens to the state processing?
>>
>>
>>
>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>> Hi Jinhua,
>>>
>>> I will try to answer your questions:
>>>
>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>> operator this is only the offset. For other operators (such as Windows
>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>> checkpointed. If you are interested in the internals, I would recommend
>>> this page [1].
>>> Only the MemoryStateBackend sends entire states to the JobManager (see
>>> [2]). But you are right, this is a bottleneck and not very
>>> fault-tolerant. Usually, Flink assumes there is some distributed file
>>> system (such as HDFS) to which each Flink operator can be checkpointed
>>> in a fault-tolerant way. For the RocksDBStateBackend the local files
>>> are copied to HDFS as well. At the time of writing, only the
>>> RocksDBStateBackend supports incremental checkpoints. The JobManager
>>> can then read from HDFS and restore the operator on a different
>>> machine.
>>>
>>> Feel free to ask further questions.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
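(To make "the values/lists/maps stored in the state are checkpointed" concrete, a minimal keyed-state sketch; the class and state names are made up for illustration. With the RocksDB backend this counter lives in the task manager's local RocksDB instance, and its snapshot is what ends up in the checkpoint directory.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key; use it as stream.keyBy(0).flatMap(new CountPerKey()).
public class CountPerKey
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count; // keyed state, snapshotted by the backend

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> event, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long current = count.value();          // null for the first event of a key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                 // this value is part of every checkpoint
        out.collect(Tuple2.of(event.f0, updated));
    }
}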
>>>
>>>
>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have two questions:
>>>>
>>>> a) Are the records/elements themselves checkpointed, or just the
>>>> record offsets? That is, what data is included in a checkpoint besides
>>>> the state?
>>>>
>>>> b) Where does Flink store the state globally, so that the job manager
>>>> can restore it on each task manager after a failure restart?
>>>>
>>>> For the heap backend, all task managers would send their states to the
>>>> job manager, and the job manager would save them in its heap, correct?
>>>>
>>>> For the fs/rocksdb backend, all task managers would save their states
>>>> (incrementally or not) on a local path temporarily, and send them (in
>>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>>> checkpoint?
>>>>
>>>> The path we use to configure the backend is a path on the job manager
>>>> machine, not on the task managers' machines? So that is the bottleneck
>>>> and a single point of failure? So it would be better to use an hdfs
>>>> path, so that we could scale the storage and make it highly available
>>>> as well?
>>>>
>>>> Thank you all.
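(Closing the loop on question b), a minimal sketch of the two options discussed above, again with a hypothetical hdfs:// path. As Timo notes, only the MemoryStateBackend sends snapshots to the JobManager heap, which is the bottleneck suspected here; a filesystem-backed backend writes snapshots to a shared path instead, so any machine can read them during recovery.)

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendChoice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: snapshots travel to the JobManager and live on its heap.
        // Fine for local tests, but it is the single point of failure above.
        // env.setStateBackend(new MemoryStateBackend());

        // Option 2: working state stays on the task managers, snapshots go to
        // a shared filesystem path, so recovery can happen on any machine.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        env.enableCheckpointing(60_000);
        // ... define the job and call env.execute(...);
    }
}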