Re: Kryo Deserializer

2017-01-20 Thread 小多
Hi Biswajit,


You can follow this:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
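
In case it helps, here is a rough sketch of what that page describes, written for the Scala API. It assumes your Click class is a generated Protobuf message and that the chill-protobuf library (which provides com.twitter.chill.protobuf.ProtobufSerializer) is on your classpath. The object and method names are made up for illustration, and I have not run this against your job.

```scala
import com.google.protobuf.Message
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper, not part of Flink: the name is only for illustration.
object ProtobufKryoRegistration {
  // Ask Flink's Kryo fallback to use chill-protobuf's serializer for one
  // generated Protobuf message class, instead of Kryo's field-by-field default.
  def register(env: StreamExecutionEnvironment, messageClass: Class[_ <: Message]): Unit =
    env.getConfig.registerTypeWithKryoSerializer(messageClass, classOf[ProtobufSerializer])
}
```

You would call it once before building the pipeline, for example ProtobufKryoRegistration.register(env, classOf[ClickSchema.Click]), with ClickSchema imported from wherever your generated code lives.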


Best regards,
Duo




-- 

Programmer, Geek...


Kryo Deserializer

2017-01-20 Thread Biswajit Das
Hello,

I am having an issue with nested protobuf deserialization. I even tried
registering the class with Kryo as shown below, but it does not seem to help.
One of the options left for me is to write a custom serializer or to convert
the byte array to a Dictionary object.


val clazz = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.registerTypeWithKryoSerializer(clazz, classOf[UnmodifiableCollectionsSerializer])


---
.ClickSchema$Click)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer
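
A note on the "Caused by" part of the trace: it shows CollectionSerializer.read calling add() on an unmodifiable wrapper, which always throws. The JDK-only sketch below reproduces just that behaviour, with no Kryo or Flink involved. The names UnmodifiableDemo, fillByAdd and wrapperClassName are made up for illustration. It also prints the runtime class of the wrapper, because, as far as I know, a type registration with Kryo matches the exact class, so registering Collections$UnmodifiableCollection may not cover whichever subclass the field actually holds.

```scala
import java.util.{ArrayList, Collection, Collections}

object UnmodifiableDemo {
  // Fill an existing collection through add(), the same call that fails in the trace.
  // Returns false if the collection rejects the write.
  def fillByAdd(target: Collection[String], items: Seq[String]): Boolean =
    try {
      items.foreach(item => target.add(item))
      true
    } catch {
      case _: UnsupportedOperationException => false
    }

  // Runtime class name of the wrapper that Collections.unmodifiableList returns
  // for an ArrayList.
  def wrapperClassName: String =
    Collections.unmodifiableList(new ArrayList[String]()).getClass.getName

  def main(args: Array[String]): Unit = {
    val mutable  = new ArrayList[String]()
    val readOnly = Collections.unmodifiableList(new ArrayList[String]())

    println(fillByAdd(mutable, Seq("a", "b")))  // true: a plain ArrayList accepts add()
    println(fillByAdd(readOnly, Seq("a", "b"))) // false: add() throws UnsupportedOperationException
    println(wrapperClassName)                   // a subclass of Collections$UnmodifiableCollection
  }
}
```

If the collection field really is such a wrapper, a serializer that rebuilds a mutable list first and wraps it afterwards, or one that handles the whole protobuf message, avoids the add() call on the read-only instance.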