StreamExecutionEnvironment.addDefaultKryoSerializer(YourClass.class, TaggedFieldSerializer.class) should work. You can also specify it directly: StreamExecutionEnvironment.registerTypeWithKryoSerializer(YourClass.class, TaggedFieldSerializer.class).
Does the above work?

On 6 June 2017 at 5:09:21 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

I understand that my problem arises from Kryo using FieldSerializer as the default serializer. Looking at Kryo's documentation (https://github.com/EsotericSoftware/kryo#compatibility), this could be easily solved by setting the default serializer to TaggedFieldSerializer. Flink, however, doesn't let me access Kryo directly (specifically after initialization). I can only add extra serializers using StreamExecutionEnvironment.addDefaultKryoSerializer, but the default will still be FieldSerializer. Is there any way I can interfere with checkKryoInitialized() and set the default serializer?

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Sunday, June 04, 2017 3:28 PM
To: Shai Kaplan <shai.kap...@microsoft.com>; user@flink.apache.org
Subject: Re: KryoException: Encountered unregistered class ID

Hi Shai,

Flink’s Kryo registrations do not allow specifying the registration ID. They simply start from ID 10 (IDs below 10 are reserved by Kryo for primitive types).

My guess at what you’re observing here is that when Kryo deserializes an instance of your newly changed class, it also tries to read the extra field, which did not exist when the state was written. This extra read shifts the read position, so the ID of the next to-be-read instance is interpreted incorrectly. Therefore, it isn’t that the ID of that class has changed, but simply that the deserialization reads extra bytes for the previous instance, and the ID of the next instance is then read at the wrong position.

The main issue here is that Kryo itself doesn’t handle serializer upgrades, i.e. the new serializer that Kryo creates for your class will try to read that extra field even though it previously did not exist.

I would suggest two possible solutions here:

1. Simply let the original class stay untouched, and have a new class for your updated schema. When reading the old state, Kryo will use the correct serializer to read instances of the old class.
2. Directly change the old class, but register a custom serializer for that class which can skip the new fields when reading if necessary (i.e., skip reading a field if it simply isn’t there).

Cheers,
Gordon

On 4 June 2017 at 1:57:01 PM, Shai Kaplan (shai.kap...@microsoft.com) wrote:

Hi.

I'm running a job from a savepoint. I've changed one of the classes stored in state. When I try to load the value from the state I get "com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97".

I tried to understand whether the problem arises from the nature of the change, or simply because there was a change in the class, so I changed the class so that the only difference from the previous version is a new boolean field, and the problem still occurs (the ID number changes when I change the class). If I revert the class back to its old version, everything is fine.

I'm not sure if the class ID that I'm seeing is the right one, or if it is just some random number obtained by reading the wrong place in the serialized object, or something like that. When I change the boolean to String, the number in the exception changes to 41188…

What should I do to be able to restore from the state a class that now has a new field? Should I manually register the class? With what ID?
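
Editor's illustration: Gordon's explanation of the bogus class ID can be reproduced with a minimal sketch in plain Java. It uses neither Kryo nor Flink. The fixed-width record layout, the class name MisalignedReadDemo, the method names, and the ID value 10 are all assumptions made for illustration, and Kryo's real wire format is different. The sketch only shows the mechanism: a reader that expects one extra boolean field consumes a byte belonging to the next record, so the next class ID is read from the wrong position.

```java
import java.nio.ByteBuffer;

public class MisalignedReadDemo {
    // Hypothetical class ID, standing in for whatever ID a registration would get.
    static final int EVENT_CLASS_ID = 10;

    // Writes two records in the OLD layout: [int class ID][int count].
    static byte[] writeOldRecords(int firstCount, int secondCount) {
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.putInt(EVENT_CLASS_ID).putInt(firstCount);
        buf.putInt(EVENT_CLASS_ID).putInt(secondCount);
        return buf.array();
    }

    // Reads the first record with the OLD layout and returns the class ID
    // found at the start of the second record.
    static int secondIdWithOldLayout(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.getInt(); // class ID of the first record
        buf.getInt(); // count field
        return buf.getInt();
    }

    // Reads the first record with the NEW layout, [int class ID][int count][boolean flag],
    // and returns what it believes is the class ID of the second record.
    static int secondIdWithNewLayout(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.getInt(); // class ID of the first record
        buf.getInt(); // count field
        buf.get();    // "flag": really the first byte of the next record's class ID
        return buf.getInt(); // read one byte too late, so the value is garbage
    }

    public static void main(String[] args) {
        byte[] data = writeOldRecords(7, 7);
        System.out.println("old layout: " + secondIdWithOldLayout(data)); // prints "old layout: 10"
        System.out.println("new layout: " + secondIdWithNewLayout(data)); // prints "new layout: 2560"
    }
}
```

The misread value depends on which bytes happen to follow, which is consistent with the reported ID changing when the type of the added field changes.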