Hi Shai,

Your suggestion makes a lot of sense. I did not realize Kryo allows changing 
that; thanks for the correction!
It’s definitely reasonable to provide a way to proxy that setting through the 
`StreamExecutionEnvironment`, if Kryo itself has the functionality already.
I’ve filed a JIRA for this feature: 
https://issues.apache.org/jira/browse/FLINK-6857.

Cheers,
Gordon

On 6 June 2017 at 6:07:15 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

Actually, Kryo does allow overriding that by calling 
kryo.setDefaultSerializer(). It would be nice if Flink provided a way to 
override getKryoInstance(), or to register a callback that runs after Kryo is 
initialized, or simply let me define a default serializer and then pass it to 
kryo.setDefaultSerializer(), the same way it "forwards" the 
addDefaultSerializer calls.

I could register the TaggedFieldSerializer for my specific classes, but this is 
error-prone: at some point I might add a new class to the state and forget to 
register it, and then I won't be able to change that class later.

 

 

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]  
Sent: Tuesday, June 06, 2017 6:52 PM
To: user@flink.apache.org
Subject: RE: KryoException: Encountered unregistered class ID

 

Ah, I see what you’re trying to achieve. In that case I don’t think it is 
possible, simply because Kryo doesn’t allow overriding that.

 

But wouldn’t you be able to just, through Flink’s `StreamExecutionEnvironment`, 
register the TaggedFieldSerializer for your to-be-migrated specific class?

The downside of course is that you always need that registered as the default 
serializer for the class, but I think that’s the only possible way at the 
moment.

 

On 6 June 2017 at 5:45:12 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

No. That only sets default serializers for specific classes. I want to change 
the default serializer that Kryo falls back to when it can't find a registered 
serializer for a class.

See Kryo.getDefaultSerializer(), notice the last line calls 
newDefaultSerializer() which is hardcoded to be FieldSerializer.
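
To make the distinction concrete, here is a toy model of the lookup order 
described above. It is only a sketch in plain Java: the class and method names 
mirror the Kryo methods mentioned in this thread, but the code is hypothetical 
and is not Kryo's implementation (serializers are represented by their names 
as strings).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the lookup order; NOT Kryo's actual code.
class DefaultSerializerLookupDemo {
    // Per-class defaults, analogous to what addDefaultSerializer registers.
    private final Map<Class<?>, String> perClassDefaults = new LinkedHashMap<>();
    // Global fallback, analogous to what setDefaultSerializer changes.
    private String fallback = "FieldSerializer";

    void addDefaultSerializer(Class<?> type, String serializerName) {
        perClassDefaults.put(type, serializerName);
    }

    void setDefaultSerializer(String serializerName) {
        fallback = serializerName;
    }

    // Per-class defaults win; anything else gets the global fallback.
    String resolve(Class<?> type) {
        for (Map.Entry<Class<?>, String> entry : perClassDefaults.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return fallback;
    }

    // Hypothetical state classes used only for this illustration.
    static class Registered {}
    static class Forgotten {}

    public static void main(String[] args) {
        DefaultSerializerLookupDemo lookup = new DefaultSerializerLookupDemo();
        lookup.addDefaultSerializer(Registered.class, "TaggedFieldSerializer");
        System.out.println(lookup.resolve(Registered.class));
        System.out.println(lookup.resolve(Forgotten.class));
        lookup.setDefaultSerializer("TaggedFieldSerializer");
        System.out.println(lookup.resolve(Forgotten.class));
    }
}
```

In this model, a class that was never registered keeps getting the fallback 
until the fallback itself is changed, which is the behaviour I am asking to be 
able to control.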

 

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]  
Sent: Tuesday, June 06, 2017 6:13 PM
To: user@flink.apache.org
Subject: RE: KryoException: Encountered unregistered class ID

 

StreamExecutionEnvironment.addDefaultKryoSerializer(YourClass.class, 
TaggedFieldSerializer.class) should work.

You can also specify it directly: 
StreamExecutionEnvironment.registerTypeWithKryoSerializer(YourClass.class, 
TaggedFieldSerializer.class).

 

Does the above work?

On 6 June 2017 at 5:09:21 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

I understand that my problem arises from Kryo using FieldSerializer as the 
default serializer.

Looking at Kryo's documentation 
(https://github.com/EsotericSoftware/kryo#compatibility), this could be easily 
solved by setting the default serializer to TaggedFieldSerializer. Flink, 
however, doesn't let me access Kryo directly (specifically after 
initialization). I can only add extra serializers using 
StreamExecutionEnvironment.addDefaultKryoSerializer, but the default will 
still be FieldSerializer. Is there any way I can hook into 
checkKryoInitialized() and set the default serializer?

 

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]  
Sent: Sunday, June 04, 2017 3:28 PM
To: Shai Kaplan <shai.kap...@microsoft.com>; user@flink.apache.org
Subject: Re: KryoException: Encountered unregistered class ID

 

Hi Shai,

 

Flink’s Kryo registrations do not allow specifying the registration ID. They 
simply start from ID 10 (IDs below 10 are reserved by Kryo for primitive types).

 

My guess at what you’re observing here is that when trying to deserialize your 
newly changed class instance, it also tries to read the extra field, which did 
not exist before.

This extra read caused the interpreted ID of the next to-be-read instance to be 
messed up. Therefore, it isn’t that the ID of that class has changed, but 
simply that the deserialization is incorrectly reading extra bytes for the 
previous instance and the ID of the next instance is read at the wrong position.
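
The misalignment can be reproduced with a toy binary layout. This is only a 
sketch in plain Java under simplifying assumptions: the one-byte class ID, the 
ID value 12, and the record layout are made up for illustration and are not 
Kryo's wire format.

```java
import java.nio.ByteBuffer;

// Toy model only: this is NOT Kryo's wire format.
class SchemaDriftDemo {
    static final byte CLASS_ID = 12; // hypothetical registration ID

    // Old schema: [class ID: 1 byte][count: 4 bytes], written twice.
    static byte[] writeTwoOldRecords() {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.put(CLASS_ID).putInt(1000);
        buf.put(CLASS_ID).putInt(7);
        return buf.array();
    }

    // A reader built for the new schema expects an extra boolean after count.
    // Returns the class ID it believes the second record has.
    static int secondClassIdSeenByNewReader(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.get();     // class ID of record 1
        buf.getInt();  // count of record 1
        buf.get();     // "new boolean field": really record 2's class ID byte
        return buf.get(); // first byte of record 2's count, taken as a class ID
    }

    public static void main(String[] args) {
        int seen = secondClassIdSeenByNewReader(writeTwoOldRecords());
        System.out.println("expected class ID " + CLASS_ID + ", reader saw " + seen);
    }
}
```

Here the reader sees 0 instead of 12 for the second record. With a wider extra 
field (for example a String), different bytes would be consumed and a 
different bogus ID would come out, which is consistent with the number 
changing when the field type changes.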

 

The main issue here is that Kryo itself doesn’t handle serializer upgrades, 
i.e. the new serializer for your class created by Kryo will try to read that 
extra field even though it previously did not exist.

I would suggest two possible solutions here:

1. Simply let the original class stay untouched, and have a new class for your 
updated schema. When reading the old state, Kryo will use the correct 
serializer to read instances of the old class.

2. Directly change the old class, but you have to register a custom serializer 
for that class, which can avoid the new fields if necessary when reading (i.e., 
skip reading that field if it simply isn’t there).  
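
As a rough illustration of option 2, the sketch below shows one way a 
serializer can tolerate a missing field: each field is written with a tag, and 
the reader only fills in the fields that are actually present. This is similar 
in spirit to tagged-field serialization, but everything here (the `Event` 
class, the tag values, the byte layout) is a hypothetical plain-Java toy, not 
Kryo's format or API.

```java
import java.nio.ByteBuffer;

// Toy tagged layout: [field count][tag, value]... ; NOT Kryo's wire format.
class TaggedReadDemo {
    static final byte TAG_COUNT = 1;  // hypothetical tag for the old field
    static final byte TAG_ACTIVE = 2; // hypothetical tag for the new field

    static final class Event {
        int count;
        boolean active; // new field; stays false when absent from the bytes
    }

    // What the old version of the class would have written.
    static byte[] writeOld(int count) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.put((byte) 1); // one field follows
        buf.put(TAG_COUNT).putInt(count);
        return buf.array();
    }

    // What the new version of the class writes.
    static byte[] writeNew(int count, boolean active) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put((byte) 2); // two fields follow
        buf.put(TAG_COUNT).putInt(count);
        buf.put(TAG_ACTIVE).put((byte) (active ? 1 : 0));
        return buf.array();
    }

    // Reads either version: only the fields present in the bytes are set.
    static Event read(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        Event event = new Event();
        int fields = buf.get();
        for (int i = 0; i < fields; i++) {
            byte tag = buf.get();
            switch (tag) {
                case TAG_COUNT: event.count = buf.getInt(); break;
                case TAG_ACTIVE: event.active = buf.get() != 0; break;
                default: throw new IllegalStateException("unknown tag " + tag);
            }
        }
        return event;
    }

    public static void main(String[] args) {
        Event old = read(writeOld(5));
        System.out.println(old.count + " " + old.active);
    }
}
```

The point of the design is that the reader never assumes a fixed field 
sequence, so old bytes without the new field leave it at its default instead 
of shifting the read position.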

 

Cheers,

Gordon

 

On 4 June 2017 at 1:57:01 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

Hi.

I'm running a job from a savepoint. I've changed one of the classes stored in 
state. When I try to load the value from the state I get 
"com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
97".

I tried to understand whether the problem arises from the nature of the change 
or simply from the fact that the class changed at all, so I modified the class 
so that the only difference from the previous version is a new boolean field. 
The problem still occurs (the ID number changes when I change the class). If I 
revert the class back to its old version, everything is fine.

 

I'm not sure whether the class ID that I'm seeing is a real one or just some 
random number that comes from reading the wrong position in the serialized 
object, or something like that. When I change the boolean to a String, the 
number in the exception changes to 41188…

 

What should I do to be able to restore from the state a class that now has a 
new field? Should I manually register the class? With what ID?
