Author: jbellis
Date: Wed Dec 8 00:55:29 2010
New Revision: 1043256
URL: http://svn.apache.org/viewvc?rev=1043256&view=rev
Log:
clean up and comment reducer code
patch by jbellis
Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1043256&r1=1043255&r2=1043256&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java (original) +++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed Dec 8 00:55:29 2010 @@ -49,6 +49,9 @@ import org.apache.hadoop.util.ToolRunner * "text" containing a sequence of words. * * For each word, we output the total number of occurrences across all texts. + * + * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair, + * with a row key equal to the name of the source column we read the words from. */ public class WordCount extends Configured implements Tool { @@ -74,11 +77,17 @@ public class WordCount extends Configure { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); - private ByteBuffer columnName; + private ByteBuffer sourceColumn; + + protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) + throws IOException, InterruptedException + { + sourceColumn = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes()); + } public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException { - IColumn column = columns.get(columnName); + IColumn column = columns.get(sourceColumn); if (column == null) return; String value = ByteBufferUtil.string(column.value()); @@ -91,78 +100,48 @@ public class WordCount extends Configure context.write(word, one); } } - - protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) - throws IOException, InterruptedException - { - this.columnName = 
ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes()); - } - } public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> { - private IntWritable result = new IntWritable(); - public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) - { sum += val.get(); - } - - result.set(sum); - context.write(key, result); + context.write(key, new IntWritable(sum)); } } public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> { - private List<Mutation> results = new ArrayList<Mutation>(); - private String columnName; - - public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException - { - int sum = 0; - for (IntWritable val : values) - { - sum += val.get(); - } - - results.add(getMutation(key, sum)); - context.write(ByteBuffer.wrap(columnName.getBytes()), results); - results.clear(); - } + private ByteBuffer outputKey; protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) - throws IOException, InterruptedException + throws IOException, InterruptedException { - this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME); + outputKey = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes()); } - private static Mutation getMutation(Text key, int sum) + public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { - Mutation m = new Mutation(); - m.column_or_supercolumn = getCoSC(key, sum); - return m; + int sum = 0; + for (IntWritable val : values) + sum += val.get(); + context.write(outputKey, Collections.singletonList(getMutation(word, sum))); } - private static ColumnOrSuperColumn getCoSC(Text key, int sum) + private static Mutation getMutation(Text word, int sum) { - // Have to convert both the key and the sum to 
ByteBuffers - // for the generalized output format - ByteBuffer name = ByteBuffer.wrap(key.getBytes()); - ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes()); - Column c = new Column(); - c.name = name; - c.value = value; + c.name = ByteBuffer.wrap(word.getBytes()); + c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes()); c.timestamp = System.currentTimeMillis() * 1000; - c.ttl = 0; - ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); - cosc.column = c; - return cosc; + + Mutation m = new Mutation(); + m.column_or_supercolumn = new ColumnOrSuperColumn(); + m.column_or_supercolumn.column = c; + return m; } } @@ -204,7 +183,7 @@ public class WordCount extends Configure job.setOutputValueClass(List.class); job.setOutputFormatClass(ColumnFamilyOutputFormat.class); - + ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); }