Hi Dennis,

On 7 October 2016 at 11:29, <d...@web.de> wrote:
> Hi Vasia,
>
> thanks for your reply.
>
> Currently I am testing it on my normal workstation (16GB RAM) but I also
> tried it on our cluster.
> Both are failing at the same number of nodes, so I guess it has something
> to do with Gelly or with the properties.
>
> The configured memory is the default. I did not change it because I thought
> that Flink was not the problem, but I might be wrong.

The default configured memory is only 512MB. If you have 16GB but you don't
let Flink know about it, it won't use it. I see that your vertex values are
Tuple2s of HashMaps of Lists, and I suspect these grow big.

If you're running the program from your IDE, make sure to add a VM option.
If you're running it from the command line, make sure to edit the
flink-conf.yaml file. You can find the available configuration options and
what they mean in the docs [1].

I hope this helps,
-Vasia.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html

> The input should not be much... I wrote an API for Virtuoso which requests
> an RDF graph, but I limited it to 10 data sets only.
>
> This is my code; it is a bit messy and there might be room for improvement:
>
> public static final class PathMessageFunction
>         extends ScatterFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>, String> {
>
>     @Override
>     public void sendMessages(
>             Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex)
>             throws Exception {
>
>         // The list "path" collects the IDs of the vertices a message was sent to.
>         List<String> path = new ArrayList<String>();
>         if (super.getSuperstepNumber() == 1) {
>             path.add(vertex.getId());
>         }
>         if (super.getSuperstepNumber() > 1) {
>             for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
>                 path.add(values + ";" + vertex.getId());
>             }
>         }
>
>         // The path list is sent to the neighbouring nodes.
>         for (Edge<String, String> edge : getEdges()) {
>             sendMessageTo(edge.getTarget(), path);
>         }
>     }
> }
>
> public static final class PathUpdateFunction
>         extends GatherFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>> {
>
>     @Override
>     public void updateVertex(
>             Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex,
>             MessageIterator<List<String>> messenger)
>             throws Exception {
>
>         List<String> newValues = new ArrayList<String>();
>
>         // The path list that was sent as a message is also stored within the
>         // vertex value, so the paths are saved to a new list "newValues".
>         // This list should not contain the ID of the vertex itself, to avoid cycles.
>         for (List<String> msg : messenger) {
>             for (String value : msg) {
>                 if (!value.contains(vertex.getId())) {
>                     newValues.add(value);
>                 }
>             }
>         }
>
>         // Create a new HashMap with the new and old values for the
>         // setNewVertexValue function.
>         HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
>         newHashMap.put(super.getSuperstepNumber(), newValues);
>
>         HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();
>
>         // Here it gets a bit complicated... I try to analyze the given paths
>         // for possible combinations of them. For example: given the path "a;b;c"
>         // and the path "c;d;e", I predict that "a;b;c;d;e" should also be possible.
>         for (int i = 0; i < oriList.size(); i++) {
>
>             String oriTemp = oriList.get(i);
>             String destTemp = destList.get(i);
>
>             String oriDest = oriTemp + destTemp;
>
>             List<String> tempList = new ArrayList<String>();
>             List<String> setsWithOrigin = new ArrayList<String>();
>             List<String> setsWithDestination = new ArrayList<String>();
>             for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
>                 for (String value : entry.getValue()) {
>                     if (value.contains(oriTemp)) {
>                         setsWithOrigin.add(value);
>                     }
>                     if (value.contains(destTemp)) {
>                         setsWithDestination.add(value);
>                     }
>                 }
>             }
>             for (String originIter : setsWithOrigin) {
>                 for (String destinationIter : setsWithDestination) {
>                     String concat = "";
>                     if (originIter.indexOf(oriTemp) == 0
>                             && destinationIter.indexOf(destTemp) == 0) {
>                         String reverse = destinationIter;
>                         if (destinationIter.length() > 1) {
>                             reverse = "";
>                             int d = destinationIter.length();
>                             for (int a = 0; a < destinationIter.length(); a++) {
>                                 reverse = reverse + destinationIter.substring(d - 1, d);
>                                 d--;
>                             }
>                         }
>                         concat = originIter + ";" + vertex.getId() + ";" + reverse;
>                     }
>                     if (isFormatValid(concat) && concat.length() > 0) {
>                         if (!tempList.contains(concat)) {
>                             tempList.add(concat);
>                         }
>                     }
>                 }
>             }
>             multiPaths.put(oriDest, tempList);
>         }
>
>         // The combined paths are also saved into a HashMap which is
>         // additionally set as part of the vertex value.
>         // Later the paths are filtered for redundancy.
>         Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
>                 new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(
>                         multiPaths, newHashMap);
>
>         setNewVertexValue(testTuple3);
>     }
> }
>
> Let me know if you need any further information.
> Thanks in advance.
>
> All the best,
> Dennis
>
>
> Sent: Thursday, 6 October 2016 at 15:22
> From: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
> To: dev@flink.apache.org
> Subject: Re: Flink Gelly
>
> Hi Dennis,
>
> can you give us some details about your setup? E.g. where you are running
> your job, your input size, the configured memory, etc. It would also be
> helpful if you could share your code. Getting an out-of-memory error with
> just 100 nodes seems weird.
>
> Best,
> -Vasia.
>
> On 6 October 2016 at 13:29, <d...@web.de> wrote:
>
> > Dear ladies and gentlemen,
> >
> > I have a problem using Gelly in Flink. Currently I am loading a Virtuoso
> > graph into Flink's Gelly, and I want to analyze it for the different
> > paths one can take to link the different nodes. Therefore I am using the
> > ScatterGatherIteration.
> > However, my code only works with about ten to twenty nodes. When I try
> > to load a hundred nodes, the following error occurs:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed.
> > numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow
> > segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory:
> > 33685504 Message: null
> >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> >     at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> >     at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> >     at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> >     at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> >     at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> >     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> >     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > I tried to google it a bit, and this problem seems to occur often when
> > using Gelly. I hope you have ideas or approaches for how I can handle
> > this error.
> >
> > Thank you in advance!
> > All the best,
> > Dennis
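
[Editor's note] To make Vasia's advice concrete: in Flink 1.1, the heap sizes live in conf/flink-conf.yaml. The following is a sketch only; the values are illustrative, not a recommendation, and should be sized to the actual machine:

```yaml
# conf/flink-conf.yaml (Flink 1.1) -- heap sizes are given in megabytes.
# Illustrative values for a 16GB workstation; adjust to your hardware.
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 8192
```

When running from the IDE instead (local mini-cluster), the equivalent is a JVM option on the run configuration, e.g. `-Xmx8g` (again an example value). Note that the failing CompactingHashTable holds the iteration's solution set in Flink's managed memory, so raising the TaskManager heap is what gives it more room to work with.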