I'm not sure if this is exactly what you want, but can you emit map records as:

cat, doc5 -> 3
cat, doc1 -> 1
cat, doc5 -> 1

and so on?

This way, your reducers will get the intermediate key/value pairs as

cat, doc5 -> 3
cat, doc5 -> 1
cat, doc1 -> 1

Then you can split the keys (cat, doc*) inside the reducer and perform your
additions.

-Jim

On Mon, Apr 13, 2009 at 4:53 PM, Streckfus, William [USA] <streckfus_will...@bah.com> wrote:

> Hi Everyone,
>
> I'm working on a relatively simple MapReduce job with a slight complication
> with regards to the ordering of my key/values heading into the reducer. The
> output from the mapper might be something like
>
> cat -> doc5, 1
> cat -> doc1, 1
> cat -> doc5, 3
> ...
>
> Here, 'cat' is my key and the value is the document ID and the count (my
> own WritableComparable.) Originally I was going to create a HashMap in the
> reduce method and add an entry for each document ID and sum the counts for
> each. I realized the method would be better if the values were in order like
> so:
>
> cat -> doc1, 1
> cat -> doc5, 1
> cat -> doc5, 3
> ...
>
> Using this style I can continue summing until I reach a new document ID and
> just collect the output at this point thus avoiding data structures and
> object creation costs. I tried setting
> JobConf.setOutputValueGroupingComparator() but this didn't seem to do
> anything. In fact, I threw an exception from the Comparator I supplied but
> this never showed up when running the job. My map output value consists of a
> UTF and a Long so perhaps the Comparator I'm using (identical to
> Text.Comparator) is incorrect:
>
> public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
>         int l2) {
>     int n1 = WritableUtils.decodeVIntSize(b1[s1]);
>     int n2 = WritableUtils.decodeVIntSize(b2[s2]);
>
>     return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
> }
>
> In my final output I'm basically running into the same word -> documentID
> being output multiple times. So for the above example I have multiple lines
> with cat -> doc5, X.
>
> Reducer method just in case:
>
> public void reduce(Text key, Iterator<TermFrequencyWritable> values,
>         OutputCollector<Text, TermFrequencyWritable> output, Reporter reporter)
>         throws IOException {
>     long sum = 0;
>     String lastDocID = null;
>
>     // Iterate through all values
>     while(values.hasNext()) {
>         TermFrequencyWritable value = values.next();
>
>         // Encountered new document ID = record and reset
>         if(!value.getDocumentID().equals(lastDocID)) {
>             // Ignore first go through
>             if(sum != 0) {
>                 termFrequency.setDocumentID(lastDocID);
>                 termFrequency.setFrequency(sum);
>                 output.collect(key, termFrequency);
>             }
>
>             sum = 0;
>             lastDocID = value.getDocumentID();
>         }
>
>         sum += value.getFrequency();
>     }
>
>     // Record last one
>     termFrequency.setDocumentID(lastDocID);
>     termFrequency.setFrequency(sum);
>     output.collect(key, termFrequency);
> }
>
> Any ideas (Using Hadoop .19.1)?
>
> Thanks,
> - Bill
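
To make the composite-key idea at the top of this reply concrete, here is a rough sketch in plain Java with no Hadoop dependency. It only imitates what the sort and reduce steps would do with keys like "cat,doc5". The class name CompositeKeySketch, the Record type, the comma separator, and the use of a TreeMap as a stand-in for the framework's sort are all my own assumptions for illustration, not anything from Bill's job. It also assumes Java 8 or later and that words never contain a comma.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CompositeKeySketch {

    /** Hypothetical map output record: composite key "word,docId" plus a partial count. */
    static final class Record {
        final String compositeKey;
        final long count;

        Record(String word, String docId, long count) {
            // Assumes the word itself never contains a comma.
            this.compositeKey = word + "," + docId;
            this.count = count;
        }
    }

    /** Stand-in for sort plus reduce: group by composite key in sorted order and sum the counts. */
    static List<String> reduce(List<Record> mapOutput) {
        // A TreeMap keeps keys sorted, loosely imitating the framework's sort phase.
        Map<String, Long> sums = new TreeMap<>();
        for (Record r : mapOutput) {
            sums.merge(r.compositeKey, r.count, Long::sum);
        }

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            // Split the composite key back into word and document ID.
            String[] parts = e.getKey().split(",", 2);
            out.add(parts[0] + " -> " + parts[1] + ", " + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> mapOutput = new ArrayList<>();
        mapOutput.add(new Record("cat", "doc5", 3));
        mapOutput.add(new Record("cat", "doc1", 1));
        mapOutput.add(new Record("cat", "doc5", 1));

        for (String line : reduce(mapOutput)) {
            System.out.println(line);
        }
    }
}
```

In a real job each distinct composite key would arrive in its own reduce() call, so the summing loop stays trivial and no per-word HashMap is needed. One thing to keep in mind: with the default hash partitioning, "cat,doc1" and "cat,doc5" may land on different reducers, which is fine for summing but matters if you need all of a word's documents in one place.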