You need access to TupleWritable::setWritten(int). If you want to use
TupleWritable outside the join package, then you need to make this
(and probably related methods, like clearWritten(int)) public and
recompile.
Please file a JIRA if you think it should be more general. -C
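The mechanism behind the empty tuples can be illustrated without Hadoop at all. The sketch below is a minimal stand-in (plain java.io, not Hadoop's actual TupleWritable code): serialization only emits slots whose "written" bit is set, and the public constructor leaves every bit clear, so a round trip through write/read yields an empty tuple unless something package-private like setWritten(int) has been called first.

```java
import java.io.*;
import java.util.BitSet;

// MiniTuple is a hypothetical, simplified analogue of TupleWritable,
// here using int slots instead of Writable[] to stay self-contained.
class MiniTuple {
    private final int[] vals;     // the slot storage
    private final BitSet written; // which slots actually hold data

    MiniTuple(int[] vals) {
        this.vals = vals;
        this.written = new BitSet(vals.length);
        // Note: no bits are set here, mirroring the public constructor's
        // "unknown whether any of them contain written values" behavior.
    }

    // Package-private in Hadoop's real class; that is the crux of the thread.
    void setWritten(int i) { written.set(i); }

    void write(DataOutput out) throws IOException {
        out.writeInt(written.cardinality()); // only flagged slots are serialized
        for (int i = 0; i < vals.length; i++)
            if (written.get(i)) out.writeInt(vals[i]);
    }

    static int[] read(DataInput in) throws IOException {
        int n = in.readInt();
        int[] result = new int[n];
        for (int i = 0; i < n; i++) result[i] = in.readInt();
        return result;
    }

    // Serialize then deserialize, as the map-to-reduce shuffle would.
    static int[] roundTrip(MiniTuple t) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            t.write(new DataOutputStream(buf));
            return read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this model, `roundTrip(new MiniTuple(new int[]{1}))` comes back with zero slots, matching the empty `[]` tuples Michael sees in his reducer; calling `setWritten(0)` first makes the slot survive the round trip.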
On Aug 7, 2008, at 4:18 PM, Michael Andrews wrote:
Hi,
I am a new hadoop developer and am struggling to understand why I
cannot pass TupleWritable between a map and reduce function. I have
modified the wordcount example to demonstrate the issue. Also I am
using hadoop 0.17.1.
package wordcount;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.join.*;

public class WordCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, TupleWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, TupleWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            TupleWritable tuple = new TupleWritable(new Writable[] { one });
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, tuple);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, TupleWritable, Text, TupleWritable> {
        public void reduce(Text key, Iterator<TupleWritable> values,
                           OutputCollector<Text, TupleWritable> output,
                           Reporter reporter) throws IOException {
            IntWritable i = new IntWritable();
            int sum = 0;
            while (values.hasNext()) {
                i = ((IntWritable) values.next().get(0));
                sum += i.get();
            }
            TupleWritable tuple = new TupleWritable(new Writable[] { new IntWritable(sum) });
            output.collect(key, tuple);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(TupleWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
The output is always empty tuples ('[]'). Using the debugger, I
have determined that the line:
TupleWritable tuple = new TupleWritable(new Writable[] { one } );
is properly constructing the desired tuple. I am not sure whether it
is being output correctly by output.collect, as I cannot find the
field in the OutputCollector data structure. When I check in the
reduce method, the values are always empty tuples. I have a feeling
it has something to do with this line in the JavaDoc:
TupleWritable(Writable[] vals)
Initialize tuple with storage; unknown whether any of them
contain "written" values.
Thanks in advance for any and all help,
Michael