Hi, I have designed a MapReduce algorithm for the all-pairs shortest paths problem. As part of the implementation of this algorithm, I have written the following MapReduce job. It runs well and produces the desired output in pseudo-distributed mode. I used a machine with Ubuntu 8.04 and hadoop-0.18.3 to run the job in pseudo-distributed mode. When I tried to run the same program on a cluster of 4 machines (each running Red Hat Linux 9) with the same version of Hadoop (hadoop-0.18.3), the program does not give any errors, but it does not give any output either (the output file is blank). This is the first time I have faced this kind of problem. I am attaching the jar file of the program and the sample inputs out1 and out2 as well (the program needs to read input from these two files). I have searched the archive but didn't find any mail mentioning this problem. I have googled, but it was of no use. I am not able to find out what I am missing in the code. Should I be using hadoop-0.20?
The program is below: To compile the program, please save it as GraphMultiplication1.java. To run the program please use the following command: bin/hadoop jar ~/graphmultiplication1.jar GraphMultiplication1 out2,out1 out3 5 #maps #reducers '5' is need in the above command, as it indicates the number of nodes in the graph. import java.lang.Integer; import java.util.TreeMap; import java.io.IOException; import java.util.Date; import java.util.Iterator; import java.lang.StringBuilder; import java.util.StringTokenizer; import java.util.Random; import java.lang.String; import java.util.TreeSet; import java.util.HashMap; import java.util.Arrays; import org.apache.hadoop.conf. Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.mapred.lib.IdentityReducer; public class GraphMultiplication1 extends Configured implements Tool { private static int nodes; public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { private Text intermediatekey = new Text(); private Text intermediatevalue = new Text(); /** map method. Read a line of the form (from to weight). 
Toss a coin */ public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int i; String row = value.toString(); if(row.indexOf("T")!=-1) { intermediatevalue.set(row); for(i=1;i<=nodes;i++) { intermediatekey.set(String.valueOf(i)); output.collect(intermediatekey,intermediatevalue); }//for(i=1;i<=10680;i++) }//if(row.indexOf("T")!=-1) else { String[] row1=row.split("\t"); intermediatekey.set(row1[0]); intermediatevalue.set(row1[1]); output.collect(intermediatekey,intermediatevalue); }//else }//end of map method }//end of MapClass /** * The reducer class. */ public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { TreeSet ts = new TreeSet(); Text value = new Text(); public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String mul = new String(); int[] arr1; int[] arr2; while(values.hasNext()) { String st = new String(); st=values.next().toString(); if(st.indexOf("T")!=-1) ts.add(st); else mul=st; }//while(values.hasNext()) Iterator itr = ts.iterator(); while(itr.hasNext()) { HashMap hm = new HashMap(); String[] str = ((String)itr.next()).split("\t"); String[] str1 = str[1].split(" "); arr1= new int[nodes]; arr2= new int[nodes]; for(int i=1; i < str1.length ; i++) { String[] temp = str1[i].split(":"); arr1[Integer.parseInt(temp[0])-1]=Integer.parseInt(temp[1]); }//for(int i=0; i < str[1].length ; i++) String[] str2 = mul.split(" "); for(int i=0; i < str2.length ; i++) { String[] temp = str2[i].split(":"); arr2[Integer.parseInt(temp[0])-1]=Integer.parseInt(temp[1]); }//for(int i=0; i < str2.lenght ; i++) for(int i=0; i < nodes ; i++) { if(arr1[i]==0) arr1[i]=10000000; if(arr2[i]==0) arr2[i]=10000000; }//for(int i=0; i < nodes ; i++) arr1[Integer.parseInt(str[0])-1]=0; arr2[Integer.parseInt(key.toString())-1]=0; int weight=10000000; for(int i=0;i < nodes ; i++) { if(arr1[i]!=10000000 && 
arr2[i]!=10000000) { int t=(arr1[i]+arr2[i]); if(weight > t ) weight=t; } }//for(int i=0;i < nodes ; i++) if(weight!=10000000) { output.collect(key,new Text(str[0]+":"+String.valueOf(weight))); } // output.collect(key,new Text(str[0]+":"+s1+"x"+s2)); }//while(itr.hasNext()) }//public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output,Reporter reporter) throws IOException }//public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> protected static int printUsage () { System.out.println("GraphMultiplication1 parameters: inputdir outputdir #mappers #reducers"); ToolRunner.printGenericCommandUsage(System.out); return -1; } protected static void printWarning() { System.out.print("Line with neither 2 nor 3 tokens \n"); ToolRunner.printGenericCommandUsage(System.out); } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new GraphMultiplication1(), args); System.exit(res); } public int run(String[] args) throws Exception { if( args.length != 5) { System.err.println(" 5 command line arguments needed"); System.err.println(" You gave instead "+args.length); System.exit(-2); } for( int i = 0 ; i < args.length; i++) System.out.println("Argument "+(i+1)+": "+args[i]); JobConf conf = new JobConf(getConf(), GraphMultiplication1.class); conf.setJobName("GraphMultiplication1"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(MapClass.class); conf.setReducerClass(Reduce.class); Path outdir = new Path(args[1]); nodes= Integer.parseInt(args[2]); FileInputFormat.setInputPaths(conf, args[0]); FileOutputFormat.setOutputPath(conf, outdir); conf.setNumMapTasks(Integer.parseInt(args[3])); conf.setNumReduceTasks(Integer.parseInt(args[4])); Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(conf); Date endTime = new Date(); System.out.println("Job ended: " + endTime); System.out.println("The 
job took " + (endTime.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } }
graphmultiplication1.jar
Description: application/java-archive