Hi, I am getting a "Missing chosen worker" error when running a custom program (triangle counting) on a graph dataset of about 7K nodes and 200K edges. The program runs fine for the first two supersteps but fails with this error in the third superstep (superstep 2 in the log below), in which each node forwards the messages it received to all of its neighbors.
The program runs fine and produces results on test datasets of 100 nodes and 1,000 edges; still, I have pasted it below in case I am missing something. I am running the job on a single machine (a one-node cluster), though. Could that be a problem? Will increasing "mapreduce.task.timeout" help at all? I also see a "Java heap space" error (OutOfMemoryError) in the second map task of the same run. Are these errors connected?

Regards,
Kaushik

Syslogs from attempt_201312031749_0011_m_000000_0:

2013-12-03 23:10:50,661 INFO org.apache.giraph.master.MasterThread: masterThread: Coordination of superstep 1 took 100.399 seconds ended with state THIS_SUPERSTEP_DONE and is now on superstep 2
2013-12-03 23:10:50,774 INFO org.apache.giraph.comm.netty.NettyClient: connectAllAddresses: Successfully added 0 connections, (0 total connected) 0 failed, 0 failures total.
2013-12-03 23:10:50,774 INFO org.apache.giraph.partition.PartitionBalancer: balancePartitionsAcrossWorkers: Using algorithm static
2013-12-03 23:10:50,775 INFO org.apache.giraph.partition.PartitionUtils: analyzePartitionStats: Vertices - Mean: 7115, Min: Worker(hostname=localhost, MRtaskID=1, port=30001) - 7115, Max: Worker(hostname=localhost, MRtaskID=1, port=30001) - 7115
2013-12-03 23:10:50,775 INFO org.apache.giraph.partition.PartitionUtils: analyzePartitionStats: Edges - Mean: 201524, Min: Worker(hostname=localhost, MRtaskID=1, port=30001) - 201524, Max: Worker(hostname=localhost, MRtaskID=1, port=30001) - 201524
2013-12-03 23:10:50,806 INFO org.apache.giraph.master.BspServiceMaster: barrierOnWorkerList: 0 out of 1 workers finished on superstep 2 on path /_hadoopBsp/job_201312031749_0011/_applicationAttemptsDir/0/_superstepDir/2/_workerFinishedDir
2013-12-03 23:10:50,806 INFO org.apache.giraph.master.BspServiceMaster: barrierOnWorkerList: Waiting on [localhost_1]
2013-12-03 23:11:01,756 ERROR org.apache.giraph.master.BspServiceMaster: superstepChosenWorkerAlive: Missing chosen worker Worker(hostname=localhost, MRtaskID=1, port=30001) on superstep 2
2013-12-03 23:11:01,757 INFO org.apache.giraph.master.MasterThread: masterThread: Coordination of superstep 2 took 11.096 seconds ended with state WORKER_FAILURE and is now on superstep 2
2013-12-03 23:11:01,761 FATAL org.apache.giraph.master.BspServiceMaster: getLastGoodCheckpoint: No last good checkpoints can be found, killing the job.

attempt_201312031749_0011_m_000001_0:

java.lang.IllegalStateException: run: Caught an unrecoverable exception waitFor: ExecutionException occurred while waiting for org.apache.giraph.utils.ProgressableUtils$FutureWaitable@61db327f
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:101)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: waitFor: ExecutionException occurred while waiting for org.apache.giraph.utils.ProgressableUtils$FutureWaitable@61db327f
    at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:181)
    at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:139)
    at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:124)
    at org.apache.giraph.utils.ProgressableUtils.getFutureResult(ProgressableUtils.java:87)
    at org.apache.giraph.utils.ProgressableUtils.getResultsWithNCallables(ProgressableUtils.java:221)
    at org.apache.giraph.graph.GraphTaskManager.processGraphPartitions(GraphTaskManager.java:736)
    at org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:284)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:91)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:262)
    at java.util.concurrent.FutureTask.get(FutureTask.java:119)
    at org.apache.giraph.utils.ProgressableUtils$FutureWaitable.waitFor(ProgressableUtils.java:300)
    at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:173)

Program:

import java.io.IOException;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.log4j.Logger;

public class TriangleCounting extends BasicComputation<
    DoubleWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(TriangleCounting.class);

  @Override
  public void compute(
      Vertex<DoubleWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {

    // Superstep 0: send this vertex's ID to every neighbor whose ID is
    // greater than its own. Both vertex IDs and messages are doubles.
    if (getSuperstep() == 0) {
      for (Edge<DoubleWritable, FloatWritable> edge : vertex.getEdges()) {
        if (edge.getTargetVertexId().compareTo(vertex.getId()) == 1) {
          sendMessage(edge.getTargetVertexId(), vertex.getId());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex " + vertex.getId() + " sent message " +
                vertex.getId() + " to vertex " + edge.getTargetVertexId());
          }
          System.out.println("Vertex " + vertex.getId() + " sent message " +
              vertex.getId() + " to vertex " + edge.getTargetVertexId());
        }
      }
    }

    // Superstep 1: forward each received message m to every neighbor t
    // such that m < vertex.getId() < t.
    if (getSuperstep() == 1) {
      for (DoubleWritable message : messages) {
        for (Edge<DoubleWritable, FloatWritable> edge : vertex.getEdges()) {
          if (edge.getTargetVertexId().compareTo(vertex.getId()) +
              vertex.getId().compareTo(message) == 2) {
            sendMessage(edge.getTargetVertexId(), message);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Vertex " + vertex.getId() + " sent message " +
                  message + " to vertex " + edge.getTargetVertexId());
            }
            System.out.println("Vertex " + vertex.getId() + " sent message " +
                message + " to vertex " + edge.getTargetVertexId());
          }
        }
      }
    }

    // Superstep 2: forward every received message to all neighbors.
    if (getSuperstep() == 2) {
      for (DoubleWritable message : messages) {
        sendMessageToAllEdges(vertex, message);
      }
    }

    // Superstep 3: count the received messages equal to this vertex's own ID.
    if (getSuperstep() == 3) {
      double Value = 0.0;
      for (DoubleWritable message : messages) {
        if (vertex.getId().equals(message)) {
          Value += 1.0;
          System.out.println("Vertex " + vertex.getId() +
              " received message " + message);
        }
      }
      vertex.setValue(new DoubleWritable(Value));
    }

    vertex.voteToHalt();
  }
}
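To make the message volume of each superstep concrete, here is a minimal sketch in plain Java. It is not Giraph code, and the class and method names (TriangleMessageReplay, undirected, replay, deliver) are hypothetical. It replays the same four-superstep message pattern as TriangleCounting on a small in-memory graph, assuming every undirected edge is stored in both directions, and reports how many messages supersteps 0, 1 and 2 send, plus the number of triangles counted in superstep 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java replay (not Giraph code) of the TriangleCounting
 * message pattern, assuming each undirected edge is stored both ways.
 */
final class TriangleMessageReplay {

  /** Builds a symmetric adjacency list from undirected edge pairs. */
  static Map<Integer, List<Integer>> undirected(int[][] edges) {
    Map<Integer, List<Integer>> adj = new TreeMap<>();
    for (int[] e : edges) {
      adj.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
      adj.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
    }
    return adj;
  }

  /** Appends msg to the inbox of vertex `to` for the next superstep. */
  static void deliver(Map<Integer, List<Integer>> box, int to, int msg) {
    box.computeIfAbsent(to, k -> new ArrayList<>()).add(msg);
  }

  /** Returns {messages sent in supersteps 0, 1, 2; triangles counted in 3}. */
  static long[] replay(Map<Integer, List<Integer>> adj) {
    long[] counts = new long[4];
    List<Integer> none = Collections.emptyList();

    // Superstep 0: send own ID to every neighbor with a larger ID.
    Map<Integer, List<Integer>> inbox = new HashMap<>();
    for (int v : adj.keySet())
      for (int t : adj.get(v))
        if (t > v) { deliver(inbox, t, v); counts[0]++; }

    // Superstep 1: forward each received m to neighbors t with m < v < t.
    Map<Integer, List<Integer>> next = new HashMap<>();
    for (int v : adj.keySet())
      for (int m : inbox.getOrDefault(v, none))
        for (int t : adj.get(v))
          if (t > v && v > m) { deliver(next, t, m); counts[1]++; }
    inbox = next;

    // Superstep 2: forward every received message to ALL neighbors,
    // like sendMessageToAllEdges in the program above.
    next = new HashMap<>();
    for (int v : adj.keySet())
      for (int m : inbox.getOrDefault(v, none))
        for (int t : adj.get(v)) { deliver(next, t, m); counts[2]++; }
    inbox = next;

    // Superstep 3: a message equal to the receiver's own ID closes a triangle.
    for (int v : adj.keySet())
      for (int m : inbox.getOrDefault(v, none))
        if (m == v) counts[3]++;
    return counts;
  }

  public static void main(String[] args) {
    // Complete graph K4 on vertices 1..4, which contains 4 triangles.
    int[][] k4 = {{1, 2}, {1, 3}, {1, 4}, {2, 3}, {2, 4}, {3, 4}};
    System.out.println(Arrays.toString(replay(undirected(k4))));
    // prints [6, 4, 12, 4]
  }
}
```

On K4, superstep 2 sends 12 messages to find 4 triangles: each of the 4 messages arriving in that superstep is copied to all 3 neighbors of its receiver, so the superstep-2 count grows with (messages received) × (receiver degree) rather than with the number of candidate triangles.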