Hi,

I have a folder named INPUT.

Inside INPUT there are 5 resumes.

hduser@localhost:~/Ranjini$ hadoop fs -ls /user/hduser/INPUT
Found 5 items
-rw-r--r--   1 hduser supergroup       5438 2014-03-18 15:20
/user/hduser/INPUT/Rakesh Chowdary_Microstrategy.txt
-rw-r--r--   1 hduser supergroup       6022 2014-03-18 15:22
/user/hduser/INPUT/Ramarao Devineni_Microstrategy.txt
-rw-r--r--   1 hduser supergroup       3517 2014-03-18 15:21
/user/hduser/INPUT/vinitha.txt
-rw-r--r--   1 hduser supergroup       3517 2014-03-18 15:21
/user/hduser/INPUT/sony.txt
-rw-r--r--   1 hduser supergroup       3517 2014-03-18 15:21
/user/hduser/INPUT/ravi.txt
hduser@localhost:~/Ranjini$

I have to process the folder and its contents.

I need the output as:

filename   word   occurrence
vinitha       java       4
sony          oracle      3



But I am not getting the filename. Because the contents of the input files
are merged, the file name is not reported correctly.


Please help me fix this issue. I have given my code below.


 import java.io.IOException;
 import java.util.*;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.*;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;

 public class WordCount {
    public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      public void map(LongWritable key, Text value, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException {
   FSDataInputStream fs=null;
   FileSystem hdfs = null;
   String line = value.toString();
         int i=0,k=0;
  try{
   Configuration configuration = new Configuration();
      configuration.set("fs.default.name", "hdfs://localhost:4440/");

   Path srcPath = new Path("/user/hduser/INPUT/");

   hdfs = FileSystem.get(configuration);
   FileStatus[] status = hdfs.listStatus(srcPath);
   fs=hdfs.open(srcPath);
   BufferedReader br=new BufferedReader(new
InputStreamReader(hdfs.open(srcPath)));

String[] splited = line.split("\\s <file://s/>+");
    for( i=0;i<splited.length;i++)
 {
     String sp[]=splited[i].split(",");
     for( k=0;k<sp.length;k++)
 {

   if(!sp[k].isEmpty()){
StringTokenizer tokenizer = new StringTokenizer(sp[k]);
if((sp[k].equalsIgnoreCase("C"))){
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
}
if((sp[k].equalsIgnoreCase("JAVA"))){
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
}
      }
    }
}
 } catch (IOException e) {
    e.printStackTrace();
 }
}
}
    public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws
IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }
    public static void main(String[] args) throws Exception {


      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
      JobClient.runJob(conf);
    }
 }



Please help

Thanks in advance.

Ranjini

Reply via email to