Hi,
  I have designed a mapreduce algorithm for all pairs shortest paths
problem. As a part of the implementation of this algorithm, I have written
the following mapreduce job. It is running well and producing desired output
in pseudo distributed mode. I have used a machine with ubuntu 8.04 and
hadoop-0.18.3 to run the job in pseudo distributed mode. When I tried to run
the same program on a cluster of 4 machines(each running Redhat linux 9)
with the same version of hadoop(hadooop-0.18.3), the program is not giving
any errors but its not giving any output as well(The output file is blank).
This is the first time I am facing this kind of problem.
I am attaching the jar file of the program and sample inputs: out1 and out2
as well.(The program need to read input from these two files)
I have searched the archive but didn't find any mail mentioning this
problem. I have googled, but it was of no use.
I am not able to find out what am I missing in the code.
Should I be using hadoop-0.20?

The program is below:

To compile the program, please save it as GraphMultiplication1.java. To run
the program please use the following command:

bin/hadoop jar ~/graphmultiplication1.jar GraphMultiplication1 out2,out1
out3 5 #maps #reducers

'5' is need in the above command, as it indicates the number of nodes in the
graph.

import java.lang.Integer;
import java.util.TreeMap;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.lang.StringBuilder;
import java.util.StringTokenizer;
import java.util.Random;
import java.lang.String;
import java.util.TreeSet;
import java.util.HashMap;
import java.util.Arrays;
import org.apache.hadoop.conf.
Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapred.lib.IdentityReducer;


public class GraphMultiplication1 extends Configured implements Tool {

    private static int nodes;
    public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {

    private Text intermediatekey = new Text();
    private Text intermediatevalue = new Text();


    /** map method. Read a line of the form (from to weight). Toss a coin */
    public void map(LongWritable key, Text value, OutputCollector<Text,
Text> output,
            Reporter reporter) throws IOException
      {
        int i;
        String row = value.toString();
        if(row.indexOf("T")!=-1)
        {
          intermediatevalue.set(row);
          for(i=1;i<=nodes;i++)
          {
            intermediatekey.set(String.valueOf(i));
            output.collect(intermediatekey,intermediatevalue);
          }//for(i=1;i<=10680;i++)
        }//if(row.indexOf("T")!=-1)
        else
        {
           String[] row1=row.split("\t");
           intermediatekey.set(row1[0]);
           intermediatevalue.set(row1[1]);
           output.collect(intermediatekey,intermediatevalue);
        }//else
    }//end of map method
  }//end of MapClass
    /**
     *  The reducer class.

     */
  public static class Reduce extends MapReduceBase implements Reducer<Text,
Text, Text, Text>
  {
      TreeSet ts = new TreeSet();
      Text value = new Text();
      public void reduce(Text key, Iterator<Text>
values,OutputCollector<Text, Text> output,
              Reporter reporter) throws IOException {
          String mul = new String();
      int[] arr1;
      int[] arr2;

          while(values.hasNext())
          {
            String st = new String();
            st=values.next().toString();
            if(st.indexOf("T")!=-1)
              ts.add(st);
            else
              mul=st;
          }//while(values.hasNext())

          Iterator itr = ts.iterator();
          while(itr.hasNext())
          {
            HashMap hm = new HashMap();
            String[] str = ((String)itr.next()).split("\t");
            String[] str1 = str[1].split(" ");

            arr1= new int[nodes];
            arr2= new int[nodes];


            for(int i=1; i < str1.length ; i++)
            {
              String[] temp = str1[i].split(":");
              arr1[Integer.parseInt(temp[0])-1]=Integer.parseInt(temp[1]);
            }//for(int i=0; i < str[1].length ; i++)

            String[] str2 = mul.split(" ");
            for(int i=0; i < str2.length ; i++)
            {
              String[] temp = str2[i].split(":");

arr2[Integer.parseInt(temp[0])-1]=Integer.parseInt(temp[1]);
            }//for(int i=0; i < str2.lenght ; i++)

            for(int i=0; i < nodes ; i++)
            {
              if(arr1[i]==0)
                arr1[i]=10000000;
              if(arr2[i]==0)
                arr2[i]=10000000;
            }//for(int i=0; i < nodes ; i++)
            arr1[Integer.parseInt(str[0])-1]=0;
            arr2[Integer.parseInt(key.toString())-1]=0;


           int weight=10000000;
           for(int i=0;i < nodes ; i++)
           {
             if(arr1[i]!=10000000 && arr2[i]!=10000000)
             {
               int t=(arr1[i]+arr2[i]);
               if(weight > t )
                weight=t;
             }
           }//for(int i=0;i < nodes ; i++)
           if(weight!=10000000)
           {
            output.collect(key,new Text(str[0]+":"+String.valueOf(weight)));
           }
  //         output.collect(key,new Text(str[0]+":"+s1+"x"+s2));
          }//while(itr.hasNext())
    }//public void reduce(Text key, Iterator<Text>
values,OutputCollector<Text, Text> output,Reporter reporter) throws
IOException


  }//public static class Reduce extends MapReduceBase implements
Reducer<Text, Text, Text, Text>

    protected static int printUsage ()
    {
    System.out.println("GraphMultiplication1 parameters: inputdir outputdir
#mappers #reducers");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
    }

    protected static void printWarning()
    {
        System.out.print("Line with neither 2 nor 3 tokens \n");
        ToolRunner.printGenericCommandUsage(System.out);

    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new
GraphMultiplication1(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {

        if( args.length != 5)
        {
            System.err.println(" 5 command line arguments needed");
            System.err.println(" You gave instead "+args.length);
            System.exit(-2);
        }
        for( int i = 0 ; i < args.length; i++)
            System.out.println("Argument "+(i+1)+": "+args[i]);

         JobConf conf = new JobConf(getConf(), GraphMultiplication1.class);
         conf.setJobName("GraphMultiplication1");
         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(Text.class);
         conf.setMapperClass(MapClass.class);
         conf.setReducerClass(Reduce.class);
         Path outdir = new Path(args[1]);
         nodes= Integer.parseInt(args[2]);
         FileInputFormat.setInputPaths(conf, args[0]);
         FileOutputFormat.setOutputPath(conf, outdir);
         conf.setNumMapTasks(Integer.parseInt(args[3]));
         conf.setNumReduceTasks(Integer.parseInt(args[4]));

         Date startTime = new Date();
         System.out.println("Job started: " + startTime);
         JobClient.runJob(conf);
         Date endTime = new Date();
         System.out.println("Job ended: " + endTime);
         System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) /1000 +
                       " seconds.");
         return 0;
    }

}

Attachment: graphmultiplication1.jar
Description: application/java-archive

Reply via email to