I have a few more questions...
1. When I run JUnit, do I have to run it on a PC with Hadoop/Giraph
installed? If so, do I have to start Hadoop before I run the program?
2. I made certain that nodes and edges are added correctly to the graph,
but I get a null result after the job is run. I also added
System.out.println() calls in many places in my KatzReduce.java, and nothing
is printed, as if the job never actually executed. *How can I fix this?*
public class KatzReduce extends BasicComputation<Text, FbGraphState,
DoubleWritable, FbGraphMessage> {
....
if (getSuperstep() == 0) {
sendMessageToAllEdges(vertex, new FbGraphMessage(0.5));
vertex.voteToHalt();
System.out.println("node "+vertex.getValue().getValue()+"
got superstep 0");
return;
}
if (getSuperstep() == 1) {
double sum = 0;
List<Text> mEdges = new ArrayList<>();
for (FbGraphMessage message : messages) {
sum += message.getKatzWeight();
}
vertex.getValue().setNodeWeight(vertex.getValue().getNodeWeight() + sum);
sendMessageToAllEdges(vertex, new FbGraphMessage(0.25));
vertex.voteToHalt();
System.out.println("node "+vertex.getValue().getValue()+"
got superstep 1");
return;
}
......
}
KatzReduceTest.java
public void testCompute() throws Exception {
System.out.print("compute");
String[] tinyGraph =
{"1#2,3#54,78","2#1,3,4#78,63","3#1,2,4#77,125","4#2,3#77,63" };
// This is where you configure your job
GiraphConfiguration conf = new GiraphConfiguration();
conf.setComputationClass(KatzReduce.class);
// conf.setMasterComputeClass();
conf.setVertexInputFormatClass(FbGraphInputFormat.class);
conf.setVertexOutputFormatClass(FbGraphOutputFormat.class);
// Run and print results
Iterable<String> results = InternalVertexRunner.run(conf,
tinyGraph, tinyGraph);
for (String result : results) {
System.out.println(result);****<------result is always NULL
------NullPointerException
assertEquals(result,"1");//will always fail cause I
dont know the output format yet
}
}
output of node/edge list creation
compute1#2,3#54,78
added an edge id: 2 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 1 numOfedges: 2
2#1,3,4#78,63
added an edge id: 1 weight 1.0
added an edge id: 4 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 2 numOfedges: 3
3#1,2,4#77,125
added an edge id: 2 weight 1.0
added an edge id: 1 weight 1.0
added an edge id: 4 weight 1.0
created a vertex with value: 3 numOfedges: 3
4#2,3#77,63
added an edge id: 2 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 4 numOfedges: 2
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * Text vertex input format that builds one vertex per input line.
 *
 * <p>A line is split on {@code '#'}: the first field is the vertex id and the
 * second field is a comma-separated list of neighbour ids. Any further fields
 * are ignored by this reader. Every edge is created with weight {@code 1.0}
 * and every vertex starts with node weight {@code 1.0}.
 *
 * @author user
 */
public class FbGraphInputFormat extends TextVertexInputFormat<Text, FbGraphState, DoubleWritable> {

    @Override
    public TextVertexReader createVertexReader(InputSplit is, TaskAttemptContext tac)
            throws IOException {
        return new FbGraphReader();
    }

    /** Reader that converts the current text line into a vertex. */
    protected class FbGraphReader extends TextVertexReader {

        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
            return getRecordReader().nextKeyValue();
        }

        /**
         * Parses the current line into a vertex.
         *
         * @return the vertex described by the current line
         * @throws IllegalArgumentException if the line has fewer than two
         *         {@code '#'}-separated fields
         */
        @Override
        public Vertex<Text, FbGraphState, DoubleWritable> getCurrentVertex()
                throws IOException, InterruptedException {
            String line = getRecordReader().getCurrentValue().toString();
            String[] tokens = line.trim().split("#");
            if (tokens.length < 2) {
                throw new IllegalArgumentException("Invalid line: (" + line + ")");
            }

            Text id = new Text(tokens[0]);
            FbGraphState state = new FbGraphState();
            state.setValue(id.toString());
            state.setNodeWeight(1.0);

            // The length check above guarantees tokens[1] exists, so the edge
            // list is always parsed (previously it was dropped for two-field
            // lines). Keying by target id collapses duplicate neighbours.
            Map<Text, DoubleWritable> edgeMap = new HashMap<>();
            for (String rawTarget : tokens[1].split(",")) {
                String targetId = rawTarget.trim();
                if (targetId.isEmpty()) {
                    // An empty edge field must not create an edge to the id "".
                    continue;
                }
                edgeMap.put(new Text(targetId), new DoubleWritable(1.0));
            }

            ArrayList<Edge<Text, DoubleWritable>> edgesList = new ArrayList<>(edgeMap.size());
            for (Map.Entry<Text, DoubleWritable> entry : edgeMap.entrySet()) {
                edgesList.add(EdgeFactory.create(entry.getKey(), entry.getValue()));
            }

            Vertex<Text, FbGraphState, DoubleWritable> vertex = this.getConf().createVertex();
            vertex.initialize(id, state, edgesList);
            return vertex;
        }
    }
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
*
* @author user
*/
class FbGraphMessage implements Writable{
private double katzWeight;
public FbGraphMessage(){}
public FbGraphMessage(double weight){
this();
this.katzWeight = weight;
}
public FbGraphMessage(FbGraphMessage other){
this(other.katzWeight);
}
@Override
public void write(DataOutput d) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void readFields(DataInput di) throws IOException {
katzWeight= di.readDouble();
}
public double getKatzWeight() {
return katzWeight;
}
public void setKatzWeight(double katzWeight) {
this.katzWeight = katzWeight;
}
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * Text vertex output format. Each vertex is written as a record whose key is
 * the vertex id and whose value is
 * {@code <value> TAB <nodeWeight> [TAB <target>:<edgeValue>,<target>:<edgeValue>...]}.
 * The edge section, including its leading tab, is omitted for a vertex
 * without edges.
 *
 * @author user
 */
public class FbGraphOutputFormat extends TextVertexOutputFormat<Text, FbGraphState, DoubleWritable> {

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext tac)
            throws IOException, InterruptedException {
        return new FbGraphWriter();
    }

    /**
     * Writer producing one text record per vertex.
     *
     * <p>Declared as a non-static inner class, like {@code FbGraphReader} in
     * {@code FbGraphInputFormat}, so that it has the enclosing format
     * instance its {@code TextVertexWriter} superclass belongs to.
     */
    private class FbGraphWriter extends TextVertexWriter {

        @Override
        public void writeVertex(Vertex<Text, FbGraphState, DoubleWritable> vertex)
                throws IOException, InterruptedException {
            FbGraphState state = vertex.getValue();

            StringBuilder line = new StringBuilder();
            line.append(state.getValue());
            line.append("\t");
            line.append(state.getNodeWeight());

            // The first edge is introduced by a tab, later ones by a comma, so
            // no trailing separator ever has to be trimmed off.
            String separator = "\t";
            for (Edge<Text, DoubleWritable> edge : vertex.getEdges()) {
                line.append(separator);
                line.append(edge.getTargetVertexId());
                line.append(":");
                line.append(edge.getValue());
                separator = ",";
            }

            getRecordWriter().write(vertex.getId(), new Text(line.toString()));
        }
    }
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
 * Vertex value: an identifying string plus the accumulated node weight.
 *
 * <p>Only {@code value} and {@code nodeWeight} are serialized; the remaining
 * profile fields are declared but are neither written nor read by this class.
 *
 * @author user
 */
public class FbGraphState implements Writable {

    private String value;
    private double nodeWeight;

    // Profile attributes: currently unused and not part of the serialized form.
    private String birthday;
    private String education_classes;
    private String education_concentration_id;
    private String education_degree_id;
    private String education_school_id;
    private String education_with_id;
    private String education_type_id;
    private String education_year_id;
    private String first_name;
    private String gender;
    private String hometown_id;
    private String languages_id;
    private String last_name;
    private String locale;
    private String location_id;
    private String work_employer_id;
    private String work_end_date;
    private String work_location;
    private String work_position;
    private String work_start_date;
    private String work_with;
    private String middle_name;
    private String political;

    public FbGraphState() {
    }

    /** Writes {@code value} as a string followed by {@code nodeWeight} as a double. */
    @Override
    public void write(DataOutput d) throws IOException {
        WritableUtils.writeString(d, value);
        d.writeDouble(nodeWeight);
    }

    /**
     * Reads the fields in the order {@link #write(DataOutput)} emits them.
     *
     * <p>The weight is read with {@code readDouble} to match the
     * {@code writeDouble} in {@link #write(DataOutput)}; reading it as a long
     * reinterpreted the bytes and corrupted the weight.
     */
    @Override
    public void readFields(DataInput di) throws IOException {
        value = WritableUtils.readString(di);
        nodeWeight = di.readDouble();
    }

    public void setValue(String value) {
        this.value = value;
    }

    public void setNodeWeight(double nodeWeight) {
        this.nodeWeight = nodeWeight;
    }

    public String getValue() {
        return value;
    }

    public double getNodeWeight() {
        return nodeWeight;
    }
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
/**
*
* @author user
*/
public class FbGraphVertex implements Vertex<Text,FbGraphState,DoubleWritable>{
@Override
public void initialize(Text i, FbGraphState v, Iterable<Edge<Text,
DoubleWritable>> itrbl) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void initialize(Text i, FbGraphState v) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public Text getId() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public FbGraphState getValue() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void setValue(FbGraphState v) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void voteToHalt() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public int getNumEdges() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public Iterable<Edge<Text, DoubleWritable>> getEdges() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void setEdges(Iterable<Edge<Text, DoubleWritable>> itrbl) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public Iterable<MutableEdge<Text, DoubleWritable>> getMutableEdges() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public DoubleWritable getEdgeValue(Text i) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void setEdgeValue(Text i, DoubleWritable e) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public Iterable<DoubleWritable> getAllEdgeValues(Text i) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void addEdge(Edge<Text, DoubleWritable> edge) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void removeEdges(Text i) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void unwrapMutableEdges() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void wakeUp() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public boolean isHalted() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public ImmutableClassesGiraphConfiguration<Text, FbGraphState,
DoubleWritable> getConf() {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
@Override
public void setConf(ImmutableClassesGiraphConfiguration<Text, FbGraphState,
DoubleWritable> icgc) {
throw new UnsupportedOperationException("Not supported yet."); //To
change body of generated methods, choose Tools | Templates.
}
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package Fb_package;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Computation that accumulates incoming Katz weights into each vertex's node
 * weight and forwards a fixed, per-superstep weight to all neighbours.
 *
 * <p>In every superstep after the first, the weights of all received messages
 * are summed and added to the vertex's node weight. During supersteps 0 to 5
 * the vertex then sends the weight configured for that superstep along all of
 * its edges. The vertex votes to halt at the end of every superstep.
 *
 * @author user
 */
public class KatzReduce extends BasicComputation<Text, FbGraphState, DoubleWritable, FbGraphMessage> {

    /**
     * Weight sent to every neighbour, indexed by superstep number (0 to 5).
     * NOTE(review): the last three values look like truncated powers of 0.5
     * (0.0625, 0.03125, 0.015625); kept exactly as they were - confirm intent.
     */
    private static final double[] OUTGOING_WEIGHTS = {0.5, 0.25, 0.125, 0.062, 0.031, 0.015};

    /**
     * @param vertex   the vertex being processed
     * @param messages messages delivered to the vertex for this superstep
     */
    @Override
    public void compute(
            Vertex<Text, FbGraphState, DoubleWritable> vertex,
            Iterable<FbGraphMessage> messages) throws IOException {
        long superstep = getSuperstep();

        // Superstep 0 only sends; every later superstep first folds the
        // received weights into the node weight.
        if (superstep > 0) {
            double sum = 0;
            for (FbGraphMessage message : messages) {
                sum += message.getKatzWeight();
            }
            FbGraphState state = vertex.getValue();
            state.setNodeWeight(state.getNodeWeight() + sum);
        }

        // Beyond the last configured superstep nothing more is sent. A vertex
        // without edges has no edge to send along, so it needs no special case.
        if (superstep < OUTGOING_WEIGHTS.length) {
            sendMessageToAllEdges(vertex, new FbGraphMessage(OUTGOING_WEIGHTS[(int) superstep]));
        }

        vertex.voteToHalt();
    }
}