Hi Till,

It happened during deserialization of a savepoint.
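
To illustrate the ordering concern raised earlier in this thread, here is a toy sketch in plain Scala. It is not Flink's or Kryo's actual code. It only assumes that, as with Kryo classes registered without an explicit ID, integer IDs are handed out in registration order. The names `RegistrationOrderSketch`, `assignIds`, `assignIdsSorted` and the `msg.*` class names are made up for the example.

```scala
// Toy model only: this is NOT Kryo's or Flink's actual code.
object RegistrationOrderSketch {
  // Hand out consecutive integer IDs in the order names are registered,
  // loosely mimicking how classes registered without an explicit ID are numbered.
  def assignIds(classNames: Seq[String], firstId: Int = 0): Map[String, Int] =
    classNames.zipWithIndex.map { case (name, i) => name -> (firstId + i) }.toMap

  // Sorting before registering makes the assignment independent of discovery order.
  def assignIdsSorted(classNames: Seq[String], firstId: Int = 0): Map[String, Int] =
    assignIds(classNames.sorted, firstId)

  def main(args: Array[String]): Unit = {
    val savepointRun = Seq("msg.B", "msg.C") // hypothetical order when the savepoint was taken
    val restoreRun   = Seq("msg.C", "msg.B") // hypothetical order on restore

    // Unsorted: the same class ends up with a different ID in each run.
    println(assignIds(savepointRun)("msg.B") == assignIds(restoreRun)("msg.B"))
    // Sorted: both runs agree on every ID.
    println(assignIdsSorted(savepointRun) == assignIdsSorted(restoreRun))
  }
}
```

Sorting only helps if the set of registered classes is also identical in both runs; adding or removing a class would still shift the IDs of the classes that follow it.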

Best regards,
Dmitry

On Fri, Feb 17, 2017 at 2:48 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Dmitry,
>
> curious to know when exactly you observed the IllegalStateException. Did
> it happen after resuming from a savepoint or did it already happen during
> the first run of the program? If the latter is the case, then this might
> indicate a bug where we don’t use the correct ExecutionConfig to
> instantiate the serializers.
>
> Concerning the addDefaultKryoSerializer method, this basically registers a
> serializer for a specific type, but it does not register the type with Kryo.
> Thus, it should still be beneficial to call registerType for the type for
> which you’ve registered a default serializer. But you can also call
> registerTypeWithKryoSerializer, which does both for you.
>
> Cheers,
> Till
>
> On Fri, Feb 17, 2017 at 12:38 PM, Dmitry Golubets <dgolub...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I was using ```cs.knownDirectSubclasses``` recursively to find and
>> register subclasses, which may have resulted in an inconsistent order.
>> Later I changed that to
>> ```cs.knownDirectSubclasses.toList.sortBy(_.fullName)```
>> which should have fixed the order.
>> But either it didn't or there was another problem, because I was getting
>> the error anyway.
>> Interestingly, it happened only on a KeyedStream after a window; without
>> the window it was fine.
>> I didn't change anything else in the job.
>>
>> However, I removed the ```registerType``` calls completely, because I
>> didn't notice any performance difference.
>> Do you know if ```registerType``` has any effect at all if I use it
>> together with ```addDefaultKryoSerializer``` for that type?
>>
>>
>> Best regards,
>> Dmitry
>>
>> On Thu, Feb 16, 2017 at 10:40 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> are you changing anything on your job between performing the savepoint
>>> and restoring the savepoint? Flink upgrade, Job upgrade, changing Kryo
>>> version, changing order in which you register Kryo serialisers?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On Fri, 10 Feb 2017 at 18:26 Dmitry Golubets <dgolub...@gmail.com>
>>> wrote:
>>>
>>>> The docs say that it may improve performance.
>>>>
>>>> How true is it, when custom serializers are provided?
>>>> There is also 'disableAutoTypeRegistration' method in the config class,
>>>> implying Flink registers types automatically.
>>>>
>>>> So, given that I have a hierarchy:
>>>> trait A
>>>> class B extends A
>>>> class C extends A
>>>>
>>>> and I do addDefaultKryoSerializer(classOf[A], classOf[ASerializer])
>>>>
>>>> should I care about registering B and C with 'registerType' method?
>>>>
>>>> It is worth mentioning that when I registered my message class
>>>> hierarchies, I got:
>>>> java.lang.IllegalStateException: Could not initialize keyed state
>>>> backend.
>>>> java.io.StreamCorruptedException: invalid type code: 00
>>>> on restoring from savepoint
>>>>
>>>> After some debugging I found that 'registerType' was the cause.
>>>> It is possible that my code called registerType in a different
>>>> order between runs.
>>>> Could it be a problem?
>>>>
>>>> Best regards,
>>>> Dmitry
>>>>
>>>
>>
>
