Thanks Harsh,

I don't get any error message. The job just halts at 0% map 0% reduce forever. However, it works (though still slowly) if I reduce the size of the cache file to less than 100 KB. It halts again as soon as I increase the size of the cache file. Even 120 KB does not work :-(


SS
On 20 Jul 2012, at 09:24, Harsh J wrote:

SS,

Is your job not progressing at all (i.e. continues to be at 0-progress
for hours), or does it fail after zero progress?

I'd try adding logging at various important points of the map/reduce
functions to see what's taking so long, or whether it's getting stuck at
something. Logs are observable for each task attempt via the JT Web
UI, etc.
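
A minimal sketch of the kind of logging suggested above, in plain Java with no Hadoop dependency. The class name ProgressLog and the logging interval are hypothetical, not taken from the job in this thread. It assumes that whatever a task writes to System.err ends up in that task attempt's log, which is also what the code further down the thread relies on.

```java
// Hypothetical helper: prints one line to stderr every `every` records.
class ProgressLog {
    private final String label;
    private final long every;
    private final long startNanos = System.nanoTime();
    private long count = 0;

    ProgressLog(String label, long every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        this.label = label;
        this.every = every;
    }

    // Call once per record; returns true when a log line was written.
    boolean tick() {
        count++;
        if (count % every != 0) {
            return false;
        }
        long elapsedMs = (System.nanoTime() - startNanos) / 1000000L;
        System.err.println(label + ": " + count + " records in " + elapsedMs + " ms");
        return true;
    }

    // Number of records seen so far.
    long count() {
        return count;
    }
}
```

One instance could be created as a field of the mapper and tick() called at the top of map(), and another inside the inner loop, so the task log shows whether records are being processed at all and how long each batch takes.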

On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita
<shanu.sushmit...@gmail.com> wrote:
Hi,

I am trying to solve a problem where I need to compute the frequencies,
in file2, of the words that occur in file1.

For example:
text in file1:
hadoop
user
hello
world

and text in file2 is:
hadoop
user
hello
world
hadoop
hadoop
hadoop
user
world
world
world
hadoop
user
hello

so the output should be:
hadoop 5
user 3
hello 2
world 4
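
The toy example above can be sketched in plain Java, outside Hadoop, to pin down the expected result. This is only an illustration: the class name WordFreqSketch and the method countListed are hypothetical, and both files are represented as in-memory lists rather than being read from HDFS.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the job posted in this thread.
class WordFreqSketch {
    // Counts how often each word listed in `wanted` occurs in `text`.
    // Words in `text` that are not listed in `wanted` are ignored.
    static Map<String, Integer> countListed(List<String> wanted, List<String> text) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (String w : wanted) {
            counts.put(w, 0);
        }
        for (String t : text) {
            Integer c = counts.get(t); // hash lookup, no scan over `wanted`
            if (c != null) {
                counts.put(t, c + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("hadoop", "user", "hello", "world");
        List<String> file2 = Arrays.asList(
                "hadoop", "user", "hello", "world", "hadoop", "hadoop", "hadoop",
                "user", "world", "world", "world", "hadoop", "user", "hello");
        // prints {hadoop=5, user=3, hello=2, world=4}
        System.out.println(countListed(file1, file2));
    }
}
```

Unlike a MapReduce job, this sketch also reports a count of 0 for listed words that never occur in the text.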

I read that distributed caching is a good way to do such jobs. The sizes of
my files are:
File1 = 17GB
File2 = 3 MB

And here is my code:




import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.util.*;
import java.io.*;
import org.apache.hadoop.filecache.*;
import java.net.*;

public class RepeatFreqTest
{


public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
           private final static IntWritable one = new IntWritable(1);
           private Text word = new Text();
            private HashSet<String> dyads = new HashSet<String>();


 public void configure(JobConf conf)
 {


Path[] cacheFiles = new Path[0];
  try {


 cacheFiles = DistributedCache.getLocalCacheFiles(conf);
 } // end of try

 catch (IOException ioe) {
System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
  }// end of catch

 for (Path dyadPath : cacheFiles)
 {

       loadDyads(dyadPath);
 } // end of for cachePath


 } // end of configure

private void loadDyads(Path dyadPath)
{
try
{
               BufferedReader wordReader = new BufferedReader(new
FileReader(dyadPath.toString()));
               String line = null;
               while ((line = wordReader.readLine()) != null) {
      dyads.add(line);

     } // end of while

     wordReader.close();

}// end of try
catch (IOException ioe) {
System.err.println("IOException reading from distributed cache");
     } // end of catch

}// end of loadDyads()



 /* actual map() method, etc go here */


                       @Override
            public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws
IOException
            {


                       // dyad, ut and year from all dyads (big file) file!!!


                       String line = value.toString();
                       String[] tokens = line.split("\\|");
                       String ut1 = tokens[0].trim();
                       String dyad1 = tokens[1].trim();
                       String year1 = tokens[2].trim();
                       int y1 = Integer.parseInt(year1);


                       // dyad, ut and year from sample dyads file (sample file stored in the memory)!!!


                       Iterator it = dyads.iterator();
                       while(it.hasNext())
                       {
                               //Text word = new Text();
                               String setline = it.next().toString();

                               String[] tokens2 = setline.split("\\|");
                               String ut2 = tokens2[0].trim();
                               String dyad2 = tokens2[1].trim();
                               String year2 = tokens2[2].trim();
                               int y2 = Integer.parseInt(year2);




                               if(dyad1.equalsIgnoreCase(dyad2))
                               {
if(! (ut1.equalsIgnoreCase(ut2)))
                                       {
                                               if(y1<=y2)
                                               {

                                                       word.set(setline);
                                                       output.collect(word, one);
                                               }

                                       } // end of if ut1!=ut2

                               } //


                       }// end of while


} // end of override map
} // end of big Map class


public static class Reduce extends MapReduceBase implements Reducer<Text,
IntWritable, Text, IntWritable> {

@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws
IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();

              } // end of while
              output.collect(key, new IntWritable(sum));
            } // end of override reduce
          } // end of Big Reduce


public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(RepeatFreqTest.class);
conf.setJobName("Repeat");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));



DistributedCache.addCacheFile(new
Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);

JobClient.runJob(conf);

}// end of main

} // end of class
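
The map() method above re-splits and scans every cached line for each input record, so the work per record grows with the size of the cache file. Whether that is what keeps the job at 0% is not established in this thread. As a hedged sketch of one alternative, the cached lines could be parsed once and grouped by dyad, so that each input record only looks at the cached entries sharing its dyad. The class DyadIndex and its methods are hypothetical, written in plain Java without Hadoop types. It assumes the same `ut|dyad|year` layout as the code above, skips lines with fewer than three fields, and uses lower-casing as a stand-in for equalsIgnoreCase on the dyad.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: cached "ut|dyad|year" lines grouped by dyad.
class DyadIndex {
    private static final class Entry {
        final String line;
        final String ut;
        final int year;

        Entry(String line, String ut, int year) {
            this.line = line;
            this.ut = ut;
            this.year = year;
        }
    }

    private final Map<String, List<Entry>> byDyad = new HashMap<String, List<Entry>>();
    private final Set<String> seen = new HashSet<String>(); // mirrors the HashSet de-duplication

    // Parse one cached line once, at load time.
    void add(String cachedLine) {
        if (!seen.add(cachedLine)) {
            return; // duplicate line
        }
        String[] t = cachedLine.split("\\|");
        if (t.length < 3) {
            return; // assumption: silently skip malformed lines
        }
        String key = t[1].trim().toLowerCase(Locale.ROOT);
        List<Entry> bucket = byDyad.get(key);
        if (bucket == null) {
            bucket = new ArrayList<Entry>();
            byDyad.put(key, bucket);
        }
        bucket.add(new Entry(cachedLine, t[0].trim(), Integer.parseInt(t[2].trim())));
    }

    // Cached lines with the same dyad, a different ut, and year >= the input year.
    List<String> matches(String inputLine) {
        List<String> out = new ArrayList<String>();
        String[] t = inputLine.split("\\|");
        if (t.length < 3) {
            return out;
        }
        String ut1 = t[0].trim();
        int y1 = Integer.parseInt(t[2].trim());
        List<Entry> bucket = byDyad.get(t[1].trim().toLowerCase(Locale.ROOT));
        if (bucket == null) {
            return out;
        }
        for (Entry e : bucket) {
            if (!ut1.equalsIgnoreCase(e.ut) && y1 <= e.year) {
                out.add(e.line);
            }
        }
        return out;
    }
}
```

In the job above, add() would be called from loadDyads() in place of dyads.add(line), and map() would call output.collect once for each line returned by matches(value.toString()).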


And I put my File1.txt and File2.txt in hdfs as follows:

$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
$HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
$HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles

My problem is that my code compiles fine, but the job never gets past the
map 0% reduce 0% stage.

What am I doing wrong?

Any suggestion would be of great help.

Best,
SS



--
Harsh J
