Yes, it looks very similar to the exception I experienced (https://issues.apache.org/jira/browse/FLINK-6398), but my error was more related to Row serialization/deserialization (see [1]), while this looks more like something related to Kryo. However, from what I understood the error also appears with Flink 1.3.0, so it can't be the same problem.
Once I had a very similar problem (see [2] and [3]) and I was able to avoid it by removing the reuse of input and output within KryoSerializer (as discussed in [3]). I hope that helps.

Best,
Flavio

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnilateralSortMerger-error-again-td12680.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Weird-Kryo-exception-Unable-to-find-class-java-util-HashSet-td6927i20.html
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/com-esotericsoftware-kryo-KryoException-and-java-lang-IndexOutOfBoundsException-td7459.html

On Thu, Jun 8, 2017 at 11:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> @Flavio, doesn’t this look like the exception you often encountered a while back? If I remember correctly that was fixed by Kurt, right?
>
> Best,
> Aljoscha
>
> On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Andrea,
>
> I did some quick issue searching, and it seems like this is a frequently asked issue with Kryo: https://github.com/EsotericSoftware/kryo/issues/428.
>
> I can’t be sure at the moment whether the resolution / workaround mentioned there makes sense; I’ll have to investigate a bit more.
>
> Also, to clarify: from the stack trace, it seems like you’re simply using whatever serializer Kryo defaults to (i.e. FieldSerializer), and not registering your own. Is that correct?
>
> In the meanwhile, could you also try the following, rebuild Flink, and test whether it works: at https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349, change setReferences to false.
>
> Cheers,
> Gordon
>
> On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:
>
> Good afternoon dear Community,
>
> For a few days I have really been struggling to understand the reason behind this KryoException.
> Here is the stack trace.
>
> 2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))', caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     ... 11 more
> 2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
> 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
> 2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))', caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> [...the same stack trace as above follows...]
>
> What I'm basically doing is a product between matrices: I load the matrices in COO format; the Block class is the following (largely inspired by https://issues.apache.org/jira/browse/FLINK-3920).
>
> import breeze.linalg.{Matrix => BreezeMatrix}
> import org.apache.flink.ml.math.Breeze._
> import org.apache.flink.ml.math.{Matrix, SparseMatrix}
>
> class Block(val blockData: Matrix) extends MatrixLayout with Serializable {
>
>   def data: Matrix = blockData
>
>   def toBreeze: BreezeMatrix[Double] = blockData.asBreeze
>
>   def numRows: Int = data.numRows
>
>   def numCols: Int = data.numCols
>
>   def *(other: Block): Block = {
>     require(this.numCols == other.numRows)
>     Block((blockData.asBreeze * other.toBreeze).fromBreeze)
>   }
>
>   def +(other: Block): Block =
>     Block((blockData.asBreeze + other.toBreeze).fromBreeze)
>
>   def unary_+(other: Block): Block = this + other
>
>   override def equals(other: Any): Boolean = {
>     other match {
>       case block: Block => this.blockData.equalsMatrix(block.blockData)
>       case _ => false
>     }
>   }
> }
>
> The block matrix is a matrix of blocks; the group reduce function involved is the last step of the product function.
>
> class SumGroupOfBlocks(blockMapper: BlockMapper)
>     extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {
>
>   override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
>                       out: Collector[(BlockID, Block)]): Unit = {
>
>     val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
>       case ((i, j, left), (x, y, right)) => (i, y, left * right)
>     }.toSeq
>
>     val reducedGroup = multipliedGroup.reduce((left, right) => {
>       val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
>       (i, j, leftBlock + rightBlock)
>     })
>
>     out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
>   }
> }
>
> The exception described above happens when I try to increase the matrix sizes beyond 2000x2000 (rows x cols): my code works with 1000x1000 matrices, but not with 2000x2000 matrices and above.
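The per-group arithmetic that SumGroupOfBlocks performs — multiply the matching blocks of a group, then sum the partial products — can be sketched with plain dense arrays. This is an illustrative, self-contained sketch (class and method names are mine, not the Breeze/Flink API used above):

```java
import java.util.Arrays;

public class BlockProductSketch {

    // Multiply two dense blocks: (n x k) * (k x m) -> (n x m),
    // mirroring Block.* which requires numCols == other.numRows.
    public static double[][] multiply(double[][] a, double[][] b) {
        int n = a.length, k = b.length, m = b[0].length;
        double[][] c = new double[n][m];
        for (int i = 0; i < n; i++)
            for (int p = 0; p < k; p++)
                for (int j = 0; j < m; j++)
                    c[i][j] += a[i][p] * b[p][j];
        return c;
    }

    // Element-wise sum of two blocks of identical shape, mirroring Block.+.
    public static double[][] add(double[][] a, double[][] b) {
        double[][] c = new double[a.length][a[0].length];
        for (int i = 0; i < a.length; i++)
            for (int j = 0; j < a[0].length; j++)
                c[i][j] = a[i][j] + b[i][j];
        return c;
    }

    public static void main(String[] args) {
        double[][] left1 = {{1, 0}, {0, 1}};   // identity block
        double[][] right1 = {{1, 2}, {3, 4}};
        double[][] left2 = {{2, 0}, {0, 2}};   // 2 * identity block
        double[][] right2 = {{1, 1}, {1, 1}};

        // One reduce group: sum of the two partial block products.
        double[][] result = add(multiply(left1, right1), multiply(left2, right2));
        System.out.println(Arrays.deepToString(result)); // prints [[3.0, 4.0], [5.0, 6.0]]
    }
}
```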
>
> It is also worth mentioning that the IndexOutOfBoundsException is always seeking index 109 (across different matrix sizes), while the size of the Array varies in a small range (5-7). It looks like the serialized messages are somehow truncated right before delivery.
>
> I tried several solutions; none of them worked:
>
> - employing flink-1.2.0 and flink-1.3.0
> - updating the Flink Kryo dependency to 3.0.3
> - running with parallelism 1
> - explicitly registering my custom classes with Kryo
> - varying the size of my blocks
> - trying to increase akka.framesize
>
> I execute this job on a three-node cluster (2 vCPUs per node), two TMs with two task slots per TM, 6GB task manager heap size, 16384 numOfBuffers and 16384 networkBufferSize.
>
> If I run the code on my laptop with 2000x2000 matrices it works, likely because no remote serialization takes place.
>
> I really hope someone could help here. It's becoming really painful...
>
> Thank you so much.
>
> Cheers, Andrea
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
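As background on the bottom-most frames of the trace: Kryo's MapReferenceResolver resolves back-references by index into the list of objects already read from the current stream, so "Index: 109, Size: 5" means the stream encoded a reference to the 110th previously seen object while the reader had only registered 5 — i.e. writer and reader reference state were out of sync, which is also why Gordon's suggestion to disable setReferences sidesteps the lookup entirely. A stdlib-only toy of that failing lookup (names hypothetical; this is not Kryo's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class ReferenceLookupSketch {

    /**
     * Mimics the shape of MapReferenceResolver.getReadObject: resolve a
     * back-reference id against the table of objects read so far. If the
     * stream is corrupted, or the reader's table is stale relative to the
     * writer's, the id can point past the end of the table and the
     * ArrayList lookup throws IndexOutOfBoundsException.
     */
    public static Object resolve(List<Object> readObjects, int referenceId) {
        return readObjects.get(referenceId); // may throw IndexOutOfBoundsException
    }

    public static void main(String[] args) {
        List<Object> readObjects = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            readObjects.add("block-" + i); // only 5 objects seen so far
        }
        try {
            resolve(readObjects, 109); // reference id from a desynchronized stream
        } catch (IndexOutOfBoundsException e) {
            // Same shape as the reported failure: index 109 vs table size 5
            System.out.println("lookup failed: index 109 vs table size " + readObjects.size());
        }
    }
}
```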