This looks like 0.1 (still using Hadoop RPC). Please try trunk instead.
Avery
On 1/10/13 1:09 AM, pankaj Gulhane wrote:
Hi,
My code is working on smaller (very very small) dataset but if I use
the same code on the large dataset it fails.
Following code is some basic implementation of naive PageRank (just
for testing). When I run with 4-5 vertices it works properly but when
run for thousands of vertices it fails with the following error
<error>
java.lang.IllegalStateException: run: Caught an unrecoverable
exception setup: Offlining servers due to exception...
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:668)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:334)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1109)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.lang.RuntimeException: setup: Offlining servers due to
exception...
at org.apache.giraph.graph.GraphMapper.setup(GraphMapper.java:466)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:630)
... 7 more
Caused by: java.lang.IllegalStateException: setup: loadVertices failed
at
org.apache.giraph.graph.BspServiceWorker.setup(BspServiceWorker.java:582)
at org.apache.giraph.graph.GraphMapper.setup(GraphMapper.java:458)
... 8 more
Caused by: java.lang.NullPointerException
at
org.apache.giraph.comm.BasicRPCCommunications.sendPartitionReq(BasicRPCCommunications.java:817)
at
org.apache.giraph.graph.BspServiceWorker.loadVertices(BspServiceWorker.java:304)
at
org.apache.giraph.graph.BspServiceWorker.setup(BspServiceWorker.java:575)
... 9 more
</error>
<code>
public class PageRank implements Tool{
/** Configuration from Configurable */
private Configuration conf;
public static String SUPERSTEP_COUNT =
"PageRankBenchmark.superstepCount";
public static class PageRankHashMapVertex extends HashMapVertex<
LongWritable, DoubleWritable, DoubleWritable,
DoubleWritable> {
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
if (getSuperstep() >= 1) {
double sum = 0;
while (msgIterator.hasNext()) {
sum += msgIterator.next().get();
}
DoubleWritable vertexValue =
new DoubleWritable((0.15f / getNumVertices())
+ 0.85f *
sum);
setVertexValue(vertexValue);
}
if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT,4)) {
long edges = getNumOutEdges();
sendMsgToAllEdges(
new DoubleWritable(getVertexValue().get() /
edges));
}
voteToHalt();
}
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public int run(String[] args) throws Exception {
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
// job.setJarByClass(getClass());
job.setVertexClass(PageRankHashMapVertex.class);
job.setVertexInputFormatClass(LongDoubleDoubleAdjacencyListVertexInputFormat.class);
job.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
job.setWorkerConfiguration(200, 200, 100.0f);
job.setJobName("Testing PG");
job.getConfiguration().setInt(SUPERSTEP_COUNT, 2);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return (job.run(true) == true ? 0 : 1);
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new PageRank(), args));
}
}
</code>
Any pointers/help on the mistake I may be doing would be great?
Thanks,
Pankaj
PS: I am running on a cluster with more than 400 mapper slots.