Please wait a little while. We're working on reducing memory leaks and memory usage [1].
1. https://issues.apache.org/jira/browse/HAMA-704 On Mon, Feb 18, 2013 at 1:28 AM, Francisco Sanches <[email protected]> wrote: > Thanks for the help, > > It takes a graph model, or my entire project of java? > > Below is my class: > > package cc.faccamp.allpair; > > > //Unemat - Faccamp > import java.io.IOException; > > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.io.LongWritable; > import org.apache.hadoop.io.NullWritable; > import org.apache.hadoop.io.Text; > import org.apache.hama.HamaConfiguration; > import org.apache.hama.bsp.HashPartitioner; > import org.apache.hama.bsp.TextInputFormat; > import org.apache.hama.bsp.TextOutputFormat; > import org.apache.hama.graph.GraphJob; > import java.util.ArrayList; > import java.util.Iterator; > import java.util.List; > import java.util.regex.Pattern; > import org.apache.hama.graph.Edge; > import org.apache.hama.graph.Vertex; > import org.apache.hama.graph.VertexInputReader; > > > > public class AllPair { > > public static final String QTD_NODES = "quantity.nodes"; > > > public static class ShortestPathVertex extends > Vertex<Text, Text, Text> { > > int[] distances; > > public ShortestPathVertex() { > this.setValue(new Text(String.valueOf(Integer.MIN_VALUE))); > } > > @Override > public void compute(Iterator<Text> messages) throws IOException { > > if (getSuperstepCount() == 0L) { > //takes the size of the array passed args > Text lengtharray = new Text(getConf().get(QTD_NODES)); > > //creates arrays one for each vertex > distances = new int [Integer.parseInt(lengtharray.toString())]; > > //initializes the arrays > for(int i=0;i<distances.length;i++){ > if (i!=Integer.parseInt(getVertexID().toString())) > distances [i] = Byte.MAX_VALUE; > for (Edge<Text, Text> e : this.getEdges()) { > int otherVertexId = Integer.parseInt(e.getDestinationVertexID().toString()); > if (i== otherVertexId) > distances[i]= 1; > } > } > }else{ > boolean update = false; > while(messages.hasNext()){ > update = true; > //receives 
the message > Text outroNo = messages.next(); > String msg = outroNo.toString(); > String[] separa_mensagem = msg.split(Pattern.quote(",")); > > for (int i=1;i<separa_mensagem.length;i++){ > String []separa = separa_mensagem[i].split(Pattern.quote(".")); > //Separates the message source and destination > //separa [0] index for array and separa [1] value array > int index_distances = Integer.parseInt(separa[0].toString()); > int value = Integer.parseInt(separa[1].toString()); > int vertex_value = Integer.parseInt(getValue().toString()); > //If the value of the message is smaller than the array distances it > replaces > if (value < distances[index_distances]){ > distances[index_distances]= value; > update = false; > //if the value is greater than the vertex calculation here it > //replaces the eccentricity of the vertex > if (vertex_value<value){ > setValue(new Text(String.valueOf(value))); > } > } > } > } > > String send_in_mensagem = null; > for(int i=0;i<distances.length;i++){ > if ((distances[i] <Integer.MAX_VALUE)){ > int sum_value_array_more_1 = distances[i]+1; > send_in_mensagem = send_in_mensagem+","+i+"."+sum_value_array_more_1; > } > } > > if (send_in_mensagem!=null) > for(Edge<Text, Text> e : this.getEdges()){ > sendMessage(e, new Text(send_in_mensagem.toString())); > } > > if (update) voteToHalt(); > } > } > } > public static class ParserGraph extends > VertexInputReader<LongWritable, Text, Text, NullWritable, Text> { > /* > String lastVertexId = null; > List<String> adjacents = new ArrayList<String>(); > > @Override > public boolean parseVertex(LongWritable key, Text value, > Vertex<Text, NullWritable, Text> vertex) { > > String line = value.toString(); > String[] lineSplit = line.split("\t"); > if (!line.startsWith("#")) { > if (lastVertexId == null) { > lastVertexId = lineSplit[0]; > } > if (lastVertexId.equals(lineSplit[0])) { > adjacents.add(lineSplit[1]); > } else { > vertex.setVertexID(new Text(lastVertexId)); > for (String adjacent : adjacents) { 
> vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent), > null)); > } > adjacents.clear(); > lastVertexId = lineSplit[0]; > adjacents.add(lineSplit[1]); > return true; > } > } > return false; > } > > }*/ > > String vertice = null; > List<String> adjacents = new ArrayList<String>(); > > @Override > public boolean parseVertex(LongWritable key, Text value, > Vertex<Text, NullWritable, Text> vertex) { > > String line_file = value.toString(); > String[] vertice_adjacentes; > > //aqui separa os vertices adjacenstes 5;4 > vertice_adjacentes = line_file.substring(line_file.indexOf("\t") + 1, > line_file.length()).split(";"); > > //aqui pega o vertice #4 > vertice = line_file.substring(1, line_file.indexOf("\t")); > > //aqui adiciona os vertives adjacentes na lista adjacents > for (String s : vertice_adjacentes) > adjacents.add(s); > > //isso grava no arquivo a lista dos vertices; > vertex.setVertexID(new Text(vertice)); > > //aqui adiciona os vertices adjacentes no hama. > for (String adjacent : adjacents) { > vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent), null)); > } > > //limpa a lista adjacents > adjacents.clear(); > > return true; > } > > } > static void printUsage() { > System.out.println("Usage: <startnode> <input> <output> [tasks]"); > System.exit(-1); > } > > > > public static void main(String[] args) throws IOException, > InterruptedException, ClassNotFoundException { > if (args.length < 3) > printUsage(); > > > // Graph job configuration > HamaConfiguration conf = new HamaConfiguration(); > GraphJob job = new GraphJob(conf, ShortestPathVertex.class); > job.setJobName("All Pair"); > > conf.set(QTD_NODES, args[0]); > > > job.setVertexClass(ShortestPathVertex.class); > job.setInputPath(new Path(args[1])); > job.setOutputPath(new Path(args[2])); > > > if (args.length >= 4) > job.setNumBspTask(Integer.parseInt(args[3])); > > job.setVertexIDClass(Text.class); > job.setVertexValueClass(Text.class); > job.setEdgeValueClass(NullWritable.class); 
> > job.setInputKeyClass(LongWritable.class); > job.setInputValueClass(Text.class); > job.setInputFormat(TextInputFormat.class); > job.setVertexInputReaderClass(ParserGraph.class); > job.setPartitioner(HashPartitioner.class); > job.setOutputFormat(TextOutputFormat.class); > job.setOutputKeyClass(Text.class); > job.setOutputValueClass(Text.class); > > > long startTime = System.currentTimeMillis(); > if (job.waitForCompletion(true)) { > System.out.println("Job Finished in " > + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); > } > } > > } > > > 2013/2/17 Edward J. Yoon <[email protected]> > >> Hi, can you attach your code here or send it to me? >> >> On Sat, Feb 16, 2013 at 2:55 AM, Francisco Sanches >> <[email protected]> wrote: >> > Hi, >> > >> > After some testing I could see that my application generates a lot of >> > messages, and the accumulation of these messages is what is causing the >> > failures. From what I could see, all the messages that are sent remain in >> > memory even after the superstep is over. The accumulation of these >> > messages has caused failures, mainly due to lack of memory. I ran >> > Hama on 15 i7 machines (8 cores each) with 8 gigabytes of RAM. With this >> > infrastructure I'm not able to calculate the diameter of a graph with >> > 40 000 vertices and 300 000 edges. >> > >> > I tried to use the parameter: >> > >> > <property> >> > <name>hama.messenger.queue.class</name> >> > <value>org.apache.hama.bsp.message.DiskQueue</value> >> > </property> >> > >> > But that was not successful either. >> > >> > Does anyone have any tips on how I can solve this problem? >> > >> > -- >> > Francisco Sanches >> >> >> >> -- >> Best Regards, Edward J. Yoon >> @eddieyoon >> > > > > -- > Francisco Sanches -- Best Regards, Edward J. Yoon @eddieyoon
