I have a related question.
Since FsStateBackend keeps working state on the heap and only the checkpoint
is written to the filesystem, does JobManager/TaskManager memory limit the
state size? Is the state size limited by the TM's memory * number of TMs, or
by the JM's memory?


Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Feb 8, 2021 at 6:05 PM:

> Hi,
>
> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
> value on update.
> As for "value()", it may (de)serialize the value and return a copy if an
> async snapshot is in progress (to protect the snapshot from modifications).
> This shouldn't happen often though.
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi,
>>
>> MemoryStateBackend and FsStateBackend both hold keyed state in
>> HeapKeyedStateBackend [1], and the main structure that stores the data is
>> StateTable [2], which holds plain Java objects. That is to say, the object
>> is not serialized when update() is called.
>> The RocksDB state backend, on the other hand, stores values as serialized
>> bytes.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>> [2]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>
>> Best
>> Yun Tang
>>
>> ------------------------------
>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>> *Sent:* Sunday, February 7, 2021 19:53
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* question on ValueState
>>
>>
>> Using FsStateBackend.
>>
>>
>>
>> I was under the impression that ValueState.value() serializes the object
>> stored in the local state backend, copies the serialized form, and
>> deserializes it. Likewise, update() would do the same steps when copying
>> the object back to the local state backend. As a consequence, storing
>> collections in ValueState would be much less efficient than using
>> ListState or MapState where possible.
>>
>>
>>
>> However, I am looking at some code I wrote a while ago which assumed
>> that the value() method just returns a reference to the object. The code
>> only calls update() when it creates the object, that is, when value()
>> returns null. Yet the code works: all changes to the object stored in
>> state are visible the next time value() is called. I have some sample
>> code below.
>>
>>
>>
>> Can someone clarify what really happens when value() is called?
>>
>>
>>
>>
>>
>>     public void processElement(M in, Context ctx, Collector<Long> out) throws Exception {
>>         MyWindow myWindow;
>>         myWindow = windowState.value();
>>         if (myWindow == null) {
>>             ctx.timerService().registerProcessingTimeTimer(
>>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>             myWindow = new MyWindow(0L, slide, windowSize);
>>             windowState.update(myWindow);
>>             myWindow.eq.add(0L);
>>         }
>>         myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>     }
>>
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>         ctx.timerService().registerProcessingTimeTimer(
>>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>         MyWindow myWindow = windowState.value();
>>         myWindow.slide(0L);
>>         out.collect(myWindow.globalAccum);
>>     }
>>
>>
>>
>>
>>
>
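
The difference described in this thread, between a backend that keeps the
object itself on the heap and one that stores serialized bytes, can be
sketched in plain Java. This is only an illustration of the two behaviours,
not Flink code: StateSemanticsSketch, Counter, HeapLikeState and
SerializedLikeState are hypothetical names, and Java serialization stands in
for whatever serializer a real backend would use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-Java sketch; none of these classes are Flink APIs.
class StateSemanticsSketch {

    /** Hypothetical stand-in for the user's state object. */
    static class Counter implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    /** Mimics a heap-based backend: it keeps the object reference itself. */
    static class HeapLikeState<T> {
        private T current;

        T value() { return current; }          // same instance every time

        void update(T v) { current = v; }      // no serialization
    }

    /** Mimics a backend that stores values as serialized bytes. */
    static class SerializedLikeState<T extends Serializable> {
        private byte[] bytes;

        @SuppressWarnings("unchecked")
        T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();     // fresh copy on every call
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        void update(T v) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(v);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            bytes = buf.toByteArray();
        }
    }

    /** Mutates the object returned by value() without calling update() again. */
    static long mutateWithoutUpdateOnHeap() {
        HeapLikeState<Counter> state = new HeapLikeState<>();
        state.update(new Counter());
        state.value().total += 5;               // changes the stored instance
        return state.value().total;
    }

    /** Same steps against the serialized store: the change hits a throwaway copy. */
    static long mutateWithoutUpdateOnSerialized() {
        SerializedLikeState<Counter> state = new SerializedLikeState<>();
        state.update(new Counter());
        state.value().total += 5;               // changes a deserialized copy only
        return state.value().total;
    }

    public static void main(String[] args) {
        System.out.println("heap-like: " + mutateWithoutUpdateOnHeap());             // 5
        System.out.println("serialized-like: " + mutateWithoutUpdateOnSerialized()); // 0
    }
}
```

In the heap-like store the in-place change survives, which matches what the
sample code in the original question observes. In the serialized store it is
lost unless update() is called again. Since value() may also return a copy
while an async snapshot is in progress, calling update() after each
modification looks like the safer pattern.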
