Yes, it looks very similar to the exception I experienced
(https://issues.apache.org/jira/browse/FLINK-6398), but my error was more
related to Row serialization/deserialization (see [1]), while this looks
more like something related to Kryo. Moreover, from what I understood the
error also appears with Flink 1.3.0, so it can't be the same problem.

I once had a very similar problem (see [2] and [3]) and I was able to
avoid it by removing the reuse of the Kryo Input and Output instances
within KryoSerializer (as discussed in [3]). A rough sketch of what I mean
follows.
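
Just to make concrete what I mean by "removing the reuse": this is only a
sketch, not Flink's actual KryoSerializer code, and the class and method
names here are made up. The point is that a fresh Input/Output is created
per call instead of sharing one instance across calls.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoRoundTrip {
  private val kryo = new Kryo()

  // Create a fresh Output per call instead of reusing a shared instance,
  // so no state from a previous (possibly failed) call leaks into the next one.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeClassAndObject(output, value)
    output.close()
    bytes.toByteArray
  }

  // Likewise, a fresh Input per call.
  def deserialize(data: Array[Byte]): AnyRef = {
    val input = new Input(new ByteArrayInputStream(data))
    try kryo.readClassAndObject(input)
    finally input.close()
  }
}
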
I hope that could help.

Best,
Flavio

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnilateralSortMerger-error-again-td12680.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Weird-Kryo-exception-Unable-to-find-class-java-ttil-HashSet-td6927i20.html
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/com-esotericsoftware-kryo-KryoException-and-java-lang-IndexOutOfBoundsException-td7459.html

On Thu, Jun 8, 2017 at 11:39 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> @Flavio, doesn’t this look like the exception you often encountered a
> while back? If I remember correctly that was fixed by Kurt, right?
>
> Best,
> Aljoscha
>
> On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Andrea,
>
> I did some quick issue searching, and it seems like this is a frequently
> reported issue with Kryo: https://github.com/EsotericSoftware/kryo/issues/428.
>
> I can’t be sure at the moment whether the resolution / workaround mentioned
> there makes sense; I’ll have to investigate a bit more.
>
> Also, to clarify: from the stack trace, it seems like you’re simply using
> whatever serializer Kryo defaults to (i.e. FieldSerializer), and not
> registering your own, is that correct?
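>
> By the way, a quick way to check which serializer Flink picks for your Block
> type is to look at the extracted type information (just a sketch; it assumes
> Block is on the classpath of a small test program):
>
> import org.apache.flink.api.scala._
> import my.org.path.benchmarks.matrices.flink.distributed.Block
>
> object TypeCheck {
>   def main(args: Array[String]): Unit = {
>     // If this prints a GenericType<...>, Flink treats Block as a generic type
>     // and serializes it with Kryo, which matches the KryoSerializer frames
>     // in the stack trace.
>     println(createTypeInformation[Block])
>   }
> }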
>
> In the meantime, could you also try the following, rebuild Flink, and test
> to see if it works? On
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349,
> change setReferences to false.
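>
> For illustration, the kind of change I mean boils down to disabling Kryo's
> reference tracking on the Kryo instance. This is a sketch only, not the
> exact Flink source; setReferences is a standard Kryo setting, and
> MapReferenceResolver (which appears in your stack trace) is only used when
> references are enabled:
>
> import com.esotericsoftware.kryo.Kryo
>
> object KryoWithoutReferences {
>   def newKryo(): Kryo = {
>     val kryo = new Kryo()
>     // Disable reference tracking; this bypasses MapReferenceResolver entirely.
>     kryo.setReferences(false)
>     kryo
>   }
> }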
>
> Cheers,
> Gordon
>
>
> On 7 June 2017 at 3:39:55 PM, Andrea Spina (andrea.sp...@radicalbit.io)
> wrote:
>
> Good afternoon dear Community,
>
> For a few days now I have really been struggling to understand the reason
> behind this KryoException. Here is the stack trace.
>
> 2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code:
> CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
> -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
> -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))',
> caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     ... 11 more
> 2017-06-07 10:18:52,594 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats:
> [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
> 2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats:
> Count: 13, Total Capacity: 1390280, Used Memory: 1390281
> 2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats:
> [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)],
> [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
> 2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats:
> [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
> 2017-06-07 10:18:52,841 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce
> (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
> -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))
> (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
> -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))',
> caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception:
> java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
> Serialization trace:
> blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     ... 11 more
>
>
> What I'm basically doing is a product between matrices: I load the matrices
> in COO format; the Block class is the following (heavily inspired by
> https://issues.apache.org/jira/browse/FLINK-3920).
>
>
> import breeze.linalg.{Matrix => BreezeMatrix}
> import org.apache.flink.ml.math.Breeze._
> import org.apache.flink.ml.math.{Matrix, SparseMatrix}
>
> class Block(val blockData: Matrix) extends MatrixLayout with Serializable {
>
>   def data: Matrix = blockData
>
>   def toBreeze: BreezeMatrix[Double] = blockData.asBreeze
>
>   def numRows: Int = data.numRows
>
>   def numCols: Int = data.numCols
>
>   def *(other: Block): Block = {
>     require(this.numCols == other.numRows)
>     Block((blockData.asBreeze * other.toBreeze).fromBreeze)
>   }
>
>   def +(other: Block): Block =
>     Block((blockData.asBreeze + other.toBreeze).fromBreeze)
>
>   def unary_+(other: Block): Block = this + other
>
>   override def equals(other: Any): Boolean = {
>     other match {
>       case block: Block => this.blockData.equalsMatrix(block.blockData)
>       case _ => false
>     }
>   }
> }
>
> The block matrix is a matrix of blocks; the group reduce function involved
> here is the last step of the product function.
>
> class SumGroupOfBlocks(blockMapper: BlockMapper)
>     extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {
>
>   override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
>                       out: Collector[(BlockID, Block)]): Unit = {
>
>     val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
>       case ((i, j, left), (x, y, right)) => (i, y, left * right)
>     }.toSeq
>
>     val reducedGroup = multipliedGroup.reduce((left, right) => {
>       val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
>       (i, j, leftBlock + rightBlock)
>     })
>
>     out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
>   }
> }
>
> The exception described above happens when I try to increase the matrix
> size beyond 2000x2000 (rows x cols): my code works with 1000x1000 matrices,
> but not with 2000x2000 matrices and above.
>
> I think it is also worth mentioning that the IndexOutOfBoundsException is
> always looking for index 109 (across different matrix sizes), while the
> size of the list varies within a small range (5-7). It looks as if the
> serialized messages are somehow truncated right before their delivery.
>
> I tried several things, in no particular order, none of which worked:
>
> - employing flink-1.2.0 and flink-1.3.0
> - updating Flink's Kryo library to 3.0.3
> - running with parallelism 1
> - explicitly registering my custom classes with Kryo (a rough sketch of
> this follows the list)
> - varying the size of my blocks
> - trying to increase akka.framesize
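>
> For the registration attempt, what I did is roughly the following (a sketch
> only; the rest of the job setup is omitted):
>
> import org.apache.flink.api.scala.ExecutionEnvironment
> import my.org.path.benchmarks.matrices.flink.distributed.Block
>
> object RegistrationSketch {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     // Register the user type with Kryo up front.
>     env.getConfig.registerKryoType(classOf[Block])
>     // ... build and execute the job as usual ...
>   }
> }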
>
> I execute this job on a three-node cluster with 2 vCPUs per node, two
> TaskManagers, and two task slots per TaskManager, with a 6 GB task manager
> heap. Both numOfBuffers and networkBufferSize are set to 16384.
>
> If I run the code on my laptop with 2000x2000 matrices, it works, likely
> because no remote serialization is involved.
>
> I really hope someone could help here. It's becoming really painful...
>
> Thank you so much.
>
> Cheers, Andrea
>
>
>
>
>
>
