I think Teppo Kurki raised a valid point, one that may reveal a serious defect in Hadoop.
When map tasks write intermediate data out, they always use a SequenceFile RecordWriter with the key/value classes taken from the job object. When the reducers write the final results out, their output format is also obtained from the job object. By default it is TextOutputFormat, and there is no conflict. However, if one wants to use SequenceFileOutputFormat for the final results, then the key/value classes for that output are again obtained from the job object, the same ones used for the map tasks' output. Now we have a problem: it is impossible for the map outputs and the reducer outputs to use different key/value classes if one wants the reducers to generate their output as SequenceFiles.

Runping

-----Original Message-----
From: Hairong Kuang [mailto:[EMAIL PROTECTED]
Sent: Thursday, March 30, 2006 11:41 AM
To: [email protected]
Subject: RE: Different Key/Value classes for Map and Reduce?

The input format specifies the format of your input files. The default is TextInputFormat, in which the key is a LongWritable (the offset of the line in the file) and the value is a UTF8 (the line itself). So your program does not need to specify the input format, and does not need to set the input key and value types.

The output format specifies the format of the output files of the map job. The default is TextOutputFormat. In your program, your map task emits (LongWritable, UTF8) pairs, so you need to do:

statJob.setOutputKeyClass(LongWritable.class); // not IntWritable
statJob.setOutputValueClass(UTF8.class);

Hope it helps.

Hairong

-----Original Message-----
From: Teppo Kurki [mailto:[EMAIL PROTECTED]
Sent: Wednesday, March 29, 2006 9:57 PM
To: [email protected]
Subject: Re: Different Key/Value classes for Map and Reduce?

Hairong Kuang wrote:
> Did you create your input file using a SequenceFile.Writer?

No, my input files are line-oriented log files, conceptually similar to the Grep example.

Here's what I'm trying to do in a simplified example (parsing the lines replaced with simulation code):

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable; // only used by the commented-out variant
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MapRedClassDemo implements Mapper, Reducer {

  long count = 0;

  public void configure(JobConf job) {}

  public void close() throws IOException {}

  // Map: simulate parsing a log line and emit a (LongWritable, UTF8) pair.
  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter) throws IOException {
    String line = ((UTF8) value).toString();
    String s = "demo" + line.length();
    output.collect(new LongWritable(count++ % 20), new UTF8(s));
  }

  // Reduce: count the distinct values seen for each key.
  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    Set set = new HashSet();
    while (values.hasNext()) {
      set.add(values.next());
    }
    // output.collect(key, new IntWritable(set.size()));
    output.collect(key, new UTF8("" + set.size()));
  }

  public static void main(String[] args) throws Exception {
    Configuration defaults = new Configuration();
    JobConf statJob = new JobConf(defaults, MapRedClassDemo.class);
    statJob.setInputDir(new File(args[0]));
    statJob.setMapperClass(MapRedClassDemo.class);
    statJob.setReducerClass(MapRedClassDemo.class);
    statJob.setInputKeyClass(LongWritable.class);
    statJob.setInputValueClass(UTF8.class);
    statJob.setOutputDir(new File(args[1] + "/" + System.currentTimeMillis() + "/"));
    statJob.setOutputFormat(TextOutputFormat.class);
    statJob.setOutputKeyClass(LongWritable.class);
    // statJob.setOutputValueClass(IntWritable.class);
    statJob.setOutputValueClass(UTF8.class);
    JobClient.runJob(statJob);
  }
}

This works, since both Map and Reduce emit <LongWritable, UTF8> pairs.
If I try it with statJob.setOutputValueClass(IntWritable.class) and Reduce emitting <LongWritable, IntWritable> pairs, the _Map_ operation fails with:

java.io.IOException: wrong value class: demo310 is not class org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:150)
        at org.apache.hadoop.mapred.MapTask$2.collect(MapTask.java:92)

so it appears that the setInputXXClass methods have no effect on the Map phase. Or have I understood something completely wrong?
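
For reference, the failing variant is just the working example above with the two commented-out lines swapped back in; shown here as a fragment (everything else is unchanged):

// Fragment only: the two changes, relative to MapRedClassDemo above, that
// trigger the "wrong value class" IOException. The map still emits UTF8
// values, but the map-side SequenceFile writer uses the job-level output
// value class, which is now IntWritable.

// in reduce():
output.collect(key, new IntWritable(set.size()));

// in main():
statJob.setOutputValueClass(IntWritable.class);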

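Later releases of the classic org.apache.hadoop.mapred API added separate setters on JobConf for the map output types, which remove exactly this restriction. Below is a minimal sketch assuming one of those later releases; the class name is made up for illustration, and the mapper/reducer classes, input/output paths, and job submission are elided.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

// Sketch only: assumes a later classic-API release where JobConf has
// setMapOutputKeyClass/setMapOutputValueClass (these did not exist at the
// time of this thread).
public class MapVsReduceTypesSketch {
  static JobConf configure() {
    JobConf statJob = new JobConf(MapVsReduceTypesSketch.class);

    // Intermediate (map output) types: what map() emits toward the reducers.
    statJob.setMapOutputKeyClass(LongWritable.class);
    statJob.setMapOutputValueClass(UTF8.class);

    // Final (reduce output) types and format: a SequenceFile of
    // (LongWritable, IntWritable) pairs.
    statJob.setOutputFormat(SequenceFileOutputFormat.class);
    statJob.setOutputKeyClass(LongWritable.class);
    statJob.setOutputValueClass(IntWritable.class);

    return statJob;
  }
}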