Good afternoon, dear Community,
For a few days now I have been struggling to understand the reason behind this
KryoException. Here is the stack trace:
2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
	at java.util.ArrayList.get(ArrayList.java:429)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 11 more
2017-06-07 10:18:52,594 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
What I'm doing is basically a matrix product: I load the matrices in COO
format. The Block class is the following (heavily inspired by
https://issues.apache.org/jira/browse/FLINK-3920):
import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

class Block(val blockData: Matrix) extends MatrixLayout with Serializable {
  def data: Matrix = blockData
  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze
  def numRows: Int = data.numRows
  def numCols: Int = data.numCols

  def *(other: Block): Block = {
    require(this.numCols == other.numRows)
    new Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  def +(other: Block): Block =
    new Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _ => false
    }
  }

  // keep the equals/hashCode contract: equal matrices always have equal dimensions
  override def hashCode: Int = (numRows, numCols).hashCode
}
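Since the matrices arrive in COO format and are then cut into blocks, here is a minimal sketch of that mapping step, for illustration only. It assumes square, uniform blocks of size bs x bs stored as dense arrays; `CooToBlocks`, `Entry`, `toBlockEntry` and `toDenseBlocks` are hypothetical names, and the real `BlockMapper.blockIdFromCoo` is not shown in this post, so it may work differently.

```scala
// Hypothetical sketch: maps COO entries (row, col, value) to the block that owns
// them, assuming uniform square blocks of size bs x bs (not the real BlockMapper).
object CooToBlocks {
  final case class Entry(row: Int, col: Int, value: Double)

  // returns ((blockRow, blockCol), (localRow, localCol, value))
  def toBlockEntry(e: Entry, bs: Int): ((Int, Int), (Int, Int, Double)) =
    ((e.row / bs, e.col / bs), (e.row % bs, e.col % bs, e.value))

  // groups the entries by block id and fills one dense bs x bs array per block
  def toDenseBlocks(entries: Seq[Entry], bs: Int): Map[(Int, Int), Array[Array[Double]]] =
    entries
      .map(toBlockEntry(_, bs))
      .groupBy(_._1)
      .map { case (id, cells) =>
        val block = Array.ofDim[Double](bs, bs)
        cells.foreach { case (_, (r, c, v)) => block(r)(c) += v }
        id -> block
      }

  def main(args: Array[String]): Unit = {
    val entries = Seq(Entry(0, 0, 1.0), Entry(3, 5, 2.0), Entry(4, 1, 3.0))
    val blocks = toDenseBlocks(entries, bs = 2)
    println(blocks.keys.toList.sorted) // List((0,0), (1,2), (2,0))
  }
}
```

Only blocks that contain at least one entry are materialized here; whether the real job does the same for empty blocks is not stated in the post.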
The block matrix is a matrix of blocks; the group reduce function that appears
in the stack trace is the last step of the product function:
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.util.Collector

class SumGroupOfBlocks(blockMapper: BlockMapper)
  extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
                      out: Collector[(BlockID, Block)]): Unit = {
    // multiply every (left, right) pair of the group; the product belongs to block (i, y)
    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.asScala.map {
      case ((i, _, left), (_, y, right)) => (i, y, left * right)
    }.toSeq
    // sum all partial products of the group into one block
    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
      (i, j, leftBlock + rightBlock)
    })
    out.collect((blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3))
  }
}
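The group reduce multiplies the block pairs of one group and sums the partial products into a single output block. Stripped of Flink and Breeze, that multiply-then-sum logic can be sketched with plain arrays as below. This is an illustrative sketch, not the code in BlockMatrix: `BlockProductSketch`, the dense `Array[Array[Double]]` block type and the join on the shared inner index j are assumptions about how `BlockMatrix.*` builds its groups.

```scala
// Sketch of block matrix multiplication: C(i, y) = sum over j of A(i, j) * B(j, y).
// Blocks are dense arrays here; in the real job they are flink-ml Matrix instances.
object BlockProductSketch {
  type Dense = Array[Array[Double]]

  // plain dense product of two blocks
  def mul(a: Dense, b: Dense): Dense = {
    require(a.head.length == b.length, "inner block dimensions must match")
    Array.tabulate(a.length, b.head.length) { (r, c) =>
      b.indices.map(k => a(r)(k) * b(k)(c)).sum
    }
  }

  // element-wise sum of two blocks of the same shape
  def add(a: Dense, b: Dense): Dense =
    Array.tabulate(a.length, a.head.length)((r, c) => a(r)(c) + b(r)(c))

  // left: (i, j) -> block, right: (x, y) -> block; result: (i, y) -> block
  def multiply(left: Map[(Int, Int), Dense],
               right: Map[(Int, Int), Dense]): Map[(Int, Int), Dense] = {
    val partials = for {
      ((i, j), l) <- left.toSeq
      ((x, y), r) <- right.toSeq
      if j == x // join on the shared inner block index
    } yield ((i, y), mul(l, r))
    // group the partial products by output block and sum them (the GroupReduce step)
    partials.groupBy(_._1).map { case (key, parts) => key -> parts.map(_._2).reduce(add) }
  }

  def main(args: Array[String]): Unit = {
    // two 2x2 matrices split into 1x1 blocks
    val a = Array(Array(1.0, 2.0), Array(3.0, 4.0))
    val b = Array(Array(5.0, 6.0), Array(7.0, 8.0))
    def toBlocks(m: Dense): Map[(Int, Int), Dense] =
      (for (i <- 0 to 1; j <- 0 to 1) yield (i, j) -> Array(Array(m(i)(j)))).toMap
    val c = multiply(toBlocks(a), toBlocks(b))
    println(c((0, 0))(0)(0)) // 19.0
  }
}
```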
The exception above occurs when I increase the matrix size to 2000x2000
(rows x cols) or more: my code works with 1000x1000 matrices, but not with
2000x2000 matrices and above.
It is also worth mentioning that the IndexOutOfBoundsException always asks
for index 109 (for different matrix sizes), while the size of the ArrayList
varies in a small range (5-7). It looks as if the serialized messages were
somehow truncated right before their delivery.
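The innermost frames of the trace are `MapReferenceResolver.getReadObject` and `ArrayList.get`: with reference tracking enabled, Kryo writes an object it has already written as a numeric back-reference, and the reader resolves that number as an index into the list of objects it has read so far. The sketch below is a simplified, hypothetical model of that idea (the names are invented and it is not Kryo's code; for instance it compares objects by value rather than by identity). It shows how a reader whose state does not match the writer's ends up asking for an index larger than its list, which has the same shape as "Index: 109, Size: 5"; it does not by itself show what desynchronizes the real reader.

```scala
import scala.collection.mutable

// Simplified model (NOT Kryo's implementation) of index-based reference tracking.
object RefTrackingModel {
  sealed trait Token
  final case class NewObj(value: String) extends Token // first occurrence: full object
  final case class BackRef(id: Int) extends Token      // later occurrence: id only

  // writer: assigns ids 0, 1, 2, ... to objects in the order they are first seen
  def write(objs: Seq[String]): Vector[Token] = {
    val ids = mutable.Map.empty[String, Int]
    objs.toVector.map { o =>
      ids.get(o) match {
        case Some(id) => BackRef(id)
        case None =>
          ids(o) = ids.size
          NewObj(o)
      }
    }
  }

  // reader: stores objects in read order and resolves back-references by index
  def read(tokens: Seq[Token]): Vector[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    tokens.toVector.map {
      case NewObj(v) =>
        seen += v
        v
      case BackRef(id) => seen(id) // IndexOutOfBoundsException if id >= seen.size
    }
  }

  def main(args: Array[String]): Unit = {
    val tokens = write(Seq("a", "b", "c", "c")) // NewObj(a), NewObj(b), NewObj(c), BackRef(2)
    println(read(tokens))                        // Vector(a, b, c, c)
    // a reader that starts mid-stream has seen only "c", so BackRef(2) cannot be resolved
    try read(tokens.drop(2))
    catch { case e: IndexOutOfBoundsException => println("out of sync: " + e) }
  }
}
```

In the trace above, this read is reached through `KryoSerializer.copy`, called from `NormalizedKeySorter.writeToOutput` in the 'SortMerger spilling thread'.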
I have tried the following solutions (in no particular order), none of which
worked:
- using flink-1.2.0 and flink-1.3.0
- updating Flink's Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize
I execute this job on a three-node cluster with 2 vCPUs per node: two
TaskManagers (TM), two task slots (TS) per TM, a 6 GB TaskManager heap,
numOfBuffers = 16384 and networkBufferSize = 16384.
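To put these numbers side by side, here is a back-of-the-envelope estimate. It assumes the blocks are stored dense (8 bytes per double), ignores Kryo and object overhead, and assumes networkBufferSize is given in bytes; `SizeEstimate` and its methods are made up for illustration.

```scala
// Back-of-the-envelope sizes: dense payload only, no serialization overhead.
object SizeEstimate {
  // bytes of a dense rows x cols matrix of 8-byte doubles
  def denseBytes(rows: Long, cols: Long): Long = rows * cols * 8L

  // total network buffer memory = number of buffers * bytes per buffer
  def networkBufferBytes(numBuffers: Long, bufferSize: Long): Long = numBuffers * bufferSize

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024L
    println(denseBytes(1000, 1000))                 // 8000000 (about 7.6 MiB)
    println(denseBytes(2000, 2000))                 // 32000000 (about 30.5 MiB)
    println(networkBufferBytes(16384, 16384) / mib) // 256 (MiB)
  }
}
```

If the blocks are sparse (the Block class imports SparseMatrix), the actual payload can be much smaller than these dense figures.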
If I run the code on my laptop with 2000x2000 matrices, it works, probably
because no remote serialization is involved there.
I really hope someone can help here; it's becoming really painful...
Thank you so much.
Cheers, Andrea
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.