Good afternoon, dear community,

For a few days now I have been struggling to understand the reason behind this
KryoException. Here is the stack trace:

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code:  CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 11 more
2017-06-07 10:18:52,594 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.


What I'm basically doing is a matrix product: I load the matrices in COO
format. The Block class is the following (heavily inspired by
https://issues.apache.org/jira/browse/FLINK-3920):


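import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.Matrix

// MatrixLayout is a trait from my own code base (not shown here).
class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  // Matrix product of two blocks, computed through Breeze.
  def *(other: Block): Block = {
    require(this.numCols == other.numRows)
    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  // Element-wise sum of two blocks, computed through Breeze.
  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  override def equals(other: Any): Boolean = other match {
    case block: Block => this.blockData.equalsMatrix(block.blockData)
    case _ => false
  }

  // equals is overridden, so hashCode must agree with it:
  // equal matrices always have the same dimensions.
  override def hashCode(): Int = (numRows, numCols).hashCode()
}

// Companion factory used by * and + above.
object Block {
  def apply(blockData: Matrix): Block = new Block(blockData)
}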
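The COO loading step is not shown above. As a rough illustration of what a mapping like blockMapper.blockIdFromCoo has to do, here is a minimal sketch, assuming square blocks of a fixed side length; CooBlocking, CooEntry, BlockedEntry, blockSize and toDenseBlocks are hypothetical names, not part of my code or of flink-ml:

```scala
// Hypothetical helper (not my real BlockMapper): assigns COO entries
// to fixed-size square blocks.
object CooBlocking {

  // One non-zero entry of a COO-formatted matrix.
  final case class CooEntry(row: Int, col: Int, value: Double)

  // Block coordinates plus the entry's position inside that block.
  final case class BlockedEntry(blockRow: Int, blockCol: Int,
                                localRow: Int, localCol: Int, value: Double)

  def toBlockCoordinates(e: CooEntry, blockSize: Int): BlockedEntry = {
    require(blockSize > 0, "blockSize must be positive")
    BlockedEntry(e.row / blockSize, e.col / blockSize,
                 e.row % blockSize, e.col % blockSize, e.value)
  }

  // Groups entries by block and builds one dense, row-major array per block.
  // Blocks on the matrix border are zero-padded to blockSize x blockSize.
  def toDenseBlocks(entries: Seq[CooEntry], blockSize: Int): Map[(Int, Int), Array[Array[Double]]] =
    entries
      .map(toBlockCoordinates(_, blockSize))
      .groupBy(b => (b.blockRow, b.blockCol))
      .map { case (id, members) =>
        val dense = Array.ofDim[Double](blockSize, blockSize)
        members.foreach(m => dense(m.localRow)(m.localCol) += m.value)
        id -> dense
      }
}
```

For example, with blockSize = 500 the entry at (1234, 567) lands in block (2, 1) at local position (234, 67).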

The block matrix is a matrix of blocks; the group reduce function involved is
the last step of the product function:

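import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// BlockMapper and BlockID are part of my own code base (not shown here).
class SumGroupOfBlocks(blockMapper: BlockMapper)
  extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(
      blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
      out: Collector[(BlockID, Block)]): Unit = {

    // Multiply every (left, right) pair in the group; the partial product
    // belongs to output block (row of left, column of right).
    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.asScala.map {
      case ((i, _, left), (_, y, right)) => (i, y, left * right)
    }.toList

    // Sum all partial products of the group into a single block.
    val (i, j, sum) = multipliedGroup.reduce { (left, right) =>
      (left._1, left._2, left._3 + right._3)
    }

    // Collector.collect takes a single record, so pass the result as one tuple.
    out.collect((blockMapper.blockIdFromCoo(i, j), sum))
  }
}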
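For reference, the computation this group reduce implements is the block version of the ordinary matrix product: output block (i, k) is the sum over j of left block (i, j) times right block (j, k). Below is a minimal single-machine sketch with plain dense arrays (no Flink, no Breeze); BlockProductSketch, Dense, multiply, add and blockProduct are hypothetical names. Something like this could serve to cross-check the distributed result on small inputs:

```scala
// Hypothetical reference implementation of a block matrix product.
object BlockProductSketch {
  type Dense = Array[Array[Double]]

  // Plain dense product, applied to each pair of blocks.
  def multiply(a: Dense, b: Dense): Dense = {
    require(a.nonEmpty && b.nonEmpty && a.head.length == b.length, "inner dimensions must match")
    val inner = b.length
    Array.tabulate(a.length, b.head.length) { (r, c) =>
      (0 until inner).map(t => a(r)(t) * b(t)(c)).sum
    }
  }

  // Element-wise sum of two blocks of the same shape.
  def add(a: Dense, b: Dense): Dense =
    Array.tabulate(a.length, a.head.length)((r, c) => a(r)(c) + b(r)(c))

  // left: (i, j) -> block, right: (j, k) -> block.
  // Pairs blocks on matching j, then groups by (i, k) and sums,
  // mirroring the join + SumGroupOfBlocks steps.
  def blockProduct(left: Map[(Int, Int), Dense],
                   right: Map[(Int, Int), Dense]): Map[(Int, Int), Dense] = {
    val partials = for {
      ((i, j), l) <- left.toSeq
      ((j2, k), r) <- right.toSeq
      if j == j2
    } yield ((i, k), multiply(l, r))
    partials.groupBy(_._1).map { case (id, group) => id -> group.map(_._2).reduce(add) }
  }
}
```

Splitting [[1, 2], [3, 4]] and [[5, 6], [7, 8]] into 1x1 blocks, blockProduct yields the same values as the plain product, [[19, 22], [43, 50]].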

The exception above occurs when I increase the matrix size to 2000x2000
(rows x cols) or more: my code works with 1000x1000 matrices, but not with
2000x2000 matrices and above.
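For scale, assuming a fully dense matrix of 8-byte doubles (my blocks may well be sparser), doubling the side length quadruples the raw payload:

```latex
1000^2 \times 8\ \text{B} = 8 \times 10^{6}\ \text{B},
\qquad
2000^2 \times 8\ \text{B} = 3.2 \times 10^{7}\ \text{B} = 4 \times \left(8 \times 10^{6}\ \text{B}\right)
```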

It is probably also worth mentioning that the IndexOutOfBoundsException always
refers to index 109 (for different matrix sizes), while the size of the list
varies between 5 and 7. It looks as if the serialized messages were somehow
truncated right before their delivery.
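The innermost frames (MapReferenceResolver.getReadObject calling ArrayList.get) hint at what the two numbers mean. With reference tracking enabled, Kryo keeps a list of the objects read so far within one deserialization and encodes an already-seen object as a back-reference ID into that list, so "Index: 109, Size: 5" means a back-reference to object #109 was decoded when only 5 objects had been read. The toy model below is a simplified sketch of that mechanism, not Kryo's actual code or wire format (ToyReferenceReader and its Int encoding are made up); it only shows that this error appears whenever the reader decodes bytes that do not line up with what the writer produced:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of reference tracking (NOT Kryo's real wire format).
// The stream is a sequence of Ints: NewObject followed by a value means
// "a new object with this value"; any other Int n means
// "a back-reference to the n-th object read so far".
object ToyReferenceReader {
  val NewObject: Int = -1

  def readAll(stream: Seq[Int]): Seq[Int] = {
    val seen = ArrayBuffer.empty[Int] // plays the role of the resolver's list
    val out = ArrayBuffer.empty[Int]
    val it = stream.iterator
    while (it.hasNext) {
      it.next() match {
        case NewObject =>
          val value = it.next()
          seen += value
          out += value
        case id =>
          // Throws IndexOutOfBoundsException if id >= seen.size,
          // analogous to the ArrayList.get call in the stack trace.
          out += seen(id)
      }
    }
    out.toList
  }
}
```

A well-formed stream such as Seq(-1, 10, -1, 20, 0, 1) decodes to 10, 20, 10, 20, whereas Seq(-1, 10, 109) fails with an IndexOutOfBoundsException because only one object is known when reference 109 is read.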

I have tried several solutions, none of which worked (in no particular order):

- using Flink 1.2.0 and Flink 1.3.0
- updating Flink's Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize

I run this job on a three-node cluster with 2 vCPUs per node: two TaskManagers
(TM) with two task slots (TS) per TM, a 6 GB TaskManager heap,
16384 numOfBuffers and 16384 networkBufferSize.
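Assuming networkBufferSize is given in bytes and both settings apply per TaskManager, the network buffer pool configured above amounts to:

```latex
16384 \times 16384\ \text{B} = 2^{14} \times 2^{14}\ \text{B} = 2^{28}\ \text{B} = 268\,435\,456\ \text{B} = 256\ \text{MiB}
```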

If I run the code on my laptop with 2000x2000 matrices, it works, probably
because no remote serialization is involved.

I really hope someone can help here; this is becoming really painful...

Thank you so much.

Cheers, Andrea



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
