Hi again,
 
I tried to change some configs and set the available amount memory to 4096mb 
but there is no difference at all. Furthermore I monitored the usage of of RAM 
by
the JVM and it does not go beyond 3gb at all. 

I still don't think that the memory is the propblem. For me it looks like an 
internal
problem with Gelly. 

Best,
Dennis 
 

Gesendet: Freitag, 07. Oktober 2016 um 12:29 Uhr
Von: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
An: dev@flink.apache.org
Betreff: Re: Re: Flink Gelly
Hi Dennis,

On 7 October 2016 at 11:29, <d...@web.de> wrote:

> Hi Vasia,
>
> thanks for your reply.
>
> Currently I am testing it on my normal workstation (16GB Ram) but I also
> tried it on out cluster.
> Both are failing at the same amount of nodes, so I guess it has something
> to do with Gelly
> or with the properties.
>
> The configured memory is default. I did not change it because I thought
> that flink is not the problem
> but I might be wrong.
>

​The default configured​ memory is only 512MB. If you have 16GB but you
don't let Flink know about it, it won't use it.
I see that your vertex values are Tuple2's of HashMap's of Lists and I
suspect these grow big.
If you're running the program from your IDE make sure to add a VM option.
If you're running the program from the command line, make sure to edit the
flink-conf.yaml file. You can find the available configuration options and
what they mean in the docs [1].

​I hope this helps,
-Vasia.​

