Some extra information after understanding your case a bit more:

What you would need to do is return
`CompatibilityResult#requiresMigration(convertSerializer)`, where
`convertSerializer` is a serializer that reads the extra information.
The `convertSerializer` will only ever be used to read the old data. The
new serializer is used to serialize state, so that it is written in the new
format.
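A minimal sketch of that split, assuming a toy format where the old version
wrote an extra `long` after each value. `SketchSerializer`,
`OldFormatConvertSerializer`, `NewFormatSerializer` and `MigrationSketch` are
hypothetical stand-ins, not Flink's `TypeSerializer` API. The convert
serializer only ever reads old bytes; the new serializer writes the new format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical minimal serializer interface; NOT Flink's TypeSerializer.
interface SketchSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// New serializer: reads and writes the new format (payload only).
final class NewFormatSerializer implements SketchSerializer<String> {
    @Override
    public void serialize(String value, DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public String deserialize(DataInput in) throws IOException {
        return in.readUTF();
    }
}

// "Convert serializer": only used to read the old format,
// which carried an extra long after the payload.
final class OldFormatConvertSerializer implements SketchSerializer<String> {
    @Override
    public void serialize(String value, DataOutput out) {
        throw new UnsupportedOperationException("old format is never written");
    }

    @Override
    public String deserialize(DataInput in) throws IOException {
        String value = in.readUTF();
        in.readLong(); // extra information from the old version, discarded
        return value;
    }
}

final class MigrationSketch {
    // Read one value in the old format and write it back in the new format.
    static byte[] migrate(byte[] oldBytes,
                          SketchSerializer<String> convertSerializer,
                          SketchSerializer<String> newSerializer) throws IOException {
        String value = convertSerializer.deserialize(
                new DataInputStream(new ByteArrayInputStream(oldBytes)));
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        newSerializer.serialize(value, out);
        out.flush();
        return buffer.toByteArray();
    }
}
```

Once every value has gone through `migrate`, only the new serializer is
needed; the convert serializer never writes anything.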

Unfortunately, if you return `requiresMigration()`, the job will currently
always fail, since state migration is not supported yet. It is blocked
by [1].

I’m happy to hear feedback regarding the new compatibility / config snapshot 
methods on TypeSerializer.
Let me know if all of this makes sense to you :)

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-22-Eager-State-Declaration-td18562.html


On 7 July 2017 at 9:23:32 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Dawid,

First of all, one thing to clarify: TypeSerializer#ensureCompatibility is
invoked on the newly provided state serializer.
Also, a reconfigured compatible serializer should NOT have a different
serialization format (that would require state migration, i.e. returning
CompatibilityResult#requiresMigration).
A reconfigured serializer continues to be used by Flink as if nothing has
changed: it can read old AND new data, and still writes in the same format.
In your case, I think you may be interpreting “a reconfigured serializer”
incorrectly.
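To illustrate the difference, here is a hypothetical sketch (not Flink's
`CompatibilityResult` or `TypeSerializerConfigSnapshot` API) of a serializer
that writes a numeric tag per registered type. Reconfiguring it means adopting
the previous tag assignment, so old bytes still decode and new bytes are
written in the same format. Anything that changes the format itself is
reported as requiring migration. `TaggingSerializer`, `TagSnapshot` and
`SketchCompatibility` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified result type; NOT Flink's CompatibilityResult.
enum SketchCompatibility { COMPATIBLE, REQUIRES_MIGRATION }

// Hypothetical snapshot of the previous serializer's configuration.
final class TagSnapshot {
    final int formatVersion;
    final List<String> registeredTypes; // list position = tag written to disk

    TagSnapshot(int formatVersion, List<String> registeredTypes) {
        this.formatVersion = formatVersion;
        this.registeredTypes = new ArrayList<>(registeredTypes);
    }
}

final class TaggingSerializer {
    static final int FORMAT_VERSION = 1;
    private List<String> registeredTypes;

    TaggingSerializer(List<String> registeredTypes) {
        this.registeredTypes = new ArrayList<>(registeredTypes);
    }

    // Called on the NEW serializer with the snapshot of the PREVIOUS one.
    SketchCompatibility ensureCompatibility(TagSnapshot previous) {
        if (previous.formatVersion != FORMAT_VERSION
                || !registeredTypes.containsAll(previous.registeredTypes)) {
            // Old bytes cannot be read with the current binary format.
            return SketchCompatibility.REQUIRES_MIGRATION;
        }
        // Reconfigure: keep the old tag assignments so old bytes decode as
        // before, then append newly registered types. Format is unchanged.
        List<String> reordered = new ArrayList<>(previous.registeredTypes);
        for (String type : registeredTypes) {
            if (!reordered.contains(type)) {
                reordered.add(type);
            }
        }
        registeredTypes = reordered;
        return SketchCompatibility.COMPATIBLE;
    }

    int tagOf(String type) {
        return registeredTypes.indexOf(type);
    }
}
```

Treating an unknown previously registered type as "requires migration" is a
choice made for this sketch, not a statement about any Flink serializer.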

> Unfortunately it does not work for HeapStateBackend as during restoring the
> StateBackend the method TypeSerializer#ensureCompatibility is not invoked and
> the state value is eagerly deserialized with the non-reconfigured serializer.
For the HeapStateBackend, as of now, this is expected. The main reason is
that new state serializers are currently provided lazily (i.e. when the
state descriptor is registered). At restore time, there is no new serializer
available to be checked against / reconfigured with the previous
TypeSerializerConfigSnapshot.

Therefore, for HeapStateBackend, we use the old serializer that was
serialized with the checkpoint (without invoking #ensureCompatibility) to
read everything into state objects. This should always work as long as the
old serializer can be deserialized properly.

Cheers,
Gordon

On 7 July 2017 at 5:57:56 PM, Dawid Wysakowicz (wysakowicz.da...@gmail.com) wrote:

Hi devs,

Currently I am working on some changes to the serializer for the NFA class in
the CEP library. I am trying to understand how the
TypeSerializer#ensureCompatibility feature works.

What I want to do: in a previous version (e.g. 1.3.0), some information was
serialized that now shouldn't be. In TypeSerializer#ensureCompatibility, I am
setting a flag, based on the version of the corresponding ConfigSnapshot, that
tells me whether that additional info should be read.
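The flag plan could look roughly like the sketch below. `FlaggedSerializer`,
`configureFrom` and the version numbers are hypothetical; `configureFrom`
stands in for what the plan does inside `ensureCompatibility`, and the "extra
info" is assumed to be a single `long` written after each value.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical serializer following the flag plan; not the actual NFA serializer.
final class FlaggedSerializer {
    // Assumption: snapshot version 1 (e.g. 1.3.0) wrote the extra info.
    static final int CURRENT_SNAPSHOT_VERSION = 2;

    private boolean readExtraInfo = false;

    // Stands in for ensureCompatibility: set the flag from the previous snapshot version.
    void configureFrom(int previousSnapshotVersion) {
        readExtraInfo = previousSnapshotVersion < CURRENT_SNAPSHOT_VERSION;
    }

    // Always writes the new format (no extra info).
    void serialize(String value, DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    String deserialize(DataInput in) throws IOException {
        String value = in.readUTF();
        if (readExtraInfo) {
            in.readLong(); // skip the extra info written by the old version
        }
        return value;
    }
}
```

Once the flag is set, this instance reads a format (value plus extra info)
that differs from the one it writes, so it can no longer read its own output.
That is the situation the reply above describes as state migration rather
than reconfiguration.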

So let’s get to the point :). Unfortunately it does not work for
HeapStateBackend, as during restoring the StateBackend the method
TypeSerializer#ensureCompatibility is not invoked and the state value is
eagerly deserialized with the non-reconfigured serializer.
It does work, though, for RocksDBStateBackend, as there is no deserialization
of the value while restoring (lazy deserialization). The value is first
deserialized when accessed (getColumnFamily etc., I suppose), and then the
method ensureCompatibility is called and the serializer is properly
reconfigured.
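A simplified model of the two restore strategies described here, assuming a
serializer can be treated as a plain decoding function. `RestoreSketch`,
`eagerRestore` and `LazyState` are hypothetical names, not Flink's backend
classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of eager vs. lazy restore; NOT Flink's backend code.
final class RestoreSketch {

    // Heap-style: decode every value at restore time. Only the serializer
    // recovered from the checkpoint is in scope, so it is used directly.
    static <T> Map<String, T> eagerRestore(Map<String, byte[]> snapshot,
                                           Function<byte[], T> restoredSerializer) {
        Map<String, T> objects = new HashMap<>();
        for (Map.Entry<String, byte[]> e : snapshot.entrySet()) {
            objects.put(e.getKey(), restoredSerializer.apply(e.getValue()));
        }
        return objects;
    }

    // RocksDB-style: keep raw bytes and decode on access with whatever
    // serializer was registered (and possibly reconfigured) after restore.
    static final class LazyState<T> {
        private final Map<String, byte[]> rawBytes;
        private Function<byte[], T> registeredSerializer;

        LazyState(Map<String, byte[]> snapshot) {
            this.rawBytes = new HashMap<>(snapshot); // no decoding here
        }

        void register(Function<byte[], T> serializer) {
            this.registeredSerializer = serializer;
        }

        T get(String key) {
            if (registeredSerializer == null) {
                throw new IllegalStateException("state accessed before registration");
            }
            byte[] bytes = rawBytes.get(key);
            return bytes == null ? null : registeredSerializer.apply(bytes);
        }
    }
}
```

In the eager path the restored serializer is the only one available; in the
lazy path decoding is deferred until after `register`, which is where a
reconfigured serializer can take effect.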

My questions are:
- is my serialization plan (setting the flag) OK?
- is the difference in behaviour intended, or is it a bug in HeapStateBackend?

If it is a bug, I would be willing to fix it (or at least try), but I will
probably need some guidance.

Regards
Dawid
