SS,

Is your job not progressing at all (i.e. it stays at 0% progress for hours), or does it fail after making zero progress?
I'd try adding logging at the important points of the map/reduce functions to see what's taking so long, or whether it's getting stuck on something (see the P.S. at the bottom of this mail for a rough sketch). The logs of each task attempt are viewable via the JT web UI, etc.

On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita <shanu.sushmit...@gmail.com> wrote:
> Hi,
>
> I am trying to solve a problem where I need to compute the frequencies of the
> words occurring in file1 from file2.
>
> For example, the text in file1 is:
>
> hadoop
> user
> hello
> world
>
> and the text in file2 is:
>
> hadoop
> user
> hello
> world
> hadoop
> hadoop
> hadoop
> user
> world
> world
> world
> hadoop
> user
> hello
>
> so the output should be:
>
> hadoop 5
> user 3
> hello 2
> world 4
>
> I read that distributed caching is a good way to do such jobs. The sizes of
> my files are:
>
> File1 = 17 GB
> File2 = 3 MB
>
> And here is my code:
>
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.util.*;
> import java.util.*;
> import java.io.*;
> import org.apache.hadoop.filecache.*;
> import java.net.*;
>
> public class RepeatFreqTest {
>
>     public static class Map extends MapReduceBase
>             implements Mapper<LongWritable, Text, Text, IntWritable> {
>
>         private final static IntWritable one = new IntWritable(1);
>         private Text word = new Text();
>         private HashSet<String> dyads = new HashSet<String>();
>
>         public void configure(JobConf conf) {
>             Path[] cacheFiles = new Path[0];
>             try {
>                 cacheFiles = DistributedCache.getLocalCacheFiles(conf);
>             } catch (IOException ioe) {
>                 System.err.println("Caught exception while getting cached files: "
>                         + StringUtils.stringifyException(ioe));
>             }
>
>             for (Path dyadPath : cacheFiles) {
>                 loadDyads(dyadPath);
>             }
>         } // end of configure
>
>         private void loadDyads(Path dyadPath) {
>             try {
>                 BufferedReader wordReader = new BufferedReader(
>                         new FileReader(dyadPath.toString()));
>                 String line = null;
>                 while ((line = wordReader.readLine()) != null) {
>                     dyads.add(line);
>                 }
>                 wordReader.close();
>             } catch (IOException ioe) {
>                 System.err.println("IOException reading from distributed cache");
>             }
>         } // end of loadDyads()
>
>         /* actual map() method, etc go here */
>
>         @Override
>         public void map(LongWritable key, Text value,
>                 OutputCollector<Text, IntWritable> output, Reporter reporter)
>                 throws IOException {
>
>             // dyad, ut and year from all dyads (the big file)!!!
>             String line = value.toString();
>             String[] tokens = line.split("\\|");
>             String ut1 = tokens[0].trim();
>             String dyad1 = tokens[1].trim();
>             String year1 = tokens[2].trim();
>             int y1 = Integer.parseInt(year1);
>
>             // dyad, ut and year from sample dyads file (sample file stored in the memory)!!!
>             Iterator it = dyads.iterator();
>             while (it.hasNext()) {
>                 //Text word = new Text();
>                 String setline = it.next().toString();
>
>                 String[] tokens2 = setline.split("\\|");
>                 String ut2 = tokens2[0].trim();
>                 String dyad2 = tokens2[1].trim();
>                 String year2 = tokens2[2].trim();
>                 int y2 = Integer.parseInt(year2);
>
>                 if (dyad1.equalsIgnoreCase(dyad2)) {
>                     if (!(ut1.equalsIgnoreCase(ut2))) {
>                         if (y1 <= y2) {
>                             word.set(setline);
>                             output.collect(word, one);
>                         }
>                     } // end of if ut1 != ut2
>                 }
>             } // end of while
>         } // end of map
>     } // end of big Map class
>
>     public static class Reduce extends MapReduceBase
>             implements Reducer<Text, IntWritable, Text, IntWritable> {
>
>         @Override
>         public void reduce(Text key, Iterator<IntWritable> values,
>                 OutputCollector<Text, IntWritable> output, Reporter reporter)
>                 throws IOException {
>             int sum = 0;
>             while (values.hasNext()) {
>                 sum += values.next().get();
>             }
>             output.collect(key, new IntWritable(sum));
>         } // end of reduce
>     } // end of big Reduce class
>
>     public static void main(String[] args) throws Exception {
>
>         JobConf conf = new JobConf(RepeatFreqTest.class);
>         conf.setJobName("Repeat");
>
>         conf.setOutputKeyClass(Text.class);
>         conf.setOutputValueClass(IntWritable.class);
>
>         conf.setMapperClass(Map.class);
>         conf.setCombinerClass(Reduce.class);
>         conf.setReducerClass(Reduce.class);
>
>         conf.setInputFormat(TextInputFormat.class);
>         conf.setOutputFormat(TextOutputFormat.class);
>
>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>
>         DistributedCache.addCacheFile(
>                 new Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);
>
>         JobClient.runJob(conf);
>     } // end of main
>
> } // end of class
>
> And I put my File1.txt and File2.txt into HDFS as follows:
>
> $HADOOP_HOME/bin/hadoop fs -mkdir input
> $HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
> $HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
> $HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
>
> My problem is that my code compiles fine, but the job just does not proceed
> past the map 0% reduce 0% stage.
>
> What am I doing wrong?
>
> Any suggestion would be of great help.
>
> Best,
> SS

--
Harsh J
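P.S. Below is a very rough, untested sketch of the kind of instrumentation I mean. The class name, the counter group/names, the 100000 threshold and the log messages are all just placeholders -- adapt them to your own Map class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Placeholder mapper showing where logging and counters could go.
public class InstrumentedMap extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private long recordsSeen = 0;

    public void configure(JobConf conf) {
        // Anything written to stderr/stdout ends up in the task attempt's
        // logs, reachable from the JT web UI (job -> map tasks -> task logs).
        System.err.println("configure(): starting to load the cache file...");
        long start = System.currentTimeMillis();
        // ... load the distributed-cache file here, as in your code ...
        System.err.println("configure(): cache loaded in "
                + (System.currentTimeMillis() - start) + " ms");
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Counters show up live on the job page, so you can tell whether
        // map() is being entered at all and roughly how fast it is moving.
        reporter.incrCounter("Debug", "MapRecordsSeen", 1);

        // Report status periodically so a long-running loop stays visible.
        if (++recordsSeen % 100000 == 0) {
            reporter.setStatus("processed " + recordsSeen + " records");
            System.err.println("map(): processed " + recordsSeen + " records");
        }

        // ... your existing per-record logic goes here ...
    }
}

The counter values are visible on the running job's page in the JT web UI, and the stderr output is in each task attempt's logs, so you can quickly see whether the mappers ever get past configure() (i.e. past loading the cache file) and how far map() has progressed.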