Kaushik

I'm also running a triangle counting program of my own at the moment and
have encountered similar problems.

A missing chosen worker error appears to indicate that one of the workers
failed (AFAIK), and you need to look at the logs for the other map tasks to
determine why.  You have provided those logs, and it looks to be the same
out of memory problem I had (the OutOfMemoryError: Java heap space in your
second map task).  Your algorithm appears to be roughly similar to mine, so
what you'll be hitting is that even with this relatively small graph you get
a massive explosion of messages by the later supersteps, which exhausts
memory (in my graph the theoretical maximum number of messages by the last
superstep was ~3 billion).
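
To see how quickly the message count grows, here is a rough, self-contained
sketch that replays the three sending supersteps of the quoted program on a
small in-memory graph and counts the messages sent in each.  It uses plain
Java collections and integer ids rather than Giraph, the class and method
names are made up for illustration, and it keeps every message in memory, so
it is only suitable for small samples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Giraph: counts messages per superstep. */
final class MessageCountSketch {

  /** Builds an undirected complete graph on vertices 0..n-1 as adjacency lists. */
  public static Map<Integer, List<Integer>> completeGraph(int n) {
    Map<Integer, List<Integer>> adj = new HashMap<>();
    for (int v = 0; v < n; v++) {
      List<Integer> neighbours = new ArrayList<>();
      for (int t = 0; t < n; t++) {
        if (t != v) {
          neighbours.add(t);
        }
      }
      adj.put(v, neighbours);
    }
    return adj;
  }

  /** Returns the number of messages sent in supersteps 0, 1 and 2. */
  public static long[] countMessages(Map<Integer, List<Integer>> adj) {
    long[] sent = new long[3];
    Map<Integer, List<Integer>> inbox = new HashMap<>();
    for (int step = 0; step < 3; step++) {
      Map<Integer, List<Integer>> next = new HashMap<>();
      for (Map.Entry<Integer, List<Integer>> entry : adj.entrySet()) {
        int v = entry.getKey();
        List<Integer> received = inbox.getOrDefault(v, new ArrayList<>());
        for (int t : entry.getValue()) {
          if (step == 0) {
            // Superstep 0: send own id to every neighbour with a larger id.
            if (t > v) {
              deliver(next, t, v);
              sent[0]++;
            }
          } else {
            for (int m : received) {
              // Superstep 1: forward m only along m < v < t.
              // Superstep 2: forward every received message to every neighbour.
              if (step == 2 || (t > v && v > m)) {
                deliver(next, t, m);
                sent[step]++;
              }
            }
          }
        }
      }
      inbox = next;
    }
    return sent;
  }

  private static void deliver(Map<Integer, List<Integer>> box, int target, int message) {
    box.computeIfAbsent(target, k -> new ArrayList<>()).add(message);
  }

  public static void main(String[] args) {
    long[] sent = countMessages(completeGraph(4));
    for (int step = 0; step < sent.length; step++) {
      System.out.println("superstep " + step + ": " + sent[step] + " messages sent");
    }
  }
}
```

The last sending superstep multiplies every received message by the degree
of the vertex that holds it, which is where the blow-up comes from on graphs
with high-degree vertices.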

To get a successful run on a single node cluster, as I eventually did,
you'll likely have to do some/all of the following:
* Enable out of core messages by adding appropriate configuration, see
http://giraph.apache.org/ooc.html (I set max messages in memory to 10,000)
* Increase the heap size for the map reduce processes, this can be done with
the mapred.child.java.opts setting (I used -Xmx768M -Xms256M).  Ideally you
should pick values such that the max memory times the number of workers will
fit into available RAM as otherwise you'll start hitting swap files and
grind to a halt
* Increase the number of workers you use (the -w argument you pass to
Giraph), you'll need to have also configured your Hadoop cluster to support
at least one more map task than the number of workers (if not using
external Zookeeper) or at least as many map tasks as the number of workers
if using external Zookeeper.  Bear in mind that using more workers doesn't
necessarily help that much on a single node cluster but it does mean you can
keep the heap size settings down lower.

Even with all this my algorithm took 16 minutes to run on a graph with 10K
vertices and 150K edges.  If you have better luck I'd be interested to know.
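
As a rough illustration of the sizing arithmetic in the list above, here is
a small sketch that works out how many map tasks a given worker count needs
and whether the worst-case heap fits into RAM.  The class, its method names
and the 4096 MB figure are made up for the example.  It counts the extra
master task towards memory as well, which is my own slightly more
conservative reading of the rule of thumb, and it ignores non-heap JVM
overhead, so leave some headroom.

```java
/** Hypothetical helper, not part of Giraph or Hadoop. */
final class GiraphSizingSketch {

  /**
   * Map tasks the cluster must support: one per worker, plus one extra
   * task unless an external ZooKeeper is used.
   */
  public static int requiredMapTasks(int workers, boolean externalZooKeeper) {
    return externalZooKeeper ? workers : workers + 1;
  }

  /** Heap used if every map task grows to its -Xmx limit, in megabytes. */
  public static long worstCaseHeapMb(int mapTasks, long maxHeapMbPerTask) {
    return (long) mapTasks * maxHeapMbPerTask;
  }

  /** True if the worst-case heap fits into the RAM set aside for the job. */
  public static boolean fitsInRam(int mapTasks, long maxHeapMbPerTask, long availableRamMb) {
    return worstCaseHeapMb(mapTasks, maxHeapMbPerTask) <= availableRamMb;
  }

  public static void main(String[] args) {
    int workers = 2;                 // the -w argument passed to Giraph
    long maxHeapMb = 768;            // from -Xmx768M in mapred.child.java.opts
    long availableRamMb = 4096;      // assumed free RAM on the single node

    int mapTasks = requiredMapTasks(workers, false);
    System.out.println("map tasks needed: " + mapTasks);
    System.out.println("worst-case heap (MB): " + worstCaseHeapMb(mapTasks, maxHeapMb));
    System.out.println("fits in RAM: " + fitsInRam(mapTasks, maxHeapMb, availableRamMb));
  }
}
```

If the check fails, either lower the -Xmx value or reduce the number of
workers, otherwise the node is likely to start swapping.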

Hope this helps,

Rob

From:  Kaushik Patnaik <kaushikpatn...@gmail.com>
Reply-To:  <user@giraph.apache.org>
Date:  Wednesday, 4 December 2013 04:18
To:  <user@giraph.apache.org>
Subject:  Missing chosen worker error on particular superstep

> Hi,
> 
> I am getting a missing chosen worker error upon running a custom program
> (triangle counting) on a graph dataset of about 7K nodes and 200K edges. The
> program runs fine on the first two supersteps, but throws a missing chosen
> worker error in the third superstep. In the third superstep each node sends
> messages to all its neighbors.
> 
> The program runs fine and outputs results on test datasets of size 100 nodes
> and 1000 edges, still I have pasted it below in case I am missing something.
> 
> I am running the job on a single-node cluster though, could that be a
> problem? Will increasing the "mapreduce.task.timeout" help in any manner?
> I also see a Java heap space error in the second map task of the same run.
> Are these errors connected?
> 
> Regards
> Kaushik
> 
> syslogs from attempt_201312031749_0011_m_000000_0
> 2013-12-03 23:10:50,661 INFO org.apache.giraph.master.MasterThread:
> masterThread: Coordination of superstep 1 took 100.399 seconds ended with
> state THIS_SUPERSTEP_DONE and is now on superstep 2
> 2013-12-03 23:10:50,774 INFO org.apache.giraph.comm.netty.NettyClient:
> connectAllAddresses: Successfully added 0 connections, (0 total connected) 0
> failed, 0 failures total.
> 2013-12-03 23:10:50,774 INFO org.apache.giraph.partition.PartitionBalancer:
> balancePartitionsAcrossWorkers: Using algorithm static
> 2013-12-03 23:10:50,775 INFO org.apache.giraph.partition.PartitionUtils:
> analyzePartitionStats: Vertices - Mean: 7115, Min: Worker(hostname=localhost,
> MRtaskID=1, port=30001) - 7115, Max: Worker(hostname=localhost, MRtaskID=1,
> port=30001) - 7115
> 2013-12-03 23:10:50,775 INFO org.apache.giraph.partition.PartitionUtils:
> analyzePartitionStats: Edges - Mean: 201524, Min: Worker(hostname=localhost,
> MRtaskID=1, port=30001) - 201524, Max: Worker(hostname=localhost, MRtaskID=1,
> port=30001) - 201524
> 2013-12-03 23:10:50,806 INFO org.apache.giraph.master.BspServiceMaster:
> barrierOnWorkerList: 0 out of 1 workers finished on superstep 2 on path
> /_hadoopBsp/job_201312031749_0011/_applicationAttemptsDir/0/_superstepDir/2/_workerFinishedDir
> 2013-12-03 23:10:50,806 INFO org.apache.giraph.master.BspServiceMaster:
> barrierOnWorkerList: Waiting on [localhost_1]
> 2013-12-03 23:11:01,756 ERROR org.apache.giraph.master.BspServiceMaster:
> superstepChosenWorkerAlive: Missing chosen worker Worker(hostname=localhost,
> MRtaskID=1, port=30001) on superstep 2
> 2013-12-03 23:11:01,757 INFO org.apache.giraph.master.MasterThread:
> masterThread: Coordination of superstep 2 took 11.096 seconds ended with state
> WORKER_FAILURE and is now on superstep 2
> 2013-12-03 23:11:01,761 FATAL org.apache.giraph.master.BspServiceMaster:
> getLastGoodCheckpoint: No last good checkpoints can be found, killing the job.
> attempt_201312031749_0011_m_000001_0
> java.lang.IllegalStateException: run: Caught an unrecoverable exception
> waitFor: ExecutionException occurred while waiting for
> org.apache.giraph.utils.ProgressableUtils$FutureWaitable@61db327f
>       at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:101)
>       at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
>       at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
>       at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: waitFor: ExecutionException
> occurred while waiting for
> org.apache.giraph.utils.ProgressableUtils$FutureWaitable@61db327f
>       at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:181)
>       at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:139)
>       at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:124)
>       at org.apache.giraph.utils.ProgressableUtils.getFutureResult(ProgressableUtils.java:87)
>       at org.apache.giraph.utils.ProgressableUtils.getResultsWithNCallables(ProgressableUtils.java:221)
>       at org.apache.giraph.graph.GraphTaskManager.processGraphPartitions(GraphTaskManager.java:736)
>       at org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:284)
>       at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:91)
>       ... 7 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.OutOfMemoryError: Java heap space
>       at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:262)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>       at org.apache.giraph.utils.ProgressableUtils$FutureWaitable.waitFor(ProgressableUtils.java:300)
>       at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:173)
> 
> Program - 
> public class TriangleCounting extends BasicComputation<
>     DoubleWritable, DoubleWritable, FloatWritable, DoubleWritable> {
>   
>   /** Class logger */
>   private static final Logger LOG = Logger.getLogger(TriangleCounting.class);
>     
>   @Override
>   public void compute(
>       Vertex<DoubleWritable, DoubleWritable, FloatWritable> vertex,
>       Iterable<DoubleWritable> messages) throws IOException {
>                 
>   /** First Superstep releases messages to vertexIds() whose value is greater
> than its value. Both VertexId and Message are double **/
>   if (getSuperstep() == 0) {
>     for (Edge<DoubleWritable, FloatWritable> edge: vertex.getEdges()) {
>         if (edge.getTargetVertexId().compareTo(vertex.getId()) == 1) {
>               sendMessage(edge.getTargetVertexId(), vertex.getId());
>         if (LOG.isDebugEnabled()) {
>           LOG.debug("Vertex " + vertex.getId() + " sent message " +
> vertex.getId() + " to vertex " + edge.getTargetVertexId());
>         }
>         System.out.println("Vertex " + vertex.getId() + " sent message " +
> vertex.getId() + " to vertex " + edge.getTargetVertexId());
>         }
>     }
>   }
>     
>   /** Second superstep releases messages to message.get() < vertex.getId() <
> targetVertexId() **/
>   if (getSuperstep() == 1) {
>       for (DoubleWritable message: messages) {
>           for (Edge<DoubleWritable, FloatWritable> edge: vertex.getEdges()) {
>                 if (edge.getTargetVertexId().compareTo(vertex.getId()) +
> vertex.getId().compareTo(message) == 2) {
>                   sendMessage(edge.getTargetVertexId(), message);
>               if (LOG.isDebugEnabled()) {
>                 LOG.debug("Vertex " + vertex.getId() + " sent message " + 
> message +
> " to vertex " + edge.getTargetVertexId());
>               }
>               System.out.println("Vertex " + vertex.getId() +
>                   " sent message " + message + " to vertex " +
>                   edge.getTargetVertexId());
>                 }
>           }
>     }
>   }
>   /** Sends messages to all its neighbours, the messages it receives **/
>   if (getSuperstep() == 2) {
>     for (DoubleWritable message: messages) {
>               sendMessageToAllEdges(vertex, message);
>       }
>   }
>   
>   if (getSuperstep() == 3) {
>       double Value = 0.0;
>         for (DoubleWritable message: messages) {
>                 if (vertex.getId().equals(message)) {
>               Value += 1.0;
>               System.out.println("Vertex " + vertex.getId() +
>                   " received message " + message);
>             }
>       }
>       vertex.setValue(new DoubleWritable(Value));
>   } 
>     
>   vertex.voteToHalt();
>   }
> }