You want a word count for each file, but the code gives you a single word count
across all the files, right?

The key your mapper emits needs to carry the file name, so change

    word.set(tokenizer.nextToken());
    output.collect(word, one);

to

    word.set(fileName + " " + tokenizer.nextToken());
    output.collect(word, one);

where fileName is the name of the file the current record comes from.
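With the old mapred API you are already using, the simplest way to get that
file name is from the input split the Reporter gives you. Here is a rough,
untested sketch of the map() method (it assumes TextInputFormat, so every
split is a FileSplit over a single file, and it reuses the word and one
fields from your class; FileSplit is already covered by your
org.apache.hadoop.mapred.* import):

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // The split for TextInputFormat is a FileSplit, so its path is the
        // file the current line was read from.
        String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();

        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            // Prefix each word with the file name so the reducer counts per file.
            word.set(fileName + " " + tokenizer.nextToken());
            output.collect(word, one);
        }
    }

If you only want to count particular words (your C/JAVA check), keep that
filter around the collect() call. Either way, there is no need to open the
INPUT directory again inside map(); the framework already feeds each map call
one line of the file belonging to its split.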
Regards,
Stanley Shi,

On Wed, Mar 19, 2014 at 8:50 PM, Ranjini Rathinam <ranjinibe...@gmail.com> wrote:

> Hi,
>
> I have a folder named INPUT.
>
> Inside INPUT there are 5 resumes.
>
> hduser@localhost:~/Ranjini$ hadoop fs -ls /user/hduser/INPUT
> Found 5 items
> -rw-r--r--   1 hduser supergroup   5438 2014-03-18 15:20 /user/hduser/INPUT/Rakesh Chowdary_Microstrategy.txt
> -rw-r--r--   1 hduser supergroup   6022 2014-03-18 15:22 /user/hduser/INPUT/Ramarao Devineni_Microstrategy.txt
> -rw-r--r--   1 hduser supergroup   3517 2014-03-18 15:21 /user/hduser/INPUT/vinitha.txt
> -rw-r--r--   1 hduser supergroup   3517 2014-03-18 15:21 /user/hduser/INPUT/sony.txt
> -rw-r--r--   1 hduser supergroup   3517 2014-03-18 15:21 /user/hduser/INPUT/ravi.txt
> hduser@localhost:~/Ranjini$
>
> I have to process the folder and its contents.
>
> I need the output as
>
> filename   word     occurrence
> vinitha    java     4
> sony       oracle   3
>
> But I am not getting the filename. As the contents of the input files are
> merged, the file name does not come out correctly.
>
> Please help me fix this issue. I have given my code below.
>
> import java.io.IOException;
> import java.util.*;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.util.*;
> import java.io.File;
> import java.io.FileReader;
> import java.io.FileWriter;
> import java.io.IOException;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.util.*;
> import org.apache.hadoop.mapred.lib.*;
>
> public class WordCount {
>   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
>     private final static IntWritable one = new IntWritable(1);
>     private Text word = new Text();
>
>     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
>       FSDataInputStream fs = null;
>       FileSystem hdfs = null;
>       String line = value.toString();
>       int i = 0, k = 0;
>       try {
>         Configuration configuration = new Configuration();
>         configuration.set("fs.default.name", "hdfs://localhost:4440/");
>
>         Path srcPath = new Path("/user/hduser/INPUT/");
>
>         hdfs = FileSystem.get(configuration);
>         FileStatus[] status = hdfs.listStatus(srcPath);
>         fs = hdfs.open(srcPath);
>         BufferedReader br = new BufferedReader(new InputStreamReader(hdfs.open(srcPath)));
>
>         String[] splited = line.split("\\s+");
>         for (i = 0; i < splited.length; i++) {
>           String sp[] = splited[i].split(",");
>           for (k = 0; k < sp.length; k++) {
>             if (!sp[k].isEmpty()) {
>               StringTokenizer tokenizer = new StringTokenizer(sp[k]);
>               if ((sp[k].equalsIgnoreCase("C"))) {
>                 while (tokenizer.hasMoreTokens()) {
>                   word.set(tokenizer.nextToken());
>                   output.collect(word, one);
>                 }
>               }
>               if ((sp[k].equalsIgnoreCase("JAVA"))) {
>                 while (tokenizer.hasMoreTokens()) {
>                   word.set(tokenizer.nextToken());
>                   output.collect(word, one);
>                 }
>               }
>             }
>           }
>         }
>       } catch (IOException e) {
>         e.printStackTrace();
>       }
>     }
>   }
>
>   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
>     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
>       int sum = 0;
>       while (values.hasNext()) {
>         sum += values.next().get();
>       }
>       output.collect(key, new IntWritable(sum));
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     JobConf conf = new JobConf(WordCount.class);
>     conf.setJobName("wordcount");
>     conf.setOutputKeyClass(Text.class);
>     conf.setOutputValueClass(IntWritable.class);
>     conf.setMapperClass(Map.class);
>     conf.setCombinerClass(Reduce.class);
>     conf.setReducerClass(Reduce.class);
>     conf.setInputFormat(TextInputFormat.class);
>     conf.setOutputFormat(TextOutputFormat.class);
>     FileInputFormat.setInputPaths(conf, new Path(args[0]));
>     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>     JobClient.runJob(conf);
>   }
> }
>
> Please help.
>
> Thanks in advance.
>
> Ranjini