Hi guys,

thank you for your interest. Yes @Flavio, I tried both 1.2.0 and 1.3.0
versions. 
Following Gordon suggestion I tried to put setReference to false but sadly
it didn't help. What I did then was to declare a custom serializer as the
following:

class BlockSerializer extends Serializer[Block] with Serializable {

    override def read(kryo: Kryo, input: Input, block: Class[Block]): Block
= {
      val serializer = new SparseMatrixSerializer

      val blockData = kryo.readObject(input, classOf[SparseMatrix],
serializer)
      new Block(blockData)
    }

    override def write(kryo: Kryo, output: Output, block: Block): Unit = {
      val serializer = new SparseMatrixSerializer

      kryo.register(classOf[SparseMatrix], serializer)
      kryo.writeObject(output, block.blockData, serializer)

      output.close()
    }

  }

  class SparseMatrixSerializer extends Serializer[SparseMatrix] with
Serializable {

    override def read(kryo: Kryo, input: Input, sparse:
Class[SparseMatrix]): SparseMatrix = {
      val collectionIntSerializer = new CollectionSerializer()
      collectionIntSerializer.setElementClass(classOf[Int], new
IntSerializer)
      val collectionDoubleSerializer = new CollectionSerializer()
      collectionDoubleSerializer.setElementClass(classOf[Double], new
DoubleSerializer)

      val numRows = input.readInt
      val numCols = input.readInt
      val colPtrs = kryo.readObject(input,
classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
      val rowIndices = kryo.readObject(input,
classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
      val data = kryo.readObject(input,
classOf[java.util.ArrayList[Double]],
collectionDoubleSerializer).asScala.toArray

      input.close()

      new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs =
colPtrs, rowIndices = rowIndices, data = data)
    }

    override def write(kryo: Kryo, output: Output, sparseMatrix:
SparseMatrix): Unit = {

      val collectionIntSerializer = new CollectionSerializer()
      collectionIntSerializer.setElementClass(classOf[Int], new
IntSerializer)

      val collectionDoubleSerializer = new CollectionSerializer()
      collectionDoubleSerializer.setElementClass(classOf[Double], new
DoubleSerializer)

      kryo.register(classOf[java.util.ArrayList[Int]],
collectionIntSerializer)
      kryo.register(classOf[java.util.ArrayList[Double]],
collectionDoubleSerializer)

      output.writeInt(sparseMatrix.numRows)
      output.writeInt(sparseMatrix.numCols)
      kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava,
collectionIntSerializer)
      kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava,
collectionIntSerializer)
      kryo.writeObject(output, sparseMatrix.data.toList.asJava,
collectionDoubleSerializer)

      output.close()
    }

  }

  What I obtained is the same previous exception but on different accessed
index and size.

  Caused by: java.lang.Exception: The data preparation for task 'CHAIN
GroupReduce (GroupReduce at
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
-> Map (Map at
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))'
, caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1,
Size: 0
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Index: 1, Size: 0
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at
com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
        at
my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
        at
my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
        at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Does it might help somehow?

Thank you again,

Andrea



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13596.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to