Hi,

I am a new hadoop developer and am struggling to understand why I cannot pass 
TupleWritable between a map and reduce function.  I have modified the wordcount 
example to demonstrate the issue.  Also I am using hadoop 0.17.1.

package wordcount; import java.io.IOException; import java.util.*; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import 
org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.join.*; public 
class WordCount {    public static class Map extends MapReduceBase implements 
Mapper<LongWritable, Text, Text, TupleWritable> {        private final static 
IntWritable one = new IntWritable(1);        private Text word = new Text();    
    public void map(LongWritable key, Text value, OutputCollector<Text, 
TupleWritable> output, Reporter reporter) throws IOException {            
String line = value.toString();            StringTokenizer tokenizer = new 
StringTokenizer(line);            TupleWritable tuple = new TupleWritable(new 
Writable[] { one } );            while (tokenizer.hasMoreTokens()) {            
    word.set(tokenizer.nextToken());                output.collect(word, 
tuple);            }        }    }    public static class Reduce extends 
MapReduceBase implements Reducer<Text, TupleWritable, Text, TupleWritable> {    
    public void reduce(Text key, Iterator<TupleWritable> values, 
OutputCollector<Text, TupleWritable> output, Reporter reporter) throws 
IOException {            IntWritable i = new IntWritable();            int sum 
= 0;            while (values.hasNext()) {                i = ((IntWritable) 
values.next().get(0));                sum += i.get();            }            
TupleWritable tuple = new TupleWritable(new Writable[] { new IntWritable(sum) } 
);            output.collect(key, tuple);        }    }    public static void 
main(String[] args) throws Exception {        JobConf conf = new 
JobConf(WordCount.class);        conf.setJobName("wordcount");        
conf.setOutputKeyClass(Text.class);        
conf.setOutputValueClass(TupleWritable.class);        
conf.setMapperClass(Map.class);        conf.setReducerClass(Reduce.class);      
  conf.setInputFormat(TextInputFormat.class);        
conf.setOutputFormat(TextOutputFormat.class);        
FileInputFormat.setInputPaths(conf, new Path(args[0]));        
FileOutputFormat.setOutputPath(conf, new Path(args[1]));        
JobClient.runJob(conf);    } }
The output is always empty tuples ('[]').  Using the debugger, I have 
determined that the line:
    TupleWritable tuple = new TupleWritable(new Writable[] { one } );

Is properly constructing the desired tuple.  I am not sure if it is being 
outputed correctly by output.collect as I cannot find the field in the 
OutputCollector data structure.  When I check in the reduce method the values 
are always empty tuples.  I have a feeling it has something to do with this 
line in the JavaDoc:

TupleWritable(Writable[] vals)
          Initialize tuple with storage; unknown whether any of them contain 
"written" values.

Thanks in advance for any all help,

Michael


Reply via email to