StreamExecutionEnvironment.addDefaultKryoSerializer(YourClass.class, 
TaggedFieldSerializer.class) should work.
You can also specify it directly: 
StreamExecutionEnvironment.registerTypeWithKryoSerializer(YourClass.class, 
TaggedFieldSerializer.class).

Does the above work?
On 6 June 2017 at 5:09:21 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

I understand that my problem arises from Kryo using FieldSerializer as the 
default serializer.

Looking at Kryo's documentation 
(https://github.com/EsotericSoftware/kryo#compatibility), this could be easily 
solved by setting the default serializer to TaggedFieldSerializer. Flink, 
however, doesn't let me access Kryo directly (specifically after 
initialization), I can only add extra serializers using 
StreamExecutionEnvironment. addDefaultKryoSerializer, but the default will 
still be FieldSerializer. Is there any way I can interfere with 
checkKryoInitialized() and set the default serializer?

 

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]  
Sent: Sunday, June 04, 2017 3:28 PM
To: Shai Kaplan <shai.kap...@microsoft.com>; user@flink.apache.org
Subject: Re: KryoException: Encountered unregistered class ID

 

Hi Shai,

 

Flink’s Kryo registrations do not allow specifying the registration ID. They 
simply start from ID 10 ( < 10 is reserved by Kryo for primitive types).

 

My guess at what you’re observing here is that when trying to deserialize your 
newly changed class instance, it also tries to read the extra field, which did 
not exist before.

This extra read caused the interpreted ID of the next to-be-read instance to be 
messed up. Therefore, it isn’t that the ID of that class has changed, but 
simply that the deserialization is incorrectly reading extra bytes for the 
previous instance and the ID of the next instance is read at the wrong position.

 

The main issue here is that Kryo itself doesn’t handle serializer upgrades, 
i.e. the new serializer for your class created by Kryo will try to read that 
extra field even though it previously did not exist.

I would suggest two possible solutions here:

1. Simply let the original class stay untouched, and have a new class for your 
updated schema. When reading the old state, Kryo will use the correct 
serializer to read instances of the old class.

2. Directly change the old class, but you have to register a custom serializer 
for that class, which can avoid the new fields if necessary when reading (i.e., 
skip reading that field if it simply isn’t there).  

 

Cheers,

Gordon

 

On 4 June 2017 at 1:57:01 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

Hi.

I'm running a job from a savepoint. I've changed one of the classes stored in 
state. When I try to load the value from the state I get 
"com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
97".

I tried to understand if the problem arise from the nature of the change, or 
simply because there was a change in the class, so I changed the class so that 
the only difference from the previous version is a new boolean field, and the 
problem still occur (the ID number changes when I change the class). If I 
revert the class back to its old version, everything is fine.

 

I'm not sure if the class ID that I'm seeing is the right one, or is it just 
some random number received from reading the wrong place in the serialized 
object, or something like that. When I change the boolean to String the number 
in the exception changes to 41188…

 

What should I do to be able to restore from the state a class that now has a 
new field? Should I manually register the class? With what ID?

Reply via email to