"Can you paste your cluster information ?"
What kind of information do you need?How can I get these informations?
    "What are your message types?"
The message type is just LongWritable. I don't use collections during the graph 
processing. I use collections just to load the input graph but it seems to 
works perfectly. Is it possible to avoid allocation of primitive writables 
(like LongWritable) to increase performances and use less memory ?
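
One common way to cut those allocations is to reuse a single LongWritable for
outgoing messages instead of creating a new one every superstep. Below is a
minimal sketch of the attached VertexComputation reworked that way; the class
name ReusingVertexComputation is made up for the illustration, and it assumes
your Giraph version copies or serializes messages when sendMessageToAllEdges()
is called (if messages were kept by reference, this reuse would not be safe).

package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.Computation;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

// Sketch only: same algorithm as the attached VertexComputation, but the
// outgoing message object is reused instead of being re-allocated on every
// superstep. Safe only if Giraph copies/serializes the message on send.
public class ReusingVertexComputation extends Computation<LongWritable,
        LongWritable, NullWritable, LongWritable, LongWritable>
{
    private final LongWritable outMessage = new LongWritable();

    @Override
    public void compute(final Vertex<LongWritable, LongWritable, NullWritable> vertex,
                        final Iterable<LongWritable> messages)
    {
        if(getSuperstep() == 0)
        {
            outMessage.set(vertex.getValue().get());
            sendMessageToAllEdges(vertex, outMessage);
            return;
        }

        long min = Long.MAX_VALUE;

        for(final LongWritable m : messages)
            if(m.get() < min)
                min = m.get();

        if(vertex.getValue().get() <= min)
        {
            vertex.voteToHalt();
            return;
        }

        vertex.setValue(new LongWritable(min)); // the stored value keeps its own object
        outMessage.set(min);
        sendMessageToAllEdges(vertex, outMessage);
    }
}
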
    "How you invoke the job?"
Here is the command typed in my terminal to start the job:

    hadoop jar hadoop/jars/test-connexity.jar \
        lifo.giraph.test.Main \
        /test-connexity \
        /test-connexity-output \
        10

The first Giraph argument is the input file, the second is the output file and
the last is the number of workers. Please find attached the code of my Giraph
application: Main.java configures and starts the job, VertexComputation.java
computes the data, and the last two files (VertexInputFormat.java and
VertexOutputFormat.java) define how to load the input graph and save the
output graph.
PS: I'm not a native English speaker, so I'm sorry for any language mistakes.
Thanks for your help.

Date: Fri, 26 Jul 2013 08:13:22 -0700
From: ach...@apache.org
To: user@giraph.apache.org
Subject: Re: Scaling Problem

Hi guys,

At some point, we do need to help with a guide for conserving memory, but this
is a generic Java problem. You can work around it by avoiding objects as much
as possible by using primitives directly. If you need primitive collections,
see FastUtils, Trove, etc. Combiners also save a lot of memory for messages.
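
For the min-propagation algorithm in this thread, a combiner only needs to keep
the smallest message per destination vertex. A minimal sketch follows; the
class name MinLongMessageCombiner is made up here, and it assumes a Giraph
version that exposes org.apache.giraph.combiner.MessageCombiner (older releases
use the Combiner class instead).

package lifo.giraph.test;

import org.apache.giraph.combiner.MessageCombiner;
import org.apache.hadoop.io.LongWritable;

// Collapses all messages addressed to one vertex into a single minimum, so at
// most one LongWritable per destination vertex is buffered between supersteps.
public class MinLongMessageCombiner
        implements MessageCombiner<LongWritable, LongWritable>
{
    @Override
    public void combine(LongWritable vertexIndex, LongWritable originalMessage,
                        LongWritable messageToCombine)
    {
        if(messageToCombine.get() < originalMessage.get())
            originalMessage.set(messageToCombine.get());
    }

    @Override
    public LongWritable createInitialMessage()
    {
        return new LongWritable(Long.MAX_VALUE);
    }
}
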

What are your message types?

Avery

On 7/26/13 6:53 AM, Puneet Jain wrote:

Can you paste your cluster information? I am also struggling to make it work
on 75M vertices and 100s of millions of edges.

On Fri, Jul 26, 2013 at 8:02 AM, jerome richard <jeromerichard...@msn.com> wrote:

    Hi,

    I encountered a critical scaling problem using Giraph. I made a very simple
    algorithm to test Giraph on large graphs: a connectivity test. It works on
    relatively large graphs (3,072,441 nodes and 117,185,083 edges) but not on
    a very large graph (52,000,000 nodes and 2,000,000,000 edges).

    In fact, during the processing of the biggest graph, the Giraph core seems
    to fail after superstep 14 (15 on some jobs). The input graph is 30 GB
    stored as text and the output is also stored as text. 9 worker jobs are
    used to compute the graph.

    Here is the stack trace of the jobs (it is the same for all 9 jobs):

    java.lang.IllegalStateException: run: Caught an unrecoverable exception exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries!
        at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:101)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
    Caused by: java.lang.IllegalStateException: exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries!
        at org.apache.giraph.zk.ZooKeeperExt.exists(ZooKeeperExt.java:369)
        at org.apache.giraph.worker.BspServiceWorker.startSuperstep(BspServiceWorker.java:678)
        at org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:248)
        at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:91)
        ... 7 more

    Could you help me to solve this problem? If you need the code of the
    program, I can post it here (the code is relatively small).

    Thanks,
    Jérôme.

--
--Puneet

package lifo.giraph.test;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;


public class Main
{
    //public static int SOURCE_ID = 0;

    public static void main(String[] args) throws Exception
    {
        if(args.length != 3)
        {
            String err = "Must have 3 arguments: <input path> <output path> <number of workers>";
            throw new IllegalArgumentException(err);
        }

        String jobName = "Giraph test";
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        int nbWorkers = Integer.parseInt(args[2]);

        GiraphConfiguration configuration = new GiraphConfiguration();
        configuration.setComputationClass(VertexComputation.class);
        configuration.setVertexInputFormatClass(VertexInputFormat.class);
        configuration.setVertexOutputFormatClass(VertexOutputFormat.class);
        configuration.setWorkerConfiguration(nbWorkers, nbWorkers, 100.f);
        GiraphFileInputFormat.addVertexInputPath(configuration, inputPath);

        GiraphJob job = new GiraphJob(configuration, jobName);
        FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
        //job.getConfiguration().setLong(SOURCE_ID, sourceId);
        
        if(!job.run(true))
            System.exit(1);

        System.exit(0);
    }
}
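
A possible addition, not part of the attached code: if a message combiner such
as the MinLongMessageCombiner sketched earlier were used, it could be
registered with the other setters above, e.g.

        configuration.setMessageCombinerClass(MinLongMessageCombiner.class);

(older Giraph releases register a Combiner through setCombinerClass instead).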

package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.Computation;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;


/**
 * Connectivity test: every vertex starts with its own id as its value and
 * repeatedly adopts the smallest id received from its neighbours, so after
 * convergence all vertices of a connected component hold that component's
 * smallest vertex id.
 */
public class VertexComputation
        extends Computation<LongWritable, LongWritable, NullWritable, LongWritable, LongWritable>
{
    @Override
    public void compute(final Vertex<LongWritable, LongWritable, NullWritable> vertex,
                        final Iterable<LongWritable> messages)
    {
        if(getSuperstep() == 0)
        {
            sendMessageToAllEdges(vertex, vertex.getValue());
            return;
        }

        long min = Long.MAX_VALUE;

        for(final LongWritable m : messages)
            if(m.get() < min)
                min = m.get();

        if(vertex.getValue().get() <= min)
        {
            vertex.voteToHalt();
            return;
        }

        final LongWritable newValue = new LongWritable(min);
        vertex.setValue(newValue);
        sendMessageToAllEdges(vertex, newValue);
    }
}

package lifo.giraph.test;

import com.google.common.collect.Lists;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;


/**
 * Simple text-based VertexInputFormat for unweighted graphs with long ids. 
 * Each line consists of: vertex neighbor1 neighbor2 ...
 */
public class VertexInputFormat
        extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable>
        implements ImmutableClassesGiraphConfigurable<LongWritable, LongWritable, NullWritable>
{
    private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> conf;

    @Override
    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context)
            throws IOException
    {
        return new VertexReader();
    }

    @Override
    public void setConf(ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> configuration)
    {
        this.conf = configuration;
    }

    @Override
    public ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> getConf()
    {
        return conf;
    }

    public class VertexReader
            extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable>.TextVertexReader
    {
        // Separator of the vertex and neighbors
        private final Pattern separator = Pattern.compile("[\t ]");

        @Override
        public Vertex<LongWritable, LongWritable, NullWritable> getCurrentVertex()
                throws IOException, InterruptedException
        {
            Vertex<LongWritable, LongWritable, NullWritable> vertex = conf.createVertex();

            String[] tokens = separator.split(getRecordReader().getCurrentValue().toString());
            List<Edge<LongWritable, NullWritable>> edges = Lists.newArrayListWithCapacity(tokens.length - 1);

            for(int n=1 ; n<tokens.length ; n++)
                edges.add(EdgeFactory.create(new LongWritable(Long.parseLong(tokens[n])), NullWritable.get()));

            final long vertexId = Long.parseLong(tokens[0]);
            vertex.initialize(new LongWritable(vertexId), new LongWritable(vertexId), edges);

            return vertex;
        }

        @Override
        public boolean nextVertex() throws IOException, InterruptedException
        {
            return getRecordReader().nextKeyValue();
        }
    }
}
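
For reference, a few hypothetical input lines this reader would accept; each
line is a vertex id followed by the ids of its neighbours, separated by spaces
or tabs:

0 1 2
1 0 3
2 0
3 1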

package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;


/**
 * Writes each vertex as one line of text: the vertex id and its value (the
 * smallest id found in its connected component), separated by the default
 * TextOutputFormat separator (a tab).
 */
public class VertexOutputFormat
        extends TextVertexOutputFormat<LongWritable, LongWritable, NullWritable>
{
    private class VertexWriter extends TextVertexWriter
    {
        @Override
        public void writeVertex(Vertex<LongWritable, LongWritable, NullWritable> vertex)
                throws IOException, InterruptedException
        {
            final String nodeId = vertex.getId().toString();
            final String result = vertex.getValue().toString();

            getRecordWriter().write(new Text(nodeId), new Text(result));
        }
    }

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
            throws IOException, InterruptedException
    {
        return new VertexWriter();
    }
}
