Hi, I tried looking into the logs, and there is no error message that would indicate why my map-reduce job is stuck at 0%.
But I wonder: does the number of nodes being used affect the speed of a map-reduce job? Earlier I was running it by requesting 2 nodes with 8 processors; now I tried running it on 4 nodes and it works slightly faster, going from being stuck at 0% map (for over 6 hours and beyond) to about 1% map per hour. Could this be a reason?

Best,
SS

On Jul 20, 2012, at 9:24 AM, Harsh J wrote:

> SS,
>
> Is your job not progressing at all (i.e. it continues to be at zero progress
> for hours), or does it fail after zero progress?
>
> I'd try adding logging to various important points of the map/reduce
> functions and see what's taking it so long, or if it's getting stuck at
> something. Logs are observable for each task attempt via the JT Web
> UI/etc.
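Harsh, following your suggestion, below is roughly the instrumentation I am planning to add inside map(). It is only a sketch: the counter group and names ("RepeatFreq", "MAP_RECORDS", "MATCHES") are placeholders I made up, and the rest mirrors the map() method quoted further down, with Reporter calls added.

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

        // count every input record, so it is visible whether map() runs at all
        reporter.incrCounter("RepeatFreq", "MAP_RECORDS", 1);

        String line = value.toString();
        String[] tokens = line.split("\\|");
        String ut1 = tokens[0].trim();
        String dyad1 = tokens[1].trim();
        int y1 = Integer.parseInt(tokens[2].trim());

        long matches = 0;
        for (String setline : dyads) {
            // ping the framework while scanning the in-memory set, so a long
            // scan is not mistaken for a hung task
            reporter.progress();

            String[] tokens2 = setline.split("\\|");
            String ut2 = tokens2[0].trim();
            String dyad2 = tokens2[1].trim();
            int y2 = Integer.parseInt(tokens2[2].trim());

            if (dyad1.equalsIgnoreCase(dyad2)
                    && !ut1.equalsIgnoreCase(ut2)
                    && y1 <= y2) {
                word.set(setline);
                output.collect(word, one);
                matches++;
            }
        }

        reporter.incrCounter("RepeatFreq", "MATCHES", matches);
        // short human-readable status for this task attempt
        reporter.setStatus("offset " + key.get() + ": " + matches + " matches");
    }

If I understand the framework correctly, the counters and the status string should show up per task attempt in the JobTracker web UI, which would at least tell me whether map() is being entered and how quickly it gets through the cached set.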
> On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita
> <shanu.sushmit...@gmail.com> wrote:
>> Hi,
>>
>> I am trying to solve a problem where I need to compute the frequencies with
>> which the words in file1 occur in file2.
>>
>> For example, the text in file1 is:
>> hadoop
>> user
>> hello
>> world
>>
>> and the text in file2 is:
>> hadoop
>> user
>> hello
>> world
>> hadoop
>> hadoop
>> hadoop
>> user
>> world
>> world
>> world
>> hadoop
>> user
>> hello
>>
>> so the output should be:
>> hadoop 5
>> user 3
>> hello 2
>> world 4
>>
>> I read that distributed caching is a good way to do such jobs. The sizes of
>> my files are:
>> File1 = 17 GB
>> File2 = 3 MB
>>
>> And here is my code:
>>
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.mapred.*;
>> import org.apache.hadoop.util.*;
>> import java.util.*;
>> import java.io.*;
>> import org.apache.hadoop.filecache.*;
>> import java.net.*;
>>
>> public class RepeatFreqTest
>> {
>>     public static class Map extends MapReduceBase implements
>>             Mapper<LongWritable, Text, Text, IntWritable> {
>>
>>         private final static IntWritable one = new IntWritable(1);
>>         private Text word = new Text();
>>         private HashSet<String> dyads = new HashSet<String>();
>>
>>         public void configure(JobConf conf)
>>         {
>>             Path[] cacheFiles = new Path[0];
>>             try {
>>                 cacheFiles = DistributedCache.getLocalCacheFiles(conf);
>>             } // end of try
>>             catch (IOException ioe) {
>>                 System.err.println("Caught exception while getting cached files: "
>>                         + StringUtils.stringifyException(ioe));
>>             } // end of catch
>>
>>             for (Path dyadPath : cacheFiles)
>>             {
>>                 loadDyads(dyadPath);
>>             } // end of for cachePath
>>         } // end of configure
>>
>>         private void loadDyads(Path dyadPath)
>>         {
>>             try
>>             {
>>                 BufferedReader wordReader = new BufferedReader(new
>>                         FileReader(dyadPath.toString()));
>>                 String line = null;
>>                 while ((line = wordReader.readLine()) != null) {
>>                     dyads.add(line);
>>                 } // end of while
>>                 wordReader.close();
>>             } // end of try
>>             catch (IOException ioe) {
>>                 System.err.println("IOException reading from distributed cache");
>>             } // end of catch
>>         } // end of loadDyads()
>>
>>         /* actual map() method, etc go here */
>>
>>         @Override
>>         public void map(LongWritable key, Text value,
>>                 OutputCollector<Text, IntWritable> output, Reporter reporter)
>>                 throws IOException
>>         {
>>             // dyad, ut and year from the all-dyads (big) file!!!
>>             String line = value.toString();
>>             String[] tokens = line.split("\\|");
>>             String ut1 = tokens[0].trim();
>>             String dyad1 = tokens[1].trim();
>>             String year1 = tokens[2].trim();
>>             int y1 = Integer.parseInt(year1);
>>
>>             // dyad, ut and year from the sample dyads file (stored in memory)!!!
>>             Iterator it = dyads.iterator();
>>             while (it.hasNext())
>>             {
>>                 //Text word = new Text();
>>                 String setline = it.next().toString();
>>
>>                 String[] tokens2 = setline.split("\\|");
>>                 String ut2 = tokens2[0].trim();
>>                 String dyad2 = tokens2[1].trim();
>>                 String year2 = tokens2[2].trim();
>>                 int y2 = Integer.parseInt(year2);
>>
>>                 if (dyad1.equalsIgnoreCase(dyad2))
>>                 {
>>                     if (!(ut1.equalsIgnoreCase(ut2)))
>>                     {
>>                         if (y1 <= y2)
>>                         {
>>                             word.set(setline);
>>                             output.collect(word, one);
>>                         }
>>                     } // end of if ut1 != ut2
>>                 } // end of if dyad1 == dyad2
>>             } // end of while
>>         } // end of override map
>>     } // end of big Map class
>>
>>     public static class Reduce extends MapReduceBase implements
>>             Reducer<Text, IntWritable, Text, IntWritable> {
>>
>>         @Override
>>         public void reduce(Text key, Iterator<IntWritable> values,
>>                 OutputCollector<Text, IntWritable> output, Reporter reporter)
>>                 throws IOException {
>>             int sum = 0;
>>             while (values.hasNext()) {
>>                 sum += values.next().get();
>>             } // end of while
>>             output.collect(key, new IntWritable(sum));
>>         } // end of override reduce
>>     } // end of Big Reduce
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         JobConf conf = new JobConf(RepeatFreqTest.class);
>>         conf.setJobName("Repeat");
>>
>>         conf.setOutputKeyClass(Text.class);
>>         conf.setOutputValueClass(IntWritable.class);
>>
>>         conf.setMapperClass(Map.class);
>>         conf.setCombinerClass(Reduce.class);
>>         conf.setReducerClass(Reduce.class);
>>
>>         conf.setInputFormat(TextInputFormat.class);
>>         conf.setOutputFormat(TextOutputFormat.class);
>>
>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>
>>         DistributedCache.addCacheFile(new
>>                 Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);
>>
>>         JobClient.runJob(conf);
>>     } // end of main
>>
>> } // end of class
>>
>> And I put my File1.txt and File2.txt in HDFS as follows:
>>
>> $HADOOP_HOME/bin/hadoop fs -mkdir input
>> $HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
>>
>> My problem is that my code compiles fine, but the job just does not proceed
>> past the map 0% reduce 0% stage.
>>
>> What am I doing wrong?
>>
>> Any suggestion would be of great help.
>>
>> Best,
>> SS
>
>
> --
> Harsh J
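One more thing I plan to try along the same lines: printing the job counters in main() once runJob() returns, so the totals are visible without digging through the web UI. Again this is just a sketch, reusing the placeholder counter names from the snippet above:

    // in main(), replacing the plain JobClient.runJob(conf) call:
    RunningJob running = JobClient.runJob(conf);  // blocks until the job finishes

    // dump the placeholder counters incremented in the mapper sketch
    Counters counters = running.getCounters();
    long records = counters.findCounter("RepeatFreq", "MAP_RECORDS").getCounter();
    long matches = counters.findCounter("RepeatFreq", "MATCHES").getCounter();
    System.out.println("map records seen: " + records
            + ", matches emitted: " + matches);

If MAP_RECORDS stays at zero even after hours, that would at least confirm that the mappers are never getting into map() at all.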