Hi,

I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the value
on update.
As for "value()", it may (de)serialize it and return a copy if there is an
ongoing async snapshot in progress (to protect from modifications). This
shouldn't happen often though.

Regards,
Roman


On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:

> Hi,
>
> MemoryStateBackend and FsStateBackend both hold keyed state in
> HeapKeyedStateBackend [1], and the main structure to store data is
> StateTable [2] which holds POJO format objects. That is to say, the object
> would not be serialized when calling update().
> On the other hand, RocksDB statebackend would store value with serialized
> bytes.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Colletta, Edward <edward.colle...@fmr.com>
> *Sent:* Sunday, February 7, 2021 19:53
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* question on ValueState
>
>
> Using FsStateBackend.
>
>
>
> I was under the impression that ValueState.value will serialize an object
> which is stored in the local state backend, copy the serialized object and
> deserializes it.  Likewise update() would do the same steps copying the
> object back to local state backend.    And as a consequence, storing
> collections in ValueState is much less efficient than using ListState or
> MapState if possible.
>
>
>
> However, I am looking at some code I wrote a while ago which made the
> assumption that the value() method just returned a reference to the
> object.  The code only calls update() when creating the object if value()
> returns null.    Yet the code works, all changes to the object stored in
> state are visible the next time value() is called.   I have some sample
> code below.
>
>
>
> Can someone clarify what really happens when value() is called?
>
>
>
>
>
>    public void processElement(M in, Context ctx, Collector<Long> out)
> throws Exception {
>
>         MyWindow myWindow;
>
>         myWindow = windowState.value();
>
>         if (myWindow == null) {
>
>
> ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
> + interval) / interval) * interval);
>
>             myWindow = new MyWindow(0L, slide, windowSize);
>
>             windowState.update(myWindow);
>
>             myWindow.eq.add(0L);
>
>         }
>
>
> myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator()
> + in.value);
>
>     }
>
>
>
>     @Override
>
>     public void onTimer(long timestamp, OnTimerContext ctx,
> Collector<Long> out) throws Exception {
>
>
> ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
> + interval) / interval) * interval);
>
>         MyWindow myWindow = windowState.value();
>
>         myWindow.slide(0L);
>
>         out.collect(myWindow.globalAccum);
>
>     }
>
>
>
>
>

Reply via email to