Hi Marc,

Does the problem still persist? Could you try running it with the custom Kryo serialiser that Flavio posted here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/com-esotericsoftware-kryo-KryoException-and-java-lang-IndexOutOfBoundsException-tp7459p7461.html

Or, better yet, try implementing your custom type in such a way that it doesn’t need the Kryo serialiser but can work with the PojoSerializer. For this you need to ensure that the type has a zero-argument constructor and that all fields are either public or have public getters/setters.

Best,
Aljoscha

> On 8 May 2017, at 19:39, Kaepke, Marc <marc.kae...@haw-hamburg.de> wrote:
>
> Hi,
>
> Does anyone have an answer or a solution?
>
> Best
> Marc
>
>> On 05.05.2017 at 20:05, Kaepke, Marc <marc.kae...@haw-hamburg.de> wrote:
>>
>> Hi everyone,
>>
>> What does the following exception mean when I run my Gelly program?
>>
>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>     at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>>     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.lang.NullPointerException
>>     at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
>>     at java.util.ArrayList$SubList.size(ArrayList.java:1040)
>>     at java.util.AbstractList.add(AbstractList.java:108)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Process finished with exit code 1
>> ---------------------
>>
>> During an iteration (in a GatherFunction) I update a custom class:
>>
>>     if(!allNewSemiClusters.isEmpty()) {
>>         for(SemiCluster semiCluster : allNewSemiClusters) {
>>             existingSemiClusters.add(semiCluster);
>>         }
>>     }
>>
>> I don’t get the error above if I comment out the existingSemiClusters.add(semiCluster) line.
>>
>> Any ideas?
>>
>> Best,
>> Marc
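
The PojoSerializer advice at the top of this message can be sketched in plain Java. This is only an illustration under assumptions: the fields `score` and `vertexIds` are hypothetical, since the real SemiCluster class was not posted, and `followsPojoRules` is a made-up helper that checks just the two rules named in the reply; it is not Flink's own type analysis. The copying setter is also an assumption, prompted by `ArrayList$SubList` appearing in the trace: it keeps a `subList()` view from ending up in a field that, as far as I know, may still be handed to Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PojoSketch {

    /** Hypothetical stand-in for the custom type in the thread; the fields are illustrative only. */
    public static class SemiCluster {
        private double score;
        private List<Long> vertexIds = new ArrayList<>();

        // Rule 1: a public zero-argument constructor.
        public SemiCluster() {}

        // Rule 2: every non-public field has a public getter and setter.
        public double getScore() { return score; }
        public void setScore(double score) { this.score = score; }

        public List<Long> getVertexIds() { return vertexIds; }

        // Assumption: copy into a standalone ArrayList so the field never holds a subList() view.
        public void setVertexIds(List<Long> vertexIds) {
            this.vertexIds = new ArrayList<>(vertexIds);
        }
    }

    /** Rough reflection check of the two rules above (hypothetical helper, not a Flink API). */
    static boolean followsPojoRules(Class<?> type) {
        try {
            type.getConstructor(); // throws if there is no public no-arg constructor
            for (Field f : type.getDeclaredFields()) {
                int m = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(m)
                        || Modifier.isTransient(m) || Modifier.isPublic(m)) {
                    continue; // nothing to check for these fields
                }
                String name = f.getName();
                String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
                type.getMethod("get" + suffix);              // public getter must exist
                type.getMethod("set" + suffix, f.getType()); // public setter must exist
            }
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Long> all = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L));
        SemiCluster cluster = new SemiCluster();
        cluster.setVertexIds(all.subList(0, 2)); // stored as an independent copy
        System.out.println(followsPojoRules(SemiCluster.class));
        System.out.println(cluster.getVertexIds().getClass().getSimpleName());
    }
}
```

Whether this removes the exception depends on how the real class builds its collections, so treat it as a starting point for checking the type rather than a confirmed fix.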