Thanks for the help,

It takes a graph model, or my entire project of java?

Below is my class:

package cc.faccamp.allpair;


//Unemat - Faccamp
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.GraphJob;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;



public class AllPair {

public static final String QTD_NODES = "quantity.nodes";


public static class ShortestPathVertex extends
Vertex<Text, Text, Text> {

int[] distances;

public ShortestPathVertex() {
this.setValue(new Text(String.valueOf(Integer.MIN_VALUE)));
}

@Override
public void compute(Iterator<Text> messages) throws IOException {

if (getSuperstepCount() == 0L) {
//takes the size of the array passed args
Text lengtharray = new Text(getConf().get(QTD_NODES));

//creates arrays one for each vertex
distances = new int [Integer.parseInt(lengtharray.toString())];

//initializes the arrays
for(int i=0;i<distances.length;i++){
if (i!=Integer.parseInt(getVertexID().toString()))
distances [i] = Byte.MAX_VALUE;
for (Edge<Text, Text> e : this.getEdges()) {
int otherVertexId = Integer.parseInt(e.getDestinationVertexID().toString());
if (i== otherVertexId)
distances[i]= 1;
}
}
}else{
boolean update = false;
while(messages.hasNext()){
update = true;
//receives the message
Text outroNo = messages.next();
String msg = outroNo.toString();
String[] separa_mensagem = msg.split(Pattern.quote(","));

for (int i=1;i<separa_mensagem.length;i++){
String []separa = separa_mensagem[i].split(Pattern.quote("."));
//Separates the message source and destination
//separa [0] index for array and separa [1] value array
int index_distances = Integer.parseInt(separa[0].toString());
int value = Integer.parseInt(separa[1].toString());
int  vertex_value = Integer.parseInt(getValue().toString());
//If the value of the message is smaller than the array distances it
replaces
if (value < distances[index_distances]){
distances[index_distances]= value;
update = false;
//if the value is greater than the vertex calculation here it
//replaces the eccentricity of the vertex
if (vertex_value<value){
setValue(new Text(String.valueOf(value)));
}
}
}
}

String send_in_mensagem = null;
for(int i=0;i<distances.length;i++){
if ((distances[i] <Integer.MAX_VALUE)){
int sum_value_array_more_1 = distances[i]+1;
send_in_mensagem = send_in_mensagem+","+i+"."+sum_value_array_more_1;
}
}

if (send_in_mensagem!=null)
for(Edge<Text, Text> e : this.getEdges()){
sendMessage(e, new Text(send_in_mensagem.toString()));
}

if (update) voteToHalt();
}
}
}
public static class ParserGraph extends
VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {
/*
String lastVertexId = null;
List<String> adjacents = new ArrayList<String>();

@Override
public boolean parseVertex(LongWritable key, Text value,
Vertex<Text, NullWritable, Text> vertex) {

String line = value.toString();
String[] lineSplit = line.split("\t");
if (!line.startsWith("#")) {
if (lastVertexId == null) {
lastVertexId = lineSplit[0];
}
if (lastVertexId.equals(lineSplit[0])) {
adjacents.add(lineSplit[1]);
} else {
vertex.setVertexID(new Text(lastVertexId));
for (String adjacent : adjacents) {
vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent),
null));
}
adjacents.clear();
lastVertexId = lineSplit[0];
adjacents.add(lineSplit[1]);
return true;
}
}
return false;
}

}*/

String vertice = null;
List<String> adjacents = new ArrayList<String>();

@Override
public boolean parseVertex(LongWritable key, Text value,
Vertex<Text, NullWritable, Text> vertex) {

String line_file = value.toString();
String[] vertice_adjacentes;

//aqui separa os vertices adjacenstes 5;4
vertice_adjacentes = line_file.substring(line_file.indexOf("\t") + 1,
line_file.length()).split(";");

//aqui pega o vertice #4
vertice = line_file.substring(1, line_file.indexOf("\t"));

//aqui adiciona os vertives adjacentes na lista adjacents
for (String s : vertice_adjacentes)
adjacents.add(s);

//isso grava no arquivo a lista dos vertices;
vertex.setVertexID(new Text(vertice));

//aqui adiciona os vertices adjacentes no hama.
for (String adjacent : adjacents) {
vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent), null));
}

//limpa a lista adjacents
adjacents.clear();

return true;
}

}
static void printUsage() {
System.out.println("Usage: <startnode> <input> <output> [tasks]");
System.exit(-1);
}



public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
if (args.length < 3)
printUsage();


// Graph job configuration
HamaConfiguration conf = new HamaConfiguration();
GraphJob job = new GraphJob(conf, ShortestPathVertex.class);
job.setJobName("All Pair");

conf.set(QTD_NODES, args[0]);


job.setVertexClass(ShortestPathVertex.class);
job.setInputPath(new Path(args[1]));
job.setOutputPath(new Path(args[2]));


if (args.length >= 4)
job.setNumBspTask(Integer.parseInt(args[3]));

job.setVertexIDClass(Text.class);
job.setVertexValueClass(Text.class);
job.setEdgeValueClass(NullWritable.class);

job.setInputKeyClass(LongWritable.class);
job.setInputValueClass(Text.class);
job.setInputFormat(TextInputFormat.class);
job.setVertexInputReaderClass(ParserGraph.class);
job.setPartitioner(HashPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);


long startTime = System.currentTimeMillis();
if (job.waitForCompletion(true)) {
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}

}


2013/2/17 Edward J. Yoon <[email protected]>

> Hi, can you attach your code here or send to me?
>
> On Sat, Feb 16, 2013 at 2:55 AM, Francisco Sanches
> <[email protected]> wrote:
> > Hi,
> >
> > After some testing I could see that my application generates enough
> > messages and the accumulation of these messages are that are causing
> > failures. From what I could see all the messages that are sent remain in
> > memory even if the superstep taking over. The accumulation of these
> > messages has caused failures, mainly due to lack of memory. I rotated the
> > hama in 15 i7 (8 cores each) with 8 gigabytes of ram. With this
> > infrastructure'm not able to calculate diameter of a graph with vertices
> 40
> > 000 and 300 000 edges.
> >
> > I tried to use the parameter:
> >
> > <property>
> > <name> Hama.messenger.queue.class </ name>
> > <value> Org.apache.hama.bsp.message.DiskQueue </ value>
> > </ Property>
> >
> > But also not getting success.
> >
> > Does anyone have any tips on how I can solve this problem?
> >
> > --
> > Francisco Sanches
>
>
>
> --
> Best Regards, Edward J. Yoon
> @eddieyoon
>



-- 
Francisco Sanches

Reply via email to