[ 
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-4719:
-----------------------------------
    Comment: was deleted

(was: I'm also experiencing this issue when using the distinct transform. I 
thought it was an issue with the generic serializers, so I wrote and registered 
custom Kryo serializers for each type, but I still get this error when 
spilling to disk on Flink 1.3.0-RC0. Does anyone know of a workaround?)

> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>              Labels: kryo, serialization
>
> An exception involving the KryoSerializer is thrown intermittently when 
> using POJOs in Flink jobs that read large volumes of data.
> It surfaces in several different places, e.g. (the exceptions reported here 
> may come from earlier versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger spilling thread' terminated due to an exception: 
> Unable to find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
> find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
> java.ttil.HashSet
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> it.okkam.flink.test.model.pojo.VdhicleEvent
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>       at java.util.ArrayList.elementData(ArrayList.java:418)
>       at java.util.ArrayList.get(ArrayList.java:431)
>       at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>       at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>       at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>       at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at 
> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>       at 
> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>       at 
> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>       at 
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>       at 
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>       at 
> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
>       at 
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
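
Not part of the original report: the class names in the traces above 
(java.ttil.HashSet, VdhicleEvent) are each one character away from a plausible 
real name. The sketch below compares such a name with its likely intended 
spelling and prints the differing bytes. It assumes the intended names were 
java.util.HashSet and VehicleEvent (the second is a guess), and the class and 
method names (NameDiff, diff) are made up for illustration. It does not 
identify the root cause; it only makes the byte-level difference visible.

```java
import java.util.ArrayList;
import java.util.List;

public class NameDiff {

    /**
     * Describes every position at which two equal-length names differ,
     * including the XOR of the two character codes.
     */
    static List<String> diff(String observed, String expected) {
        if (observed.length() != expected.length()) {
            throw new IllegalArgumentException("names must have equal length");
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < observed.length(); i++) {
            char o = observed.charAt(i);
            char e = expected.charAt(i);
            if (o != e) {
                out.add(String.format(
                        "index %d: '%c' (0x%02x) vs '%c' (0x%02x), xor=0x%02x",
                        i, o, (int) o, e, (int) e, o ^ e));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Observed name from the traces vs. the assumed intended name.
        System.out.println(diff("java.ttil.HashSet", "java.util.HashSet"));
        System.out.println(diff("VdhicleEvent", "VehicleEvent"));
    }
}
```

Under these assumptions, each pair differs in exactly one character, and only 
in its lowest bit ('t' vs 'u', 'd' vs 'e'). That would be consistent with the 
serialized bytes being altered after they were written, rather than with a 
class genuinely missing from the classpath, but this remains a hypothesis.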



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
