Hi Andrea,

I’ve circled back to this and wanted to check on the status. Did you manage
to solve this in the end, or is it still a problem for you?

If it’s still a problem, would you be able to provide a complete runnable 
example job that can reproduce the problem (ideally via a git branch I can 
clone and run :))?
This would help me dig a bit deeper into the issue. Thanks a lot!

Best,
Gordon


On 8 June 2017 at 6:58:46 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Hi guys,  

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0
versions.
Following Gordon's suggestion, I tried calling setReferences(false), but sadly
it didn't help. I then declared custom serializers as follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input,
      classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input,
      classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input,
      classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs,
      rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}

What I got is the same exception as before, but with a different index and
size:

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.set(ArrayList.java:444)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Might this help somehow?

Thank you again,  

Andrea  



--  
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13596.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
