Hi Biswajit,
You can follow this guide:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
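From your trace, Kryo's CollectionSerializer ends up calling add() on a java.util.Collections$UnmodifiableCollection while reading your Click message. The page above suggests registering a protobuf-aware serializer for the message class itself. Here is a minimal sketch, assuming chill-protobuf (com.twitter.chill.protobuf.ProtobufSerializer) and protobuf-java are on your classpath. The object name, the register helper and the protoClass parameter are made up for illustration, so adapt them to your job:

```scala
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper object; the name is only for illustration.
object RegisterProtobufSerializer {

  // protoClass is assumed to be a generated protobuf message class,
  // for example the Click class that shows up in the stack trace.
  def register(env: StreamExecutionEnvironment, protoClass: Class[_]): Unit = {
    // Ask Flink's Kryo fallback to use the chill-protobuf serializer for
    // this type instead of Kryo's default field-by-field serialization.
    env.getConfig.registerTypeWithKryoSerializer(protoClass, classOf[ProtobufSerializer])
  }
}
```

You would call it once while setting up the job, before building the pipeline, for example RegisterProtobufSerializer.register(env, classOf[ClickSchema.Click]). I have not run this against your schema, so treat it as a starting point.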
Best regards,
Duo
On Sat, Jan 21, 2017 at 9:15 AM, Biswajit Das wrote:
> Hello,
>
> I'm having an issue with nested protobuf deserialization. I even tried
> registering the class with Kryo as shown below, but it doesn't seem to
> help. One of the options left for me is to write a custom serializer or
> convert the byte array to a Dictionary object.
>
>
> val clazz = Class.forName("java.util.Collections$UnmodifiableCollection")
> env.getConfig.registerTypeWithKryoSerializer(clazz, classOf[UnmodifiableCollectionsSerializer])
>
>
> ---
> .ClickSchema$Click)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer
>
>
--
Programmer, Geek...