Hi Dennis,

On 7 October 2016 at 11:29, <d...@web.de> wrote:

> Hi Vasia,
>
> thanks for your reply.
>
> Currently I am testing it on my normal workstation (16 GB RAM), but I also
> tried it on our cluster.
> Both fail at the same number of nodes, so I guess it has something
> to do with Gelly
> or with the properties.
>
> The configured memory is the default. I did not change it because I thought
> that Flink was not the problem,
> but I might be wrong.
>

The default configured memory is only 512MB. If you have 16GB but you
don't let Flink know about it, it won't use it.
I see that your vertex values are Tuple2s of HashMaps of Lists, and I
suspect these grow big.
If you're running the program from your IDE, make sure to add a VM option
that raises the JVM heap size (e.g. -Xmx).
If you're running the program from the command line, make sure to edit the
flink-conf.yaml file. You can find the available configuration options and
what they mean in the docs [1].
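
To check whether a VM option is actually picked up when launching from the
IDE, a minimal sketch in plain Java follows. It relies only on the standard
Runtime.maxMemory() call; the class name HeapCheck and the -Xmx4g value are
assumptions made for illustration and are not part of Flink.

```java
// Hypothetical helper, not part of Flink: reports the maximum heap
// the current JVM is allowed to use.
final class HeapCheck {

    /** Maximum heap size in megabytes, as reported by the running JVM. */
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // With a VM option such as -Xmx4g (an example value), the reported
        // figure should be in the neighbourhood of 4096 MB; the exact
        // number depends on the JVM.
        System.out.println("Max JVM heap: " + maxHeapMegabytes() + " MB");
    }
}
```

Running it with the same run configuration as the Gelly job shows whether
the heap limit changed. If the number stays small, the option is probably
set on a different run configuration.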

I hope this helps,
-Vasia.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html



>
> The input should not be large. I wrote an API for Virtuoso that requests an
> RDF graph, but I limited it to 10 data sets only.
>
> This is my code; it is a bit messy and there might be room for improvement:
>
>
> public static final class PathMessageFunction
>             extends
>             ScatterFunction<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>, List<String>, String> {
>         @Override
>         public void sendMessages(
>                 Vertex<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>> vertex)
>                 throws Exception {
>
>             // The list "path" collects the IDs of the vertices a message
>             // was sent to.
>
>             List<String> path = new ArrayList<String>();
>             if (super.getSuperstepNumber() == 1) {
>                 path.add(vertex.getId());
>             }
>             if (super.getSuperstepNumber() > 1) {
>                 for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
>                     path.add(values + ";" + vertex.getId());
>                 }
>             }
>
>             // The path list is sent to the neighbouring nodes.
>
>             for (Edge<String, String> edge : getEdges()) {
>                 sendMessageTo(edge.getTarget(), path);
>             }
>         }
>     }
>
>
>
>     public static final class PathUpdateFunction
>             extends
>             GatherFunction<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>, List<String>> {
>         @Override
>         public void updateVertex(
>                 Vertex<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>> vertex,
>                 MessageIterator<List<String>> messenger)
>                 throws Exception {
>
>             List<String> newValues = new ArrayList<String>();
>
>             // The path list that was sent as a message is also stored within
>             // the vertex value, so the paths are saved to a new list "newValues".
>             // This list should not contain the ID of the vertex itself, to avoid
>             // cycles.
>
>             for (List<String> msg : messenger) {
>                 for (String value : msg) {
>                     if (!value.contains(vertex.getId())) {
>                         newValues.add(value);
>                     }
>                 }
>             }
>
>             // Creation of a new HashMap with the new and old values for the
>             // setNewVertexValue function
>
>             HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
>             newHashMap.put(super.getSuperstepNumber(), newValues);
>
>
>             HashMap<String, List<String>> multiPaths = new HashMap<String,
> List<String>>();
>
>
>             // Here it gets a bit complicated. I try to analyze the given paths
>             // for possible combinations of them. For example, given the path
>             // "a;b;c" and the path "c;d;e", I predict that "a;b;c;d;e" should
>             // also be possible.
>
>             for (int i = 0; i < oriList.size(); i++) {
>
>                 String oriTemp = oriList.get(i);
>                 String destTemp = destList.get(i);
>
>                 String oriDest = oriTemp + destTemp;
>
>                 List<String> tempList = new ArrayList<String>();
>                 List<String> setsWithOrigin = new ArrayList<String>();
>                 List<String> setsWithDestination = new ArrayList<String>();
>                 for (Entry<Integer, List<String>> entry :
> newHashMap.entrySet()) {
>                     for (String value : entry.getValue()) {
>                         if (value.contains(oriTemp)) {
>                             setsWithOrigin.add(value);
>                         }
>                         if (value.contains(destTemp)) {
>                             setsWithDestination.add(value);
>                         }
>                     }
>                 }
>                 for (String originIter : setsWithOrigin) {
>                     for (String destinationIter : setsWithDestination) {
>                         String concat = "";
>                         if ((originIter.indexOf(oriTemp) == 0 &&
> destinationIter
>                                 .indexOf(destTemp) == 0)) {
>                             // Reverse the destination path character by character.
>                             String reverse = new StringBuilder(destinationIter)
>                                     .reverse().toString();
>                             concat = originIter + ";" + vertex.getId() +
> ";"
>                                     + reverse;
>                         }
>                         if (isFormatValid(concat) && concat.length() > 0) {
>                             if (!tempList.contains(concat)) {
>                                 tempList.add(concat);
>                             }
>                         }
>                     }
>                 }
>                 multiPaths.put(oriDest, tempList);
>             }
>
>
>             // The combined paths are also saved into a HashMap, which is
>             // additionally set as a vertex value.
>             // Later the paths are filtered for redundancy.
>
>             Tuple2<HashMap<String, List<String>>, HashMap<Integer,
> List<String>>> testTuple3 = new Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>(
>                     multiPaths, newHashMap);
>
>
>             setNewVertexValue(testTuple3);
>         }
>     }
>
>
> Let me know if you need any further information.
> Thanks in advance.
>
> All the best,
> Dennis
>
>
> Sent: Thursday, 06 October 2016 at 15:22
> From: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
> To: dev@flink.apache.org
> Subject: Re: Flink Gelly
> Hi Dennis,
>
> can you give us some details about your setup? e.g. where you are running
> your job, your input size, the configured memory, etc. It would also be
> helpful if you could share your code. Getting an out of memory error with
> just 100 nodes seems weird.
>
> Best,
> -Vasia.
>
> On 6 October 2016 at 13:29, <d...@web.de> wrote:
>
> >
> > Dear ladies and gentlemen,
> >
> > I have a problem using Gelly in Flink. Currently I am loading a Virtuoso
> > graph into Flink's Gelly, and I want to analyze it for the different paths
> > one can take to link the different nodes. Therefore I am using the
> > ScatterGatherIteration. However, my code only works with about ten to
> > twenty nodes. When I try to load a hundred nodes, the following error
> > occurs:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> > at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> > at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> > at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> > at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> > at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> > at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > I tried to google it a bit, and this problem seems to occur often when
> > using Gelly. I hope you have some ideas or approaches for how I can handle
> > this error.
> >
> > Thank you in advance!
> > All the best,
> > Dennis
> >
>
