[ https://issues.apache.org/jira/browse/FLINK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288922#comment-14288922 ]

ASF GitHub Bot commented on FLINK-1391:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/323#discussion_r23435527
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---
    @@ -237,6 +244,25 @@ private void checkKryoInitialized() {
                        // Throwable and all subclasses should be serialized via java serialization
                        kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
     
    +                   // If the type we have to serialize as a GenericType is implementing SpecificRecordBase,
    +                   // we have to register the avro serializer
    +                   // This rule only applies if users explicitly use the GenericTypeInformation for the avro types
    +                   // usually, we are able to handle Avro POJOs with the POJO serializer.
    +                   if(SpecificRecordBase.class.isAssignableFrom(type)) {
    +                           ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(type);
    +                           this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
    +
    +                   }
    +                   // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
    +                   // because Kryo is not able to serialize them properly, we use this serializer for them
    +                   this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class));
    --- End diff --
    
    Should we make this registration conditional so that it only happens when we have encountered an Avro type?
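A minimal sketch of what a conditional registration could look like, written in plain Java so it runs without Kryo or Avro on the classpath. `FakeSpecificRecordBase`, `FakeGenericDataArray`, `User` and `Registry` are hypothetical stand-ins (for `SpecificRecordBase`, `GenericData.Array`, a generated Avro record, and Kryo's `register` call), not Flink, Kryo, or Avro API. The collection workaround is added only the first time an Avro record type is seen.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConditionalRegistrationSketch {

    // Hypothetical stand-in for org.apache.avro.specific.SpecificRecordBase.
    static abstract class FakeSpecificRecordBase {}

    // Hypothetical stand-in for org.apache.avro.generic.GenericData.Array.
    static final class FakeGenericDataArray {}

    // Hypothetical Avro-generated record class.
    static final class User extends FakeSpecificRecordBase {}

    // Hypothetical registry standing in for Kryo's register(Class, Serializer);
    // it only records which serializer name would be registered for which class.
    static final class Registry {
        final Map<Class<?>, String> registrations = new LinkedHashMap<>();
        private boolean avroSupportRegistered = false;

        void registerFor(Class<?> type) {
            if (FakeSpecificRecordBase.class.isAssignableFrom(type)) {
                registrations.put(type, "SpecificRecordSerializer");
                // Only now, after an Avro type has been seen, add the collection workaround.
                registerAvroSupportOnce();
            }
        }

        private void registerAvroSupportOnce() {
            if (!avroSupportRegistered) {
                registrations.put(FakeGenericDataArray.class,
                        "SpecificInstanceCollectionSerializer(ArrayList)");
                avroSupportRegistered = true;
            }
        }
    }

    public static void main(String[] args) {
        Registry plain = new Registry();
        plain.registerFor(String.class);
        System.out.println(plain.registrations.size()); // prints 0

        Registry avro = new Registry();
        avro.registerFor(User.class);
        System.out.println(avro.registrations.values());
        // prints [SpecificRecordSerializer, SpecificInstanceCollectionSerializer(ArrayList)]
    }
}
```

One caveat, judging from the stack trace in the issue: the failing `KryoSerializer` is called from `PojoSerializer`, so it appears to be deserializing a `List` field of an Avro POJO rather than the record itself. A check on that serializer's own `type` may therefore never see the Avro class, and the flag would have to be set wherever the Avro type is actually known.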


> Kryo fails to properly serialize avro collection types
> ------------------------------------------------------
>
>                 Key: FLINK-1391
>                 URL: https://issues.apache.org/jira/browse/FLINK-1391
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 0.8, 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Before FLINK-610, Avro was the default generic serializer.
> Now, special types coming from Avro are handled by Kryo, which seems to cause errors like:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NullPointerException
>       at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:143)
>       at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:148)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:244)
>       at org.apache.flink.runtime.plugable.DeserializationDelegate.read(DeserializationDelegate.java:56)
>       at org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer.getNextRecord(AdaptiveSpanningRecordDeserializer.java:71)
>       at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:189)
>       at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
>       at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
>       at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
>       at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>       at java.lang.Thread.run(Thread.java:744)
> {code}
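The trace shows Kryo's `CollectionSerializer.read` calling `GenericData$Array.add` on an instance it created itself. One plausible reading, assumed here rather than confirmed in this thread, is that Kryo instantiated `GenericData.Array` without going through a constructor that sets up its internal state. The diff above sidesteps this by registering `SpecificInstanceCollectionSerializer(ArrayList.class)` for `GenericData.Array`, so the data is read back into a plain `ArrayList`. The sketch below models that idea with the JDK only; `FixedTypeCollectionSerializer` and `roundTrip` are hypothetical names, and strings stand in for arbitrary elements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

public class SpecificInstanceCollectionSketch {

    // Hypothetical serializer: writes any collection of strings, but always reads
    // it back into an instance from the given factory (e.g. ArrayList::new)
    // instead of re-creating the collection's original runtime class.
    static final class FixedTypeCollectionSerializer<C extends Collection<String>> {
        private final Supplier<C> factory;

        FixedTypeCollectionSerializer(Supplier<C> factory) {
            this.factory = factory;
        }

        void write(DataOutputStream out, Collection<String> values) throws IOException {
            out.writeInt(values.size());
            for (String value : values) {
                out.writeUTF(value); // writeUTF rejects null; nulls are out of scope here
            }
        }

        C read(DataInputStream in) throws IOException {
            int size = in.readInt();
            C result = factory.get(); // fully constructed instance from the factory
            for (int i = 0; i < size; i++) {
                result.add(in.readUTF());
            }
            return result;
        }
    }

    // Serializes the input and reads it back through the fixed-type serializer.
    static List<String> roundTrip(Collection<String> input) {
        FixedTypeCollectionSerializer<ArrayList<String>> serializer =
                new FixedTypeCollectionSerializer<>(ArrayList::new);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.write(out, input);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return serializer.read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A LinkedList stands in for a collection whose runtime class we do not want to re-create.
        List<String> result = roundTrip(new LinkedList<>(Arrays.asList("a", "b")));
        System.out.println(result.getClass().getSimpleName() + " " + result);
        // prints ArrayList [a, b]
    }
}
```

The trade-off is that the deserialized value no longer has the runtime type it was written with. That is acceptable as long as the field is declared as `java.util.List`, which is the situation the comment in the diff describes.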



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
