What I am interested in is whether I should use RocksDB to replace
FsStateBackend (the file backend).
RocksDB is slower than the heap-based backends, but it can hold much larger state.
Currently my job's state is about 10 GB, and I run 10 TaskManagers on
different machines, each with 100 GB of memory. I don't think I need RocksDB.
Is that right?
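A rough back-of-the-envelope check of the numbers above, as a minimal Java sketch. It assumes keys (and therefore state) are spread evenly over the TaskManagers, and it uses a hypothetical `heapOverheadFactor` because on-heap Java objects are usually larger than their serialized checkpoint size; the factor of 3 in `main` is an illustrative assumption, not a measurement. It also ignores that Flink splits TaskManager memory into heap, managed and network memory, so not all 100 GB is JVM heap.

```java
/** Back-of-the-envelope heap sizing for a heap-based state backend (illustrative only). */
final class HeapStateSizing {

    /**
     * Estimated on-heap state per TaskManager in GB, assuming an even key distribution.
     *
     * @param checkpointedStateGb total state size as reported by checkpoints
     * @param taskManagers        number of TaskManagers sharing the keyed state
     * @param heapOverheadFactor  hypothetical ratio of on-heap object size to serialized size
     */
    static double stateGbPerTaskManager(double checkpointedStateGb, int taskManagers,
                                        double heapOverheadFactor) {
        if (taskManagers <= 0) {
            throw new IllegalArgumentException("taskManagers must be positive");
        }
        return checkpointedStateGb * heapOverheadFactor / taskManagers;
    }

    /** Fraction of a TaskManager's memory that the estimated state would occupy. */
    static double heapFraction(double stateGbPerTm, double tmMemoryGb) {
        return stateGbPerTm / tmMemoryGb;
    }

    public static void main(String[] args) {
        // Numbers from the message: 10 GB of state, 10 TaskManagers with 100 GB each.
        double perTm = stateGbPerTaskManager(10.0, 10, 3.0); // 3.0 is an assumed overhead factor
        System.out.printf("~%.1f GB per TM, %.0f%% of 100 GB%n",
                perTm, 100 * heapFraction(perTm, 100.0));
    }
}
```

Even with a generous overhead factor, the estimate stays a small fraction of each machine's memory, which is consistent with the poster's intuition; real heap usage should still be confirmed with GC and heap metrics.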

yidan zhao <hinobl...@gmail.com> wrote on Tue, Feb 9, 2021 at 3:50 PM:

> I have a related question.
> Since FsStateBackend keeps working state on the heap and the checkpoint
> is ultimately stored in the filesystem, does JobManager or TaskManager
> memory limit the state size? Is the state size bounded by TaskManager
> memory times the number of TaskManagers, or by the JobManager's memory?
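To make the question concrete: Flink splits keyed state into key groups and assigns contiguous ranges of key groups to the parallel operator instances running on the TaskManagers, so heap-backed working state is spread over TaskManager heaps rather than held by the JobManager. The Java sketch below models only that split. It replaces Flink's murmur hash of `hashCode()` with a plain modulo, and `KeyGroupSketch` and its methods are hypothetical names, not Flink API; the max parallelism of 128 in `main` is an assumed value.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of how keyed state is split into key groups across parallel subtasks. */
final class KeyGroupSketch {

    /** Key group of a key. Flink additionally murmur-hashes hashCode(); omitted here. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the parallel subtask that owns a key group (contiguous key-group ranges). */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts how many synthetic keys land on each subtask. */
    static Map<Integer, Integer> keysPerSubtask(int numKeys, int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int k = 0; k < numKeys; k++) {
            int group = keyGroupFor("key-" + k, maxParallelism);
            counts.merge(subtaskFor(group, maxParallelism, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 subtasks, e.g. one per TaskManager, with an assumed max parallelism of 128.
        System.out.println(keysPerSubtask(100_000, 128, 10));
    }
}
```

Under this model, the total working state a job can hold on heap scales roughly with the sum of the TaskManager heaps, while each key group must still fit on the single TaskManager that owns it.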
>
>
> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Feb 8, 2021 at 6:05 PM:
>
>> Hi,
>>
>> I think Yun Tang is right: HeapStateBackend doesn't (de)serialize the
>> value on update().
>> As for value(), it may (de)serialize the value and return a copy if an
>> asynchronous snapshot is in progress (to protect the snapshot from
>> modifications). This shouldn't happen often, though.
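A toy Java model of the copy-on-read behaviour Roman describes, as a sketch under stated assumptions: `CowStateMap` is a hypothetical class, versions stand in for however Flink tracks which objects a running snapshot may still reference, and `deepCopy` stands in for a type serializer's copy. Reads return the live object unless a snapshot is running and the object predates it; in that case the object is copied once, the copy replaces the stored object, and the copy is returned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Toy copy-on-write state map (illustrative, not Flink's implementation). */
final class CowStateMap<K, V> {

    private static final class Entry<T> {
        T value;
        int version; // map version in which this object was stored or last copied

        Entry(T value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final UnaryOperator<V> deepCopy; // stands in for a serializer's copy()
    private int currentVersion = 0;
    private int snapshotVersion = -1; // -1: no snapshot running

    CowStateMap(UnaryOperator<V> deepCopy) {
        this.deepCopy = deepCopy;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, currentVersion));
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (e.version <= snapshotVersion) {
            // The running snapshot may still read this object, so store and return a private copy.
            e.value = deepCopy.apply(e.value);
            e.version = currentVersion;
        }
        return e.value;
    }

    /** Starts a snapshot: captures object references without copying anything. */
    Map<K, V> startSnapshot() {
        snapshotVersion = currentVersion++;
        Map<K, V> view = new HashMap<>();
        entries.forEach((k, entry) -> view.put(k, entry.value));
        return view; // an async thread would serialize these objects later
    }

    /** Marks the snapshot as finished; reads return live objects again. */
    void releaseSnapshot() {
        snapshotVersion = -1;
    }
}
```

In this model, a mutation made on the object returned during a snapshot lands on the copy, which has replaced the stored object, so later reads still see it while the snapshot keeps the old version. If Flink's CopyOnWriteStateMap (in the same heap package as StateTable) behaves similarly, that would explain why the code in the question keeps working across checkpoints.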
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi,
>>>
>>> MemoryStateBackend and FsStateBackend both hold keyed state in
>>> HeapKeyedStateBackend [1], whose main data structure is StateTable [2],
>>> which holds the state as plain Java objects (POJOs). That is, the object
>>> is not serialized when update() is called.
>>> The RocksDB state backend, on the other hand, stores values as
>>> serialized bytes.
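The difference Yun Tang describes can be illustrated with two toy value states in plain Java: one keeps a reference to the object (heap-style), the other keeps serialized bytes (RocksDB-style, using Java serialization here as an assumed stand-in for Flink's serializers). `ToyValueState` and the other names are hypothetical, not Flink API. `sizeAfterMutatingWithoutUpdate` mimics the pattern in the question: call update() once, then mutate what value() returns.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

/** Two toy value states: one keeps object references, the other keeps serialized bytes. */
final class ValueStateModels {

    /** Hypothetical minimal interface named after Flink's ValueState; not the real API. */
    interface ToyValueState<T> {
        T value();
        void update(T value);
    }

    /** Heap-style: stores a reference to the live object; nothing is serialized. */
    static final class HeapStyle<T> implements ToyValueState<T> {
        private T ref;

        public T value() { return ref; }

        public void update(T value) { ref = value; }
    }

    /** RocksDB-style: stores bytes; every value() deserializes a fresh object. */
    static final class BytesStyle<T extends Serializable> implements ToyValueState<T> {
        private byte[] bytes;

        @SuppressWarnings("unchecked")
        public T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        public void update(T value) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            bytes = buf.toByteArray();
        }
    }

    /** The pattern from the question: update() once, then mutate what value() returns. */
    static int sizeAfterMutatingWithoutUpdate(ToyValueState<ArrayList<Long>> state) {
        state.update(new ArrayList<>());
        state.value().add(42L); // no second update() call
        return state.value().size();
    }
}
```

With the heap-style state the mutation is visible (size 1); with the byte-backed state it is silently lost (size 0), because the mutated object was a fresh deserialized copy.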
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>>
>>> Best
>>> Yun Tang
>>>
>>> ------------------------------
>>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>>> *Sent:* Sunday, February 7, 2021 19:53
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* question on ValueState
>>>
>>>
>>> Using FsStateBackend.
>>>
>>> I was under the impression that ValueState.value() serializes the object
>>> stored in the local state backend, copies the serialized bytes, and
>>> deserializes them. Likewise, update() would do the same steps to copy
>>> the object back to the local state backend. As a consequence, storing
>>> collections in ValueState would be much less efficient than using
>>> ListState or MapState where possible.
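Edward's efficiency intuition applies to backends that store serialized bytes. A rough cost model in Java counts how many list elements get (de)serialized when n elements are appended one at a time. It assumes that ListState.add() on RocksDB serializes only the new element and appends it (my understanding is that it uses a RocksDB merge, but treat that as an assumption) and ignores per-call overhead. On heap-based backends neither path serializes on access, per the replies in this thread, so the gap largely disappears there.

```java
/**
 * Counts element (de)serializations when appending n items, under a storage model
 * where state is kept as serialized bytes (as the RocksDB backend does).
 */
final class AppendCost {

    /** ValueState<List<T>>: every append deserializes the whole list and re-serializes it. */
    static long valueStateOfList(int appends) {
        long elementsTouched = 0;
        for (int size = 0; size < appends; size++) {
            elementsTouched += size;     // value(): deserialize the current list
            elementsTouched += size + 1; // update(): serialize the list with one more element
        }
        return elementsTouched;
    }

    /** ListState<T>.add(): assumed to serialize only the new element and append it. */
    static long listStateAdd(int appends) {
        return appends;
    }

    public static void main(String[] args) {
        System.out.println(valueStateOfList(1_000) + " vs " + listStateAdd(1_000));
    }
}
```

The ValueState path sums to n² element operations (1,000,000 for 1,000 appends) against n for ListState, which is the quadratic blow-up behind the "much less efficient" claim.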
>>>
>>> However, I am looking at some code I wrote a while ago that assumed
>>> value() just returns a reference to the object. The code only calls
>>> update() when it creates the object because value() returned null. Yet
>>> the code works: all changes to the object stored in state are visible
>>> the next time value() is called. Sample code is below.
>>>
>>> Can someone clarify what really happens when value() is called?
>>>
>>>     public void processElement(M in, Context ctx, Collector<Long> out) throws Exception {
>>>         MyWindow myWindow = windowState.value();
>>>         if (myWindow == null) {
>>>             // First element for this key: fire at the next multiple of `interval`.
>>>             ctx.timerService().registerProcessingTimeTimer(
>>>                     ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>             myWindow = new MyWindow(0L, slide, windowSize);
>>>             windowState.update(myWindow); // the only update() call
>>>             myWindow.eq.add(0L);
>>>         }
>>>         // Mutates the object returned by value() without calling update() again.
>>>         myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>>     }
>>>
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>>         ctx.timerService().registerProcessingTimeTimer(
>>>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>         MyWindow myWindow = windowState.value();
>>>         myWindow.slide(0L);
>>>         out.collect(myWindow.globalAccum);
>>>     }
>>
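If the code should keep working after switching to RocksDB, which stores serialized bytes as Yun Tang notes, the safe pattern is to call update() after every mutation: in the processElement above, after setAccumulator(...), and in onTimer, after slide(0L). A minimal backend-agnostic read-modify-write helper as a sketch; `ToyValueState` and `ReadModifyWrite` are hypothetical names, not Flink API, and the real ValueState methods also declare `IOException`, which is omitted here.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Backend-agnostic read-modify-write: mutate the value, then always write it back. */
final class ReadModifyWrite {

    /** Hypothetical minimal stand-in for Flink's ValueState. */
    interface ToyValueState<T> {
        T value();
        void update(T value);
    }

    /** Reads the value (creating it if absent), applies the mutation, and writes it back. */
    static <T> T modify(ToyValueState<T> state, Supplier<T> init, Consumer<T> mutation) {
        T current = state.value();
        if (current == null) {
            current = init.get();
        }
        mutation.accept(current);
        state.update(current); // required for byte-backed state; only a reference swap on heap
        return current;
    }
}
```

On a heap backend the extra update() merely re-stores the same reference, so the portable version costs little there while staying correct on a serializing backend.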
