I have a few more questions...
1. When I run JUnit, do I have to run it on a PC with Hadoop/Giraph installed? If so, do I have to start Hadoop before I run the program?
2. I made certain that nodes and edges are added correctly to the graph, but I get a null result after the job is run. I also added System.out.println() calls in many places in my KatzReduce.java and nothing prints, as if it never starts executing the job. **How can I fix this?**
public class KatzReduce extends BasicComputation<Text, FbGraphState, DoubleWritable, FbGraphMessage> {
....

if (getSuperstep() == 0) {
            sendMessageToAllEdges(vertex, new FbGraphMessage(0.5));
            vertex.voteToHalt();
System.out.println("node "+vertex.getValue().getValue()+" got superstep 0");
            return;
        }
        if (getSuperstep() == 1) {
            double sum = 0;
            List<Text> mEdges = new ArrayList<>();
            for (FbGraphMessage message : messages) {
                sum += message.getKatzWeight();
            }
vertex.getValue().setNodeWeight(vertex.getValue().getNodeWeight() + sum);
            sendMessageToAllEdges(vertex, new FbGraphMessage(0.25));
            vertex.voteToHalt();
System.out.println("node "+vertex.getValue().getValue()+" got superstep 1");
            return;
        }
......
}

KatzReduceTest.java
public void testCompute() throws Exception {
    System.out.print("compute");
String[] tinyGraph = {"1#2,3#54,78","2#1,3,4#78,63","3#1,2,4#77,125","4#2,3#77,63" };
        // This is where you configure your job
        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setComputationClass(KatzReduce.class);
      //  conf.setMasterComputeClass();


        conf.setVertexInputFormatClass(FbGraphInputFormat.class);
conf.setVertexOutputFormatClass(FbGraphOutputFormat.class);
        // Run and print results
Iterable<String> results = InternalVertexRunner.run(conf, tinyGraph, tinyGraph);
        for (String result : results) {
System.out.println(result);****<------result is always NULL ------NullPointerException assertEquals(result,"1");//will always fail cause I dont know the output format yet
        }
    }


