package org.mlbio.hadoop.test;

/**
 * 
 * Reads a graph file and just outputs which host a vertex was loaded on
 * Just one superstep
 * 
 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;

public class GraphTester {

    public static class GraphTrainingVertex extends
            Vertex<Text, IntWritable, Text> {

        public void testConfigSame(Configuration c1, Configuration c2) {
            if (c1.equals(c2)) {
                System.out.println("Configurations are the same");
            } else {
                System.out.println("Configuration are NOT the same");
            }
        }

        @Override
        public void setup(Configuration conf) {
            // testConfigSame(conf, this.getConf());
        }

        @Override
        public void compute(Iterable<Text> messages) throws IOException {
            String host = this.getPeer().getPeerName();
            setValue(new Text(host));
            voteToHalt();
        }

    }

    public static class MyVertexReader
            extends
            VertexInputReader<LongWritable, Text, Text, IntWritable, Text> {
        
        /*
         * Graph Input Format
         * <graph_id:text> <value:int>  [<graph_neighbor_id:text> ]+ (one or more neighbors) 
         */

        @Override
        public boolean parseVertex(LongWritable key, Text value,
                Vertex<Text, IntWritable, Text> vertex) throws Exception {
            String[] split = value.toString().split("\\s+");
            vertex.setVertexID(new Text(split[0]));
            vertex.setValue( new Text(split[1]));
            for (int i = 2; i < split.length; i++) {
                vertex.addEdge(new Edge<Text, IntWritable>(new Text(split[i]),
                        new IntWritable(1)));
            }
            return true;
        }
    }

    /*
     * USAGE: <prog> <input_path> <output_path>
     * sample input/output in testdata/graphTesting
     * 
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        String inpath = args[0];
        String outpath = args[1];

        HamaConfiguration conf = new HamaConfiguration();
        GraphJob gJob = new GraphJob(conf, GraphTester.class);
        gJob.setJobName("Graph Testing");

        gJob.setInputPath(new Path(inpath));
        gJob.setOutputPath(new Path(outpath));
        gJob.setInputFormat(TextInputFormat.class);
        gJob.setOutputFormat(TextOutputFormat.class);
        gJob.setInputKeyClass(LongWritable.class);
        gJob.setInputValueClass(Text.class);
        gJob.setOutputKeyClass(Text.class);
        gJob.setOutputValueClass(IntWritable.class);

        gJob.setMaxIteration(3);
        gJob.setVertexClass(GraphTrainingVertex.class);
        gJob.setVertexIDClass(Text.class);
        gJob.setVertexValueClass(Text.class);
        gJob.setEdgeValueClass(IntWritable.class);
        gJob.setVertexInputReaderClass(MyVertexReader.class);

        if (gJob.waitForCompletion(true)) {
            System.out.println("Job Finished");
        }

    }

}
