Thanks for the help,
Does it take just the graph model, or my entire Java project?
Below is my class:
package cc.faccamp.allpair;
//Unemat - Faccamp
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.GraphJob;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
public class AllPair {
public static final String QTD_NODES = "quantity.nodes";
public static class ShortestPathVertex extends
Vertex<Text, Text, Text> {
int[] distances;
public ShortestPathVertex() {
this.setValue(new Text(String.valueOf(Integer.MIN_VALUE)));
}
@Override
public void compute(Iterator<Text> messages) throws IOException {
if (getSuperstepCount() == 0L) {
//takes the size of the array passed args
Text lengtharray = new Text(getConf().get(QTD_NODES));
//creates arrays one for each vertex
distances = new int [Integer.parseInt(lengtharray.toString())];
//initializes the arrays
for(int i=0;i<distances.length;i++){
if (i!=Integer.parseInt(getVertexID().toString()))
distances [i] = Byte.MAX_VALUE;
for (Edge<Text, Text> e : this.getEdges()) {
int otherVertexId = Integer.parseInt(e.getDestinationVertexID().toString());
if (i== otherVertexId)
distances[i]= 1;
}
}
}else{
boolean update = false;
while(messages.hasNext()){
update = true;
//receives the message
Text outroNo = messages.next();
String msg = outroNo.toString();
String[] separa_mensagem = msg.split(Pattern.quote(","));
for (int i=1;i<separa_mensagem.length;i++){
String []separa = separa_mensagem[i].split(Pattern.quote("."));
//Separates the message source and destination
//separa [0] index for array and separa [1] value array
int index_distances = Integer.parseInt(separa[0].toString());
int value = Integer.parseInt(separa[1].toString());
int vertex_value = Integer.parseInt(getValue().toString());
//If the value of the message is smaller than the array distances it
replaces
if (value < distances[index_distances]){
distances[index_distances]= value;
update = false;
//if the value is greater than the vertex calculation here it
//replaces the eccentricity of the vertex
if (vertex_value<value){
setValue(new Text(String.valueOf(value)));
}
}
}
}
String send_in_mensagem = null;
for(int i=0;i<distances.length;i++){
if ((distances[i] <Integer.MAX_VALUE)){
int sum_value_array_more_1 = distances[i]+1;
send_in_mensagem = send_in_mensagem+","+i+"."+sum_value_array_more_1;
}
}
if (send_in_mensagem!=null)
for(Edge<Text, Text> e : this.getEdges()){
sendMessage(e, new Text(send_in_mensagem.toString()));
}
if (update) voteToHalt();
}
}
}
public static class ParserGraph extends
    VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {

  /**
   * Parses one input line of the form {@code #<vertexId>\t<adj1>;<adj2>;...}
   * into a vertex with one null-valued edge per adjacent vertex.
   *
   * @return always {@code true}: every line yields a vertex
   */
  @Override
  public boolean parseVertex(LongWritable key, Text value,
      Vertex<Text, NullWritable, Text> vertex) {
    String line = value.toString();
    int tab = line.indexOf("\t");
    // NOTE(review): substring(1, ...) drops the first character of the id —
    // presumably the lines start with a '#' marker (the original comment
    // shows "#4"); confirm against the actual input format.
    String vertexId = line.substring(1, tab);
    // The adjacency list follows the tab, separated by ';' (e.g. "5;4").
    String[] adjacents = line.substring(tab + 1).split(";");
    vertex.setVertexID(new Text(vertexId));
    for (String adjacent : adjacents) {
      vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent), null));
    }
    return true;
  }
}
/** Prints command-line usage and terminates with a failure status. */
static void printUsage() {
  // args[0] is the number of vertices (stored under QTD_NODES and parsed in
  // ShortestPathVertex.compute()), not a start node — the previous message
  // ("<startnode>") was misleading.
  System.out.println("Usage: <numnodes> <input> <output> [tasks]");
  System.exit(-1);
}
/**
 * Entry point. Expects: numnodes, input path, output path, and an optional
 * number of BSP tasks.
 */
public static void main(String[] args) throws IOException,
    InterruptedException, ClassNotFoundException {
  if (args.length < 3)
    printUsage();

  HamaConfiguration conf = new HamaConfiguration();
  // Set the vertex count BEFORE constructing the job: job constructors may
  // snapshot the configuration, in which case a later conf.set() would not
  // be visible to getConf().get(QTD_NODES) inside compute() on the workers.
  conf.set(QTD_NODES, args[0]);

  // Graph job configuration.
  GraphJob job = new GraphJob(conf, ShortestPathVertex.class);
  job.setJobName("All Pair");
  job.setVertexClass(ShortestPathVertex.class);
  job.setInputPath(new Path(args[1]));
  job.setOutputPath(new Path(args[2]));
  // Optional fourth argument: number of BSP tasks.
  if (args.length >= 4)
    job.setNumBspTask(Integer.parseInt(args[3]));
  job.setVertexIDClass(Text.class);
  job.setVertexValueClass(Text.class);
  job.setEdgeValueClass(NullWritable.class);
  job.setInputKeyClass(LongWritable.class);
  job.setInputValueClass(Text.class);
  job.setInputFormat(TextInputFormat.class);
  job.setVertexInputReaderClass(ParserGraph.class);
  job.setPartitioner(HashPartitioner.class);
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  long startTime = System.currentTimeMillis();
  if (job.waitForCompletion(true)) {
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
}
2013/2/17 Edward J. Yoon <[email protected]>
> Hi, can you attach your code here or send to me?
>
> On Sat, Feb 16, 2013 at 2:55 AM, Francisco Sanches
> <[email protected]> wrote:
> > Hi,
> >
> > After some testing I could see that my application generates too many
> > messages, and it is the accumulation of these messages that is causing
> > failures. From what I could see, all the messages that are sent remain in
> > memory even after the superstep is over. The accumulation of these
> > messages has caused failures, mainly due to lack of memory. I ran
> > Hama on 15 i7 machines (8 cores each) with 8 gigabytes of RAM. With this
> > infrastructure I'm not able to calculate the diameter of a graph with
> > 40,000 vertices and 300,000 edges.
> >
> > I tried to use the parameter:
> >
> > <property>
> > <name>hama.messenger.queue.class</name>
> > <value>org.apache.hama.bsp.message.DiskQueue</value>
> > </property>
> >
> > But I am still not having any success with it.
> >
> > Does anyone have any tips on how I can solve this problem?
> >
> > --
> > Francisco Sanches
>
>
>
> --
> Best Regards, Edward J. Yoon
> @eddieyoon
>
--
Francisco Sanches