Hi Fabian,

thanks for your reply. I guess I will have to change something in my code in
general to keep the hash tables rather small.

Have a nice weekend!
Best, Dennis
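One concrete way to keep those hash tables small, for reference: the PathMessageFunction quoted below only ever reads the map entry for the previous superstep, so older per-superstep entries could be dropped in updateVertex before the new value is stored. A minimal sketch under that assumption (the pruning lines are illustrative, not from the code below, and they assume the combined paths in multiPaths already capture any history the job still needs):

    // Inside updateVertex, before setNewVertexValue: drop per-superstep
    // entries that no later scatter phase will read again.
    HashMap<Integer, List<String>> pathsBySuperstep = vertex.f1.f1;
    int current = super.getSuperstepNumber();
    Iterator<Integer> steps = pathsBySuperstep.keySet().iterator();
    while (steps.hasNext()) {
        if (steps.next() < current) {
            steps.remove(); // safe removal while iterating
        }
    }
    pathsBySuperstep.put(current, newValues);

Note that the path-combination loop iterates over this map as well, so pruning also shrinks what that loop sees.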
Sent: Friday, 7 October 2016 at 13:31
From: "Fabian Hueske" <fhue...@gmail.com>
To: "dev@flink.apache.org" <dev@flink.apache.org>
Subject: Re: Re: Re: Flink Gelly

Hi,

the exception

> java.lang.RuntimeException: Memory ran out. Compaction failed.

says that the hash table ran out of memory. Gelly is implemented on top of
Flink's DataSet API, so this is a problem of the DataSet runtime rather than
of Gelly itself. I think Vasia is right about the memory configuration;
providing more memory usually solves this problem. Another thing you can do
is try a hash table in JVM memory instead of managed memory. See the option
to set the solution set to unmanaged memory in the docs [1].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#iterative-graph-processing
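Fabian's pointer maps to one flag on the Gelly iteration configuration. A minimal sketch, assuming Flink 1.1, a Graph variable named graph, and the PathMessageFunction/PathUpdateFunction classes quoted further down in this thread; maxIterations is a placeholder:

    import java.util.HashMap;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.spargel.ScatterGatherConfiguration;

    // Keep the solution set (the compacting hash table from the stack trace)
    // on the JVM heap instead of in Flink's managed memory, per [1].
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    parameters.setSolutionSetUnmanagedMemory(true);

    Graph<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, String> result =
            graph.runScatterGatherIteration(
                    new PathMessageFunction(), new PathUpdateFunction(),
                    maxIterations, parameters);

With an unmanaged solution set the table can grow with the JVM heap, at the price of a potential OutOfMemoryError instead of the managed-memory compaction failure.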
2016-10-07 13:21 GMT+02:00 <d...@web.de>:

> Hi again,
>
> I tried to change some configs and set the available amount of memory to
> 4096mb, but there is no difference at all. Furthermore, I monitored the
> JVM's RAM usage, and it does not go beyond 3gb.
>
> I still don't think that the memory is the problem. For me it looks like
> an internal problem with Gelly.
>
> Best,
> Dennis
>
>
> Sent: Friday, 7 October 2016 at 12:29
> From: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
> To: dev@flink.apache.org
> Subject: Re: Re: Flink Gelly
> Hi Dennis,
>
> On 7 October 2016 at 11:29, <d...@web.de> wrote:
>
> > Hi Vasia,
> >
> > thanks for your reply.
> >
> > Currently I am testing it on my normal workstation (16GB RAM), but I
> > also tried it on our cluster.
> > Both fail at the same number of nodes, so I guess it has something to
> > do with Gelly or with the properties.
> >
> > The configured memory is the default. I did not change it because I
> > thought that Flink was not the problem, but I might be wrong.
> >
> The default configured memory is only 512MB. If you have 16GB but you
> don't let Flink know about it, it won't use it.
> I see that your vertex values are Tuple2s of HashMaps of Lists, and I
> suspect these grow big.
> If you're running the program from your IDE, make sure to add a VM option.
> If you're running the program from the command line, make sure to edit the
> flink-conf.yaml file. You can find the available configuration options and
> what they mean in the docs [1].
>
> I hope this helps,
> -Vasia.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
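For the numbers in this thread (16GB workstation, 512MB default), that advice translates to something like the following; the value is just an example matching the 4096mb Dennis mentions above:

    # flink-conf.yaml: JVM heap per TaskManager in MB (default: 512)
    taskmanager.heap.mb: 4096

For runs inside the IDE, the local mini-cluster lives in the IDE's JVM, so the equivalent knob is a VM option such as -Xmx4g on the run configuration.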
>
> > The input should not be much... I wrote an API for Virtuoso which
> > requests an RDF graph, but I limited it to 10 data sets only.
> >
> > This is my code; it is a bit messy and there might be room for
> > improvement:
> >
> > public static final class PathMessageFunction extends
> >         ScatterFunction<String, Tuple2<HashMap<String, List<String>>,
> >         HashMap<Integer, List<String>>>, List<String>, String> {
> >
> >     @Override
> >     public void sendMessages(
> >             Vertex<String, Tuple2<HashMap<String, List<String>>,
> >                     HashMap<Integer, List<String>>>> vertex)
> >             throws Exception {
> >
> >         // The list "path" collects the IDs of the vertices a message
> >         // was sent to.
> >         List<String> path = new ArrayList<String>();
> >         if (super.getSuperstepNumber() == 1) {
> >             path.add(vertex.getId());
> >         }
> >         if (super.getSuperstepNumber() > 1) {
> >             for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
> >                 path.add(values + ";" + vertex.getId());
> >             }
> >         }
> >
> >         // The path list is sent to the neighbouring nodes.
> >         for (Edge<String, String> edge : getEdges()) {
> >             sendMessageTo(edge.getTarget(), path);
> >         }
> >     }
> > }
> >
> > public static final class PathUpdateFunction extends
> >         GatherFunction<String, Tuple2<HashMap<String, List<String>>,
> >         HashMap<Integer, List<String>>>, List<String>> {
> >
> >     @Override
> >     public void updateVertex(
> >             Vertex<String, Tuple2<HashMap<String, List<String>>,
> >                     HashMap<Integer, List<String>>>> vertex,
> >             MessageIterator<List<String>> messenger)
> >             throws Exception {
> >
> >         List<String> newValues = new ArrayList<String>();
> >
> >         // The path lists that were sent as messages are also stored
> >         // within the vertex value, so the paths are saved to a new list
> >         // "newValues". This list should not contain the ID of the vertex
> >         // itself, to avoid cycles.
> >         for (List<String> msg : messenger) {
> >             for (String value : msg) {
> >                 if (!value.contains(vertex.getId())) {
> >                     newValues.add(value);
> >                 }
> >             }
> >         }
> >
> >         // Creation of a new HashMap with the new and old values for the
> >         // setNewVertexValue function.
> >         HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
> >         newHashMap.put(super.getSuperstepNumber(), newValues);
> >
> >         HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();
> >
> >         // Here it gets a bit complicated... I try to analyze the given
> >         // paths for possible combinations. For example: given the path
> >         // "a;b;c" and the path "c;d;e", I predict that "a;b;c;d;e"
> >         // should also be possible.
> >         // (oriList, destList and isFormatValid are defined elsewhere in
> >         // the class.)
> >         for (int i = 0; i < oriList.size(); i++) {
> >
> >             String oriTemp = oriList.get(i);
> >             String destTemp = destList.get(i);
> >             String oriDest = oriTemp + destTemp;
> >
> >             List<String> tempList = new ArrayList<String>();
> >             List<String> setsWithOrigin = new ArrayList<String>();
> >             List<String> setsWithDestination = new ArrayList<String>();
> >             for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
> >                 for (String value : entry.getValue()) {
> >                     if (value.contains(oriTemp)) {
> >                         setsWithOrigin.add(value);
> >                     }
> >                     if (value.contains(destTemp)) {
> >                         setsWithDestination.add(value);
> >                     }
> >                 }
> >             }
> >             for (String originIter : setsWithOrigin) {
> >                 for (String destinationIter : setsWithDestination) {
> >                     String concat = "";
> >                     if (originIter.indexOf(oriTemp) == 0
> >                             && destinationIter.indexOf(destTemp) == 0) {
> >                         // Reverse the destination path character by character.
> >                         String reverse = destinationIter;
> >                         if (destinationIter.length() > 1) {
> >                             reverse = "";
> >                             int d = destinationIter.length();
> >                             for (int a = 0; a < destinationIter.length(); a++) {
> >                                 reverse = reverse + destinationIter.substring(d - 1, d);
> >                                 d--;
> >                             }
> >                         }
> >                         concat = originIter + ";" + vertex.getId() + ";" + reverse;
> >                     }
> >                     if (isFormatValid(concat) && concat.length() > 0) {
> >                         if (!tempList.contains(concat)) {
> >                             tempList.add(concat);
> >                         }
> >                     }
> >                 }
> >             }
> >             multiPaths.put(oriDest, tempList);
> >         }
> >
> >         // The combined paths are also saved into a HashMap, which is set
> >         // as part of the new vertex value. Later the paths are filtered
> >         // for redundancy.
> >         Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
> >                 new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(
> >                         multiPaths, newHashMap);
> >
> >         setNewVertexValue(testTuple3);
> >     }
> > }
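One detail worth flagging in the reversal loop above: it reverses destinationIter character by character, which only yields a valid path while vertex IDs are single characters; an ID like "n1" would come back as "1n". A sketch of a token-wise alternative, assuming paths are always semicolon-separated ID lists (reversePath is a hypothetical helper, not part of the original code):

    // Reverse "a;b;c" to "c;b;a" by reversing the list of IDs rather than
    // the characters, so multi-character vertex IDs stay intact.
    static String reversePath(String path) {
        String[] ids = path.split(";");
        StringBuilder reversed = new StringBuilder();
        for (int i = ids.length - 1; i >= 0; i--) {
            if (reversed.length() > 0) {
                reversed.append(';');
            }
            reversed.append(ids[i]);
        }
        return reversed.toString();
    }

For single-character IDs this matches the loop above, but it stays correct once IDs grow longer.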
> >
> > Let me know if you need any further information.
> > Thanks in advance.
> >
> > All the best,
> > Dennis
> >
> >
> > Sent: Thursday, 6 October 2016 at 15:22
> > From: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
> > To: dev@flink.apache.org
> > Subject: Re: Flink Gelly
> > Hi Dennis,
> >
> > can you give us some details about your setup? E.g. where you are
> > running your job, your input size, the configured memory, etc. It would
> > also be helpful if you could share your code. Getting an out-of-memory
> > error with just 100 nodes seems weird.
> >
> > Best,
> > -Vasia.
> >
> > On 6 October 2016 at 13:29, <d...@web.de> wrote:
> >
> > > Dear ladies and gentlemen,
> > >
> > > I got a problem using Gelly in Flink. Currently I am loading a
> > > Virtuoso graph into Flink's Gelly, and I want to analyze it for the
> > > different paths one can take to link the different nodes. Therefore I
> > > am using the ScatterGatherIteration.
> > > However, my code only works with about ten to twenty nodes. When I
> > > try to load a hundred nodes, the following error occurs:
> > >
> > > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
> > >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> > >     at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> > >     at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> > >     at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> > >     at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> > >     at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> > >     at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> > >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > >     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> > >     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> > >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > I tried to google it a bit, and this problem seems to occur often
> > > when using Gelly. I hope you have some ideas or approaches for how I
> > > can handle this error.
> > >
> > > Thank you in advance!
> > > All the best,
> > > Dennis