Thanks for the hint.

I focused on it and get a strange behavior.
If I change from EdgeDirection.ALL (what I need) to EdgeDirection.OUT (or .IN), 
everything seems okey. The sublist operation was still active.

Then I replaced the sublist with the entire list and there was no exception 
(EdgeDirection.All/IN/OUT worked). In case of .ALL the algorithm ran until 
„maximumNumberOfIterations“. How can I limit it and how can I set the 
termination conditions?


Best,
Marc

Am 10.05.2017 um 18:18 schrieb Stephan Ewen 
<se...@apache.org<mailto:se...@apache.org>>:

Looks like java.util.ArrayList$SubList does not work out of the box with Kryo / 
Flink.

Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc 
<marc.kae...@haw-hamburg.de<mailto:marc.kae...@haw-hamburg.de>> wrote:
Hi,

a part of my bachelor thesis is an implementation of the Semi-Clustering 
algorithm [1].
I’m using the Scatter-Gather-Iteration. Each vertex has to know its neighbors 
and the edge-value between of that. Because Gelly’s vertex doesn’t provide both 
information, I wrote an CustomVertexValue class.  An object contains a set of 
edges and a list of another custom class called SemiCluster, which contains a 
list of vertex and a double value.
Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.

Unfortunately I get an exception if I run my Scatter-Gather-Iteration.

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at 
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
at java.util.ArrayList$SubList.size(ArrayList.java:1040)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at 
org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at 
org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at 
org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Process finished with exit code 1

———————————

Maybe Gelly isn’t able to handle my CustomVertexValue!?

The ScatterFunctions works fine.


Thanks a lot
Best,
Marc


[1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf 
chapter 5.4 (page 141)




Reply via email to