Good afternoon dear Community, Since few days I'm really struggling to understand the reason behind this KryoException. Here the stack trace.
2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplicat ion$.main(MatrixMultiplication.scala:46)) (1/1) java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(B lockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: E rror obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOu tOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink\.distributed.Block) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 11 more 2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)] 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)] 2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1] 2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED. java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 Serialization trace: blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 11 more What I'm doing basically is a product between matrices: I load the matrices COO formatted; the Block class is the following (really much inspired to this https://issues.apache.org/jira/browse/FLINK-3920). import breeze.linalg.{Matrix => BreezeMatrix} import org.apache.flink.ml.math.Breeze._ import org.apache.flink.ml.math.{Matrix, SparseMatrix} class Block(val blockData: Matrix) extends MatrixLayout with Serializable { def data: Matrix = blockData def toBreeze: BreezeMatrix[Double] = blockData.asBreeze def numRows: Int = data.numRows def numCols: Int = data.numCols def *(other: Block): Block = { require(this.numCols == other.numRows) Block((blockData.asBreeze * other.toBreeze).fromBreeze) } def +(other: Block): Block = Block((blockData.asBreeze + other.toBreeze).fromBreeze) def unary_+(other: Block): Block = this + other override def equals(other: Any): Boolean = { other match { case block: Block => this.blockData.equalsMatrix(block.blockData) case _ => false } } } The block matrix is a matrix of blocks, the implicated group reduce function it's the last step of the product function. class SumGroupOfBlocks(blockMapper: BlockMapper) extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] { override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))], out: Collector[(BlockID, Block)]) : Unit = { val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect { case ((i, j, left), (x, y, right)) => (i, y, left * right) }.toSeq val reducedGroup = multipliedGroup.reduce((left, right) => { val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right) (i, j, leftBlock + rightBlock) }) out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3) } } The above described exception happens when I try to increase the matrices sizes over 2000x2000 (rowsXcols). It means that my code works with 1000x1000 matrices, but not with 2000x2000 matrices and above. I think it worths to mention also that the IndexOutOfBoundsException is always seeking for index 109 (on different matrices sizes) and the size of the Array is changing in a range (5-7). It looks like somehow the serialized message are truncated right before their delivery. I tried to follow several solutions, not in order what has not been worked: - employing flink-1.2.0, flink-1.3.0 - updating flink kryo library to 3.0.3 - running on parallelism 1 - explicitly register my custom classes to Kryo - varying the size of my blocks - trying to increase akka.framesize I execute this job on a three node 2vCPUS cluster, two TM, two TS per TM. 6GB task manager heap size. 16384 numOfBuffers and 16384 networkBufferSize. If I run the code on my laptop on 2000x2000 matrices, it works, likely due to jumping off remote serialization. I really hope someone could help here. It's becoming really painful... Thank you so much. Cheers, Andrea -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.