OK, I think I get the point.

But another question arises: how do the task managers merge their RocksDB
snapshots into a single global path?


2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
>
> The path you give to the constructor must be a path on some distributed 
> filesystem, otherwise the data will be lost when the local machine crashes, 
> as you correctly pointed out.
>
> RocksDB will keep its files in a local directory (you can specify this using 
> setDbStoragePath()) and, when checkpointing, it will write to the checkpoint 
> directory that you specified in the constructor.
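>
> To make that concrete, here is a minimal sketch of that setup (Java API; the
> paths and class name are just placeholders you would replace with your own):
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointPathExample {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpoint data (the constructor path) goes to a durable, shared filesystem.
>         RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
>         // The working RocksDB files live on a fast local disk of each task manager.
>         backend.setDbStoragePath("/tmp/flink/rocksdb");
>         env.setStateBackend(backend);
>         // ... define and execute the job as usual ...
>     }
> }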
>
> Best,
> Aljoscha
>
>
>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>
>> I still do not understand the relationship between the RocksDB backend and
>> the filesystem (by which I mean any filesystem implementation, including
>> local, HDFS, S3).
>>
>> For example, when I specify the path to rocksdb backend:
>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>
>> What does it mean?
>>
>> Does each task manager save its states to /data1/flinkapp on its own machine?
>> That seems to make no sense, because when one of the machines crashes, the
>> job manager could not access the states on the dead machine.
>> Or does each task manager create a RocksDB instance on a temporary path and
>> send snapshots to the job manager, which in turn saves them under
>> /data1/flinkapp on the job manager's machine?
>>
>> Could you give an example of the data flow?
>>
>> And another question: when I turn off checkpointing (which is also the
>> default configuration), what happens to the state processing?
>>
>>
>>
>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>> Hi Jinhua,
>>>
>>> I will try to answer your questions:
>>>
>>> Flink checkpoints the state of each operator. For a Kafka consumer operator
>>> this is only the offset. For other operators (such as Windows or a
>>> ProcessFunction) the values/list/maps stored in the state are checkpointed.
>>> If you are interested in the internals, I would recommend this page [1].
>>> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>> Usually, Flink assumes that some distributed file system (such as HDFS) is
>>> available to which each Flink operator's state can be checkpointed in a
>>> fault-tolerant way. For the RocksDBStateBackend the local files are copied to
>>> HDFS as well. At the time of writing, only the RocksDBStateBackend supports
>>> incremental checkpoints. The JobManager can then read from HDFS and restore
>>> the operator on a different machine.
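>>>
>>> As a rough sketch of how this is usually wired up (Java API; the HDFS path,
>>> interval, and class name are placeholders, and the boolean constructor
>>> argument is the flag for the incremental checkpoints mentioned above):
>>>
>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> public class IncrementalCheckpointExample {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.enableCheckpointing(60_000L);  // take a checkpoint every 60 seconds
>>>         // true enables incremental checkpoints (RocksDB backend only).
>>>         env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
>>>         // ... define sources/operators/sinks and call env.execute() ...
>>>     }
>>> }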
>>>
>>> Feel free to ask further questions.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>
>>>
>>>
>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have two questions:
>>>>
>>>> a) Are the records/elements themselves checkpointed, or just the record
>>>> offsets? That is, what data is included in the checkpoint besides the
>>>> states?
>>>>
>>>> b) Where does Flink store the state globally, so that the job manager
>>>> can restore it on each task manager when restarting after a failure?
>>>>
>>>> For the heap backend, all task managers would send their states to the job
>>>> manager, and the job manager would save them in its heap, correct?
>>>>
>>>> For the fs/rocksdb backend, would all task managers save states
>>>> (incrementally or not) to a local path temporarily, and then send them (in
>>>> RocksDB snapshot format in the RocksDB case?) to the job manager at
>>>> checkpoint time?
>>>>
>>>> Is the path we use to configure the backend a path on the job manager's
>>>> machine rather than on the task managers' machines? Isn't that then a
>>>> bottleneck and a single point of failure? So is it better to use an HDFS
>>>> path so that we can scale the storage and make it highly available as
>>>> well?
>>>>
>>>> Thank you all.
>>>
>>>
>>>
>
