Hi,

I tried looking into the logs, and there is no error message which could have 
indicated the cause of my map-reduce being stuck at 0%.

But, I wonder if number of nodes being used affects the speed of a map-reduce 
job?

Earlier I was running it by requesting 2 nodes with 8 processors, now I tried 
running it on 4 nodes and it works slightly  faster. That is, from 0% map ( for 
over 6 hours and after too) to 1 % map per hour.

Can this be a reason?

Best,
SS

On Jul 20, 2012, at 9:24 AM, Harsh J wrote:

> SS,
> 
> Is your job not progressing at all (i.e. continues to be at 0-progress
> for hours), or does it fail after zero progress?
> 
> I'd try adding logging to various important points of the map/reduce
> functions and see whats taking it so long, or if its getting stuck at
> something. Logs are observable for each task attempt via the JT Web
> UI/etc..
> 
> On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita
> <shanu.sushmit...@gmail.com> wrote:
>> Hi,
>> 
>> I am trying to solve a problem where I need to computed frequencies of words
>> occurring in a file1 from file 2.
>> 
>> For example:
>> text in file1:
>> hadoop
>> user
>> hello
>> world
>> 
>> and text in file2 is:
>> hadoop
>> user
>> hello
>> world
>> hadoop
>> hadoop
>> hadoop
>> user
>> world
>> world
>> world
>> hadoop
>> user
>> hello
>> 
>> so the output should be:
>> hadoop 5
>> user 3
>> hello 2
>> world 4
>> 
>> I read that distributed caching is a good way to do such jobs.Size of my
>> files are:
>> File1 = 17GB
>> File2 = 3 MB
>> 
>> And here is my code:
>> 
>> 
>> 
>> 
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.mapred.*;
>> import org.apache.hadoop.util.*;
>> import java.util.*;
>> import java.io.*;
>> import org.apache.hadoop.filecache.*;
>> import java.net.*;
>> 
>> public class RepeatFreqTest
>> {
>> 
>> 
>> public static class Map extends MapReduceBase implements
>> Mapper<LongWritable, Text, Text, IntWritable> {
>>            private final static IntWritable one = new IntWritable(1);
>>            private Text word = new Text();
>>             private HashSet<String> dyads = new HashSet<String>();
>> 
>> 
>>  public void configure(JobConf conf)
>>  {
>> 
>> 
>> Path[] cacheFiles = new Path[0];
>>   try {
>> 
>> 
>>  cacheFiles = DistributedCache.getLocalCacheFiles(conf);
>>  } // end of try
>> 
>>  catch (IOException ioe) {
>>            System.err.println("Caught exception while getting cached files:
>> " + StringUtils.stringifyException(ioe));
>>   }// end of catch
>> 
>>  for (Path dyadPath : cacheFiles)
>>  {
>> 
>>        loadDyads(dyadPath);
>>  } // end of for cachePath
>> 
>> 
>>  } // end of configure
>> 
>> private void loadDyads(Path dyadPath)
>> {
>> try
>> {
>>                BufferedReader wordReader = new BufferedReader(new
>> FileReader(dyadPath.toString()));
>>                String line = null;
>>                while ((line = wordReader.readLine()) != null) {
>>       dyads.add(line);
>> 
>>      } // end of while
>> 
>>      wordReader.close();
>> 
>> }// end of try
>> catch (IOException ioe) {
>>      System.err.println("IOException reading from distributed cache");
>>      } // end of catch
>> 
>> }// end of loadDyads()
>> 
>> 
>> 
>>  /* actual map() method, etc go here */
>> 
>> 
>>                        @Override
>>             public void map(LongWritable key, Text value,
>> OutputCollector<Text, IntWritable> output, Reporter reporter) throws
>> IOException
>>             {
>> 
>> 
>>                        // dyad, ut and year from all dyads (big file)
>> file!!!
>> 
>> 
>>                        String line = value.toString();
>>                        String[] tokens = line.split("\\|");
>>                        String ut1 = tokens[0].trim();
>>                        String dyad1 = tokens[1].trim();
>>                        String year1 = tokens[2].trim();
>>                        int y1 = Integer.parseInt(year1);
>> 
>> 
>>                        // dyad, ut and year from sample dyads file (sample
>> file stored in the memory)!!!
>> 
>> 
>>                        Iterator it = dyads.iterator();
>>                        while(it.hasNext())
>>                        {
>>                                //Text word = new Text();
>>                                String setline = it.next().toString();
>> 
>>                                String[] tokens2 = setline.split("\\|");
>>                                String ut2 = tokens2[0].trim();
>>                                String dyad2 = tokens2[1].trim();
>>                                String year2 = tokens2[2].trim();
>>                                int y2 = Integer.parseInt(year2);
>> 
>> 
>> 
>> 
>>                                if(dyad1.equalsIgnoreCase(dyad2))
>>                                {
>>                                        if(!(ut1.equalsIgnoreCase(ut2)))
>>                                        {
>>                                                if(y1<=y2)
>>                                                {
>> 
>>                                                        word.set(setline);
>>                                                        output.collect(word,
>> one);
>>                                                }
>> 
>>                                        } // end of if ut1!=ut2
>> 
>>                                } //
>> 
>> 
>>                        }// end of while
>> 
>> 
>> } // end of override map
>> } // end of big Map class
>> 
>> 
>> public static class Reduce extends MapReduceBase implements Reducer<Text,
>> IntWritable, Text, IntWritable> {
>> 
>> @Override
>>             public void reduce(Text key, Iterator<IntWritable> values,
>> OutputCollector<Text, IntWritable> output, Reporter reporter) throws
>> IOException {
>>               int sum = 0;
>>               while (values.hasNext()) {
>>                 sum += values.next().get();
>> 
>>               } // end of while
>>               output.collect(key, new IntWritable(sum));
>>             } // end of override reduce
>>           } // end of Big Reduce
>> 
>> 
>> public static void main(String[] args) throws Exception {
>> 
>> JobConf conf = new JobConf(RepeatFreqTest.class);
>> conf.setJobName("Repeat");
>> 
>> conf.setOutputKeyClass(Text.class);
>> conf.setOutputValueClass(IntWritable.class);
>> 
>> conf.setMapperClass(Map.class);
>> conf.setCombinerClass(Reduce.class);
>> conf.setReducerClass(Reduce.class);
>> 
>> conf.setInputFormat(TextInputFormat.class);
>> conf.setOutputFormat(TextOutputFormat.class);
>> 
>> FileInputFormat.setInputPaths(conf, new Path(args[0]));
>> FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>> 
>> 
>> 
>> DistributedCache.addCacheFile(new
>> Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);
>> 
>> JobClient.runJob(conf);
>> 
>> }// end of main
>> 
>> } // end of class
>> 
>> 
>> And I put my File1.txt and File2.txt in hdfs as follows:
>> 
>> $HADOOP_HOME/bin/hadoop fs -mkdir input
>> $HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
>> 
>> My problem is that my code compiles fine, but it would just not proceed from
>> map 0% reduce 0% stage.
>> 
>> What I am doing wrong?
>> 
>> Any suggestion would be of great help.
>> 
>> Best,
>> SS
> 
> 
> 
> -- 
> Harsh J

Reply via email to