"Can you paste your cluster information ?" What kind of information do you need?How can I get these informations? "What are your message types?" The message type is just LongWritable. I don't use collections during the graph processing. I use collections just to load the input graph but it seems to works perfectly. Is it possible to avoid allocation of primitive writables (like LongWritable) to increase performances and use less memory ? "How you invoke the job?" Here is the command typed in my terminal to start the job : hadoop jar hadoop/jars/test-connexity.jar \ lifo.giraph.test.Main \ /test-connexity \ /test-connexity-output \ 10 The first Giraph argument is the input file, the second is the output file and the last is the number of workers.Please find attached the code of my Giraph application. Main.java configure and start the job. VertexComputation.java compute the data and the thow last file define how to load the input and save the output graph. PS : I'm not English, so I'm sorry if I do some language mistakes. Thanks for your help. Date: Fri, 26 Jul 2013 08:13:22 -0700 From: ach...@apache.org To: user@giraph.apache.org Subject: Re: Scaling Problem Hi guys, At some point, we do need to help with a guide for conserving memory, but this is a generic Java problem. You can work around it by avoiding objects as much as possible by using primitives directly. If you need primitive collections see FastUtils, Trove, etc. Combiners also save a lot of memory for messages. What are your message types? Avery On 7/26/13 6:53 AM, Puneet Jain wrote: Can you paste your cluster information ? I am also struggling to make it work on 75M vertices and 100s of million edges. On Fri, Jul 26, 2013 at 8:02 AM, jerome richard <jeromerichard...@msn.com> wrote: Hi, I encountered a critical scaling problem using Giraph. I made a very simple algorithm to test Giraph on large graphs : a connexity test. 
It works on relatively large graphs (3 072 441 nodes and 117 185 083 edges) but not on very large graph (52 000 000 nodes and 2 000 000 000 edges). In fact, during the processing of the biggest graph, Giraph core seems to fail after the superstep 14 (15 on some jobs). The input graph size is 30 GB stored as text and the output is also stored as text. 9 working jobs are used to compute the graph. Here is the tracktrace of jobs (this is the same for the 9 jobs): java.lang.IllegalStateException: run: Caught an unrecoverable exception exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries! at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:101) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093) at org.apache.hadoop.mapred.Child.main(Child.java:249) Caused by: java.lang.IllegalStateException: exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries! at org.apache.giraph.zk.ZooKeeperExt.exists(ZooKeeperExt.java:369) at org.apache.giraph.worker.BspServiceWorker.startSuperstep(BspServiceWorker.java:678) at org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:248) at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:91) ... 7 more Could you help me to solve this problem? If you need the code of the program, I can put that here (the code is relatively tiny). Thanks, Jérôme. -- --Puneet
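Avery's remark that combiners save memory for messages can be illustrated without any Giraph dependency. For a minimum-propagation job like this one, only the smallest message per destination vertex matters, so all messages addressed to the same vertex can be folded into a single long. The sketch below is only an illustration of that idea in plain Java: the class name MinCombinerSketch, the method combineMin and the array-based inbox are hypothetical and are not Giraph API.

```java
import java.util.Arrays;

/** Hypothetical stand-alone sketch; not part of the Giraph API. */
final class MinCombinerSketch {
    /** Marker for "no message received" (same start value as in compute()). */
    static final long NO_MESSAGE = Long.MAX_VALUE;

    /**
     * Folds every (destination, value) message into one slot per destination,
     * keeping only the minimum. Memory use is one primitive long per vertex
     * instead of one object per message.
     */
    static long[] combineMin(int vertexCount, int[] destinations, long[] values) {
        long[] inbox = new long[vertexCount];
        Arrays.fill(inbox, NO_MESSAGE);
        for (int i = 0; i < destinations.length; i++) {
            int d = destinations[i];
            if (values[i] < inbox[d]) {
                inbox[d] = values[i];
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        // Five messages, three of them addressed to vertex 1.
        int[] destinations = {0, 1, 1, 1, 2};
        long[] values = {7, 9, 3, 5, 4};
        System.out.println(Arrays.toString(combineMin(4, destinations, values)));
    }
}
```

In a real job the same folding would be done by whatever combiner class the Giraph version in use provides; the point here is only that the per-vertex message list collapses to one value.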
package lifo.giraph.test;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class Main
{
    //public static int SOURCE_ID = 0;

    public static void main(String[] args) throws Exception
    {
        if(args.length != 3)
        {
            String err = "Must have 3 arguments: <input path> <output path> <number of workers>";
            throw new IllegalArgumentException(err);
        }

        String jobName = "Giraph test";
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        int nbWorkers = Integer.parseInt(args[2]);

        GiraphConfiguration configuration = new GiraphConfiguration();
        configuration.setComputationClass(VertexComputation.class);
        configuration.setVertexInputFormatClass(VertexInputFormat.class);
        configuration.setVertexOutputFormatClass(VertexOutputFormat.class);
        // Same minimum and maximum worker count; wait for 100% of them to respond
        configuration.setWorkerConfiguration(nbWorkers, nbWorkers, 100.0f);
        GiraphFileInputFormat.addVertexInputPath(configuration, inputPath);

        GiraphJob job = new GiraphJob(configuration, jobName);
        FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
        //job.getConfiguration().setLong(SOURCE_ID, sourceId);

        // Exit code 0 on success, 1 on failure
        System.exit(job.run(true) ? 0 : 1);
    }
}
package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.Computation;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

public class VertexComputation extends Computation<LongWritable, LongWritable, NullWritable, LongWritable, LongWritable>
{
    @Override
    public void compute(final Vertex<LongWritable, LongWritable, NullWritable> vertex, final Iterable<LongWritable> messages)
    {
        // Superstep 0: every vertex sends its own value (initially its id) to its neighbors
        if(getSuperstep() == 0)
        {
            sendMessageToAllEdges(vertex, vertex.getValue());
            return;
        }

        // Smallest value received during this superstep
        long min = Long.MAX_VALUE;
        for(final LongWritable m : messages)
            if(m.get() < min)
                min = m.get();

        // Nothing smaller was received: this vertex has nothing new to propagate
        if(vertex.getValue().get() <= min)
        {
            vertex.voteToHalt();
            return;
        }

        // Adopt the smaller value and forward it to the neighbors
        final LongWritable newValue = new LongWritable(min);
        vertex.setValue(newValue);
        sendMessageToAllEdges(vertex, newValue);
    }
}
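To make the behavior of compute() easier to follow, here is a small single-machine simulation of the same minimum-label propagation, written in plain Java with primitive arrays only. It is a sketch under assumptions: the class MinLabelSketch, its methods, and the convention that vertex ids are 0..n-1 with every undirected edge listed in both directions are all hypothetical and only serve the illustration.

```java
import java.util.Arrays;

/** Hypothetical single-machine simulation of the job's logic; not Giraph code. */
final class MinLabelSketch {

    /** neighbors[v] lists the vertices adjacent to v; returns the final label of each vertex. */
    static long[] propagate(int[][] neighbors) {
        int n = neighbors.length;
        long[] label = new long[n];
        for (int v = 0; v < n; v++) {
            label[v] = v;                        // initial value = vertex id
        }
        boolean[] sends = new boolean[n];
        Arrays.fill(sends, true);                // superstep 0: every vertex sends
        boolean anySent = n > 0;
        while (anySent) {
            // Deliver messages, keeping only the minimum per destination.
            long[] inbox = new long[n];
            Arrays.fill(inbox, Long.MAX_VALUE);
            for (int v = 0; v < n; v++) {
                if (!sends[v]) continue;
                for (int u : neighbors[v]) {
                    inbox[u] = Math.min(inbox[u], label[v]);
                }
            }
            // Next superstep: adopt a smaller label and resend, or stay silent.
            anySent = false;
            for (int v = 0; v < n; v++) {
                sends[v] = inbox[v] < label[v];
                if (sends[v]) {
                    label[v] = inbox[v];
                    anySent = true;
                }
            }
        }
        return label;
    }

    /** The graph is connected when every vertex ends with the same label. */
    static boolean isConnected(int[][] neighbors) {
        long[] label = propagate(neighbors);
        for (long l : label) {
            if (l != label[0]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int[][] twoComponents = {{1}, {0, 2}, {1}, {4}, {3}};  // 0-1-2 and 3-4
        System.out.println(Arrays.toString(propagate(twoComponents)));
        System.out.println(isConnected(twoComponents));
    }
}
```

The loop stops when no vertex adopted a smaller label in a round, which corresponds to every vertex having voted to halt with no messages in flight.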
package lifo.giraph.test;

import com.google.common.collect.Lists;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Simple text-based VertexInputFormat for unweighted graphs with long ids.
 * Each line consists of: vertex neighbor1 neighbor2 ...
 */
public class VertexInputFormat extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable>
    implements ImmutableClassesGiraphConfigurable<LongWritable, LongWritable, NullWritable>
{
    private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> conf;

    @Override
    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException
    {
        return new VertexReader();
    }

    @Override
    public void setConf(ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> configuration)
    {
        this.conf = configuration;
    }

    @Override
    public ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> getConf()
    {
        return conf;
    }

    public class VertexReader extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable>.TextVertexReader
    {
        // Separator of the vertex and neighbors
        private final Pattern separator = Pattern.compile("[\t ]");

        @Override
        public Vertex<LongWritable, LongWritable, NullWritable> getCurrentVertex() throws IOException, InterruptedException
        {
            Vertex<LongWritable, LongWritable, NullWritable> vertex = conf.createVertex();

            String[] tokens = separator.split(getRecordReader().getCurrentValue().toString());
            List<Edge<LongWritable, NullWritable>> edges = Lists.newArrayListWithCapacity(tokens.length - 1);

            for(int n=1 ; n<tokens.length ; n++)
                edges.add(EdgeFactory.create(new LongWritable(Long.parseLong(tokens[n])), NullWritable.get()));

            final long vertexId = Long.parseLong(tokens[0]);
            vertex.initialize(new LongWritable(vertexId), new LongWritable(vertexId), edges);

            return vertex;
        }

        @Override
        public boolean nextVertex() throws IOException, InterruptedException
        {
            return getRecordReader().nextKeyValue();
        }
    }
}
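The reader above allocates one LongWritable and one Edge object per neighbor while loading. As a point of comparison for the question about avoiding writable allocations, the following sketch parses the same line format ("vertex neighbor1 neighbor2 ...") into a single primitive long[]. The class LineParseSketch and the method parseLine are hypothetical names; whether Giraph can consume such an array directly depends on the edge storage classes available in the version in use, which this thread does not show.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

/** Hypothetical stand-alone sketch of the line format; not Giraph code. */
final class LineParseSketch {
    // Same separator as in the VertexReader above: a tab or a space.
    private static final Pattern SEPARATOR = Pattern.compile("[\t ]");

    /**
     * Parses "vertex neighbor1 neighbor2 ..." into one primitive array:
     * index 0 holds the vertex id, the remaining slots hold the neighbor ids.
     */
    static long[] parseLine(String line) {
        String[] tokens = SEPARATOR.split(line);
        long[] ids = new long[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            ids[i] = Long.parseLong(tokens[i]);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Vertex 1 with neighbors 2 and 3, mixing both separators.
        System.out.println(Arrays.toString(parseLine("1 2\t3")));
    }
}
```

A line with consecutive separators would produce an empty token and make Long.parseLong throw, exactly as in the original reader, so the input is assumed to use a single separator between ids.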
package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class VertexOutputFormat extends TextVertexOutputFormat<LongWritable, LongWritable, NullWritable>
{
    private class VertexWriter extends TextVertexWriter
    {
        // Writes one record per vertex: the vertex id as key, its final value as value
        @Override
        public void writeVertex(Vertex<LongWritable, LongWritable, NullWritable> vertex) throws IOException, InterruptedException
        {
            final String nodeId = vertex.getId().toString();
            final String result = vertex.getValue().toString();
            getRecordWriter().write(new Text(nodeId), new Text(result));
        }
    }

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException
    {
        return new VertexWriter();
    }
}