​[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
​



>
> The Input should not be much... I wrote an API for Virtuoso which is
> requesting a RDF-graph. But
> I limited it to 10 Data Sets only.
>
> This is my code, it is a bit messy and their might be improvement:
>
>
> public static final class PathMessageFunction
> extends
> ScatterFunction<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>, List<String>, String> {
> @Override
> public void sendMessages(
> Vertex<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>> vertex)
> throws Exception {
>
> // The list "path" collects the ID's of the verticies a
> message was send to.
>
> List<String> path = new ArrayList<String>();
> if (super.getSuperstepNumber() == 1) {
> path.add(vertex.getId());
> }
> if (super.getSuperstepNumber() > 1) {
> for (String values : vertex.f1.f1.get(super.getSuperstepNumber()
> - 1)) {
> path.add(values + ";" + vertex.getId());
> }
> }
>
> // The Path-List is send to the next neighbouring Nodes.
>
> for (Edge<String, String> edge : getEdges()) {
> sendMessageTo(edge.getTarget(), path);
> }
> }
> }
>
>
>
> public static final class PathUpdateFunction
> extends
> GatherFunction<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>, List<String>> {
> @Override
> public void updateVertex(
> Vertex<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>> vertex,
> MessageIterator<List<String>> messenger)
> throws Exception {
>
> List<String> newValues = new ArrayList<String>();
>
> // The Path-List which was send as a message is also stored
> within the vertex value, therefore the Paths are saved to a new List
> "newValues".
> // This List should not contain the ID of the vertex itself to
> avoid cycles.
>
> for (List<String> msg : messenger) {
> for (String value : msg) {
> if (!value.contains(vertex.getId())) {
> newValues.add(value);
> }
> }
> }
>
> // Creation of a new HashMap with the new and old values for
> the setNewVertexValue function
>
> HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
> newHashMap.put(super.getSuperstepNumber(), newValues);
>
>
> HashMap<String, List<String>> multiPaths = new HashMap<String,
> List<String>>();
>
>
> // Here it gets a bit complicated... However... I try to
> analyze the given paths for possible combinations of them.
> // For example... I got the path "a;b;c" and the patch
> "c;d;e", so I predict that "a;b;c;d;e" should also be possible.
>
> for (int i = 0; i < oriList.size(); i++) {
>
> String oriTemp = oriList.get(i);
> String destTemp = destList.get(i);
>
> String oriDest = oriTemp + destTemp;
>
> List<String> tempList = new ArrayList<String>();
> List<String> setsWithOrigin = new ArrayList<String>();
> List<String> setsWithDestination = new ArrayList<String>();
> for (Entry<Integer, List<String>> entry :
> newHashMap.entrySet()) {
> for (String value : entry.getValue()) {
> if (value.contains(oriTemp)) {
> setsWithOrigin.add(value);
> }
> if (value.contains(destTemp)) {
> setsWithDestination.add(value);
> }
> }
> }
> for (String originIter : setsWithOrigin) {
> for (String destinationIter : setsWithDestination) {
> String concat = "";
> if ((originIter.indexOf(oriTemp) == 0 &&
> destinationIter
> .indexOf(destTemp) == 0)) {
> String reverse = destinationIter;
> if (destinationIter.length() > 1) {
> reverse = "";
> int d = destinationIter.length();
> for (int a = 0; a <
> destinationIter.length(); a++) {
> reverse = reverse
> + destinationIter.substring(d
> - 1,
> d);
> d--;
> }
> }
> concat = originIter + ";" + vertex.getId() +
> ";"
> + reverse;
> }
> if (isFormatValid(concat) && concat.length() > 0) {
> if (!tempList.contains(concat)) {
> tempList.add(concat);
> }
> }
> }
> }
> multiPaths.put(oriDest, tempList);
> }
>
>
> // The combined paths are also saved into a HashMap which is
> additionally set as a Vertex Value
> // Later the paths are filtered for redundance
>
> Tuple2<HashMap<String, List<String>>, HashMap<Integer,
> List<String>>> testTuple3 = new Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>(
> multiPaths, newHashMap);
>
>
> setNewVertexValue(testTuple3);
> }
> }
>
>
> Let me know if you need any further information.
> Thanks in advance.
>
> All the best,
> Dennis
>
>
> Gesendet: Donnerstag, 06. Oktober 2016 um 15:22 Uhr
> Von: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
> An: dev@flink.apache.org
> Betreff: Re: Flink Gelly
> Hi Dennis,
>
> can you give us some details about your setup? e.g. where you are running
> your job, your input size, the configured memory, etc. It would also be
> helpful if you could share your code. Getting an out of memory error with
> just 100 nodes seems weird.
>
> Best,
> -Vasia.
>
> On 6 October 2016 at 13:29, <d...@web.de> wrote:
>
> >
> > Dear ladies and gentlemen,
> >
> > I got a problem using Gelly in Flink. Currently I am loading a Virtuoso
> > Graph into
> > Flink's Gelly and I want to analyze it for the different paths one can
> > take to link
> > the different nodes. Therefore I am using the ScatterGatherIteration.
> > However, my code just works with about ten to twenty nodes. When I try to
> > upload
> > a hundred nodes, the following error occurs:
> >
> > Exception in thread "main" org.apache.flink.runtime.
> > client.JobExecutionException: Job execution failed.
> > at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
> > mcV$sp(JobManager.scala:822)
> > at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(
> JobManager.scala:768)
> > at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(
> JobManager.scala:768)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> > liftedTree1$1(Future.scala:24)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> > Future.scala:24)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > AbstractDispatcher.scala:401)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed.
> > numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow
> > segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory:
> > 33685504 Message: null
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> > insertRecordIntoPartition(CompactingHashTable.java:457)
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> > insertOrReplaceRecord(CompactingHashTable.java:392)
> > at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollect
> > or.collect(SolutionSetUpdateOutputCollector.java:54)
> > at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(
> > GatherFunction.java:123)
> > at org.apache.flink.quickstart.PathRank$PathUpdateFunction.
> > updateVertex(PathRank.java:357)
> > at org.apache.flink.graph.spargel.ScatterGatherIteration$
> > GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> > at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDr
> > iver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> > AbstractIterativeTask.java:146)
> > at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> > IterationTailTask.java:107)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(
> BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > I tried to google it a bit, and this problems seems to occur often when
> > using Gelly. I hope you have any ideas or approaches how I can handle
> this
> > error.
> >
> > Thank you in advance!
> > All the best,
> > Dennis
> >
>

Reply via email to