Hi Biswajit,

You can follow this is:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program


Best regards,
Duo

On Sat, Jan 21, 2017 at 9:15 AM, Biswajit Das <biswajit...@gmail.com> wrote:

> Hello,
>
> Having an issue with nested protobuf deserialization, event tried with
> register the class with Kryo like beloe but seems like no help ,  one of
> the options left for me is to write a custom serializer or convert the byte
> array to a Dictionary object .
>
>
> *val clazz =
> Class.forName("java.util.Collections$UnmodifiableCollection");env.getConfig.registerTypeWithKryoSerializer(clazz,classOf[UnmodifiableCollectionsSerializer])*
>
>
> -----------
> .ClickSchema$Click)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(Objec
> tField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(F
> ieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(Objec
> tField.java:113)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(F
> ieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
> zer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> Serializer.deserialize(StreamRecordSerializer.java:112)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> Serializer.deserialize(StreamRecordSerializer.java:42)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationD
> elegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.Spilli
> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
> daptiveSpanningRecordDeserializer.java:116)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
> rocessInput(StreamInputProcessor.java:156)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
> run(OneInputStreamTask.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
> treamTask.java:267)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
>     at java.util.Collections$UnmodifiableCollection.add(Collections
> .java:1055)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.r
> ead(CollectionSerializer.java:109)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.r
> ead(CollectionSer
> ------------------------
>



-- 

Programmer, Geek...

Reply via email to