output of node/edge list creation
compute1#2,3#54,78
added an edge id: 2 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 1 numOfedges: 2
2#1,3,4#78,63
added an edge id: 1 weight 1.0
added an edge id: 4 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 2 numOfedges: 3
3#1,2,4#77,125
added an edge id: 2 weight 1.0
added an edge id: 1 weight 1.0
added an edge id: 4 weight 1.0
created a vertex with value: 3 numOfedges: 3
4#2,3#77,63
added an edge id: 2 weight 1.0
added an edge id: 3 weight 1.0
created a vertex with value: 4 numOfedges: 2
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Vertex input format for text lines of the form {@code id#neighbour,neighbour,...[#extra...]}.
 *
 * <p>Each line produces one vertex: its id is the text before the first {@code #}, its
 * {@link FbGraphState} carries that id as its value and a node weight of 1.0, and it gets one
 * out-edge of weight 1.0 per distinct neighbour id in the second field. Fields after the
 * second {@code #} are ignored by this reader.
 *
 * @author user
 */
public class FbGraphInputFormat extends TextVertexInputFormat<Text, FbGraphState, DoubleWritable> {

    /** Weight given to every edge and the initial weight of every node. */
    private static final double INITIAL_WEIGHT = 1.0;

    @Override
    public TextVertexReader createVertexReader(InputSplit is, TaskAttemptContext tac)
            throws IOException {
        return new FbGraphReader();
    }

    /** Reads one vertex per input line. */
    protected class FbGraphReader extends TextVertexReader {

        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
            return getRecordReader().nextKeyValue();
        }

        /**
         * Parses the current line into a vertex.
         *
         * @return the vertex described by the current line
         * @throws IllegalArgumentException if the line has fewer than two {@code #}-separated
         *     fields
         */
        @Override
        public Vertex<Text, FbGraphState, DoubleWritable> getCurrentVertex()
                throws IOException, InterruptedException {
            String line = getRecordReader().getCurrentValue().toString();
            String[] tokens = line.trim().split("#");
            if (tokens.length < 2) {
                throw new IllegalArgumentException("Invalid line: (" + line + ")");
            }

            Text id = new Text(tokens[0]);
            FbGraphState state = new FbGraphState();
            state.setValue(id.toString());
            state.setNodeWeight(INITIAL_WEIGHT);

            // tokens[1] is guaranteed by the length check above, so the neighbour list is
            // parsed for two-field lines ("id#a,b") as well as longer ones. The map collapses
            // repeated neighbour ids into a single edge.
            Map<Text, DoubleWritable> edgeMap = new HashMap<>();
            for (String neighbour : tokens[1].split(",")) {
                if (neighbour.isEmpty()) {
                    continue; // empty neighbour field, e.g. "id##extra"
                }
                edgeMap.put(new Text(neighbour), new DoubleWritable(INITIAL_WEIGHT));
            }

            ArrayList<Edge<Text, DoubleWritable>> edgesList = new ArrayList<>(edgeMap.size());
            for (Map.Entry<Text, DoubleWritable> entry : edgeMap.entrySet()) {
                edgesList.add(EdgeFactory.create(entry.getKey(), entry.getValue()));
            }

            Vertex<Text, FbGraphState, DoubleWritable> vertex = this.getConf().createVertex();
            vertex.initialize(id, state, edgesList);
            return vertex;
        }
    }
}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 *
 * @author user
 */
class FbGraphMessage implements Writable{

    private double katzWeight;

    public FbGraphMessage(){}
    public FbGraphMessage(double weight){
                this();
                
                this.katzWeight = weight;

        }
    public FbGraphMessage(FbGraphMessage other){
                this(other.katzWeight);
        }
    @Override
    public void write(DataOutput d) throws IOException {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void readFields(DataInput di) throws IOException {
      
                katzWeight= di.readDouble();
                
    }

    public double getKatzWeight() {
        return katzWeight;
    }

    public void setKatzWeight(double katzWeight) {
        this.katzWeight = katzWeight;
    }

    
}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Writes one text record per vertex.
 *
 * <p>The record key is the vertex id. The record value is
 * {@code value<TAB>nodeWeight<TAB>target:edgeValue,target:edgeValue,...}; for a vertex with no
 * edges the value is just {@code value<TAB>nodeWeight}.
 *
 * @author user
 */
public class FbGraphOutputFormat extends TextVertexOutputFormat<Text, FbGraphState, DoubleWritable> {

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext tac)
            throws IOException, InterruptedException {
        return new FbGraphWriter();
    }

    /**
     * Formats a vertex as a single line.
     *
     * <p>Declared as a non-static inner class, like the reader in the input format, so it has
     * the enclosing output-format instance its {@code TextVertexWriter} superclass is tied to.
     */
    private class FbGraphWriter extends TextVertexWriter {

        @Override
        public void writeVertex(Vertex<Text, FbGraphState, DoubleWritable> vertex)
                throws IOException, InterruptedException {
            FbGraphState state = vertex.getValue();

            StringBuilder b = new StringBuilder();
            b.append(state.getValue());
            b.append("\t");
            b.append(state.getNodeWeight());
            b.append("\t");

            for (Edge<Text, DoubleWritable> e : vertex.getEdges()) {
                b.append(e.getTargetVertexId());
                b.append(":");
                b.append(e.getValue());
                b.append(",");
            }
            // Drop the trailing separator: the last "," when there were edges, otherwise the
            // "\t" that precedes the (empty) edge list.
            b.setLength(b.length() - 1);

            getRecordWriter().write(vertex.getId(), new Text(b.toString()));
        }
    }
}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Per-vertex state: an identifying value and the accumulated node weight.
 *
 * <p>Only {@code value} and {@code nodeWeight} are serialised by {@link #write} and restored by
 * {@link #readFields}. The remaining profile fields are declared but are neither serialised
 * nor accessed anywhere in this class.
 *
 * @author user
 */
public class FbGraphState implements Writable {

    private String value;
    private double nodeWeight;
    private String birthday;
    private String education_classes;
    private String education_concentration_id;
    private String education_degree_id;
    private String education_school_id;
    private String education_with_id;
    private String education_type_id;
    private String education_year_id;
    private String first_name;
    private String gender;
    private String hometown_id;
    private String languages_id;
    private String last_name;
    private String locale;
    private String location_id;
    private String work_employer_id;
    private String work_end_date;
    private String work_location;
    private String work_position;
    private String work_start_date;
    private String work_with;
    private String middle_name;
    private String politcal ;

    public FbGraphState() {
    }

    /**
     * Serialises {@code value} as a string followed by {@code nodeWeight} as a double.
     *
     * @param d the sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput d) throws IOException {
        WritableUtils.writeString(d, value);
        d.writeDouble(nodeWeight);
    }

    /**
     * Restores the fields in the same order and with the same types used by {@link #write}.
     *
     * @param di the source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput di) throws IOException {
        value = WritableUtils.readString(di);
        // Must mirror writeDouble() in write(); reading the same bytes as a long would
        // produce a garbage weight.
        nodeWeight = di.readDouble();
    }

    public void setValue(String value) {
        this.value = value;
    }

    public void setNodeWeight(double nodeWeight) {
        this.nodeWeight = nodeWeight;
    }

    public String getValue() {
        return value;
    }

    public double getNodeWeight() {
        return nodeWeight;
    }

}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

/**
 *
 * @author user
 */
public class FbGraphVertex implements Vertex<Text,FbGraphState,DoubleWritable>{

    @Override
    public void initialize(Text i, FbGraphState v, Iterable<Edge<Text, 
DoubleWritable>> itrbl) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void initialize(Text i, FbGraphState v) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public Text getId() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public FbGraphState getValue() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void setValue(FbGraphState v) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void voteToHalt() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public int getNumEdges() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public Iterable<Edge<Text, DoubleWritable>> getEdges() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void setEdges(Iterable<Edge<Text, DoubleWritable>> itrbl) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public Iterable<MutableEdge<Text, DoubleWritable>> getMutableEdges() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public DoubleWritable getEdgeValue(Text i) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void setEdgeValue(Text i, DoubleWritable e) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public Iterable<DoubleWritable> getAllEdgeValues(Text i) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void addEdge(Edge<Text, DoubleWritable> edge) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void removeEdges(Text i) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void unwrapMutableEdges() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void wakeUp() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public boolean isHalted() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public ImmutableClassesGiraphConfiguration<Text, FbGraphState, 
DoubleWritable> getConf() {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void setConf(ImmutableClassesGiraphConfiguration<Text, FbGraphState, 
DoubleWritable> icgc) {
        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
    }
    
}
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package Fb_package;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Katz-style weight propagation over the graph.
 *
 * <p>In superstep {@code s} each vertex:
 * <ol>
 *   <li>for {@code s > 0}, adds the sum of the Katz weights of its incoming messages to its
 *       node weight;</li>
 *   <li>for {@code s} in {@code 0..5}, sends a message carrying the fixed weight for that
 *       superstep along every out-edge;</li>
 *   <li>votes to halt.</li>
 * </ol>
 * From superstep 6 on, vertices only absorb incoming messages and send nothing.
 *
 * @author user
 */
public class KatzReduce extends BasicComputation<Text, FbGraphState, DoubleWritable, FbGraphMessage> {

    /**
     * Weight sent along every out-edge, indexed by superstep.
     *
     * <p>NOTE(review): the last three entries look like truncated powers of 0.5 (0.0625,
     * 0.03125, 0.015625); they are kept as written -- confirm which values are intended.
     */
    private static final double[] OUTGOING_WEIGHTS = {0.5, 0.25, 0.125, 0.062, 0.031, 0.015};

    /**
     * Runs one superstep for {@code vertex}.
     *
     * @param vertex the vertex being computed; its state's node weight is updated in place
     * @param messages the messages delivered to the vertex for this superstep
     * @throws IOException declared by the computation contract
     */
    @Override
    public void compute(
            Vertex<Text, FbGraphState, DoubleWritable> vertex,
            Iterable<FbGraphMessage> messages) throws IOException {

        long superstep = getSuperstep();

        // Messages are not read in superstep 0.
        if (superstep > 0) {
            absorb(vertex, messages);
        }

        // A vertex without out-edges has nobody to send to, so no special case is needed
        // for isolated vertices here.
        if (superstep < OUTGOING_WEIGHTS.length) {
            sendMessageToAllEdges(vertex, new FbGraphMessage(OUTGOING_WEIGHTS[(int) superstep]));
        }

        vertex.voteToHalt();
    }

    /** Adds the total Katz weight of {@code messages} to the vertex's node weight. */
    private static void absorb(
            Vertex<Text, FbGraphState, DoubleWritable> vertex,
            Iterable<FbGraphMessage> messages) {
        double sum = 0;
        for (FbGraphMessage message : messages) {
            sum += message.getKatzWeight();
        }
        FbGraphState state = vertex.getValue();
        state.setNodeWeight(state.getNodeWeight() + sum);
    }
}

Reply via email to