Author: jbellis
Date: Wed Dec  8 00:55:29 2010
New Revision: 1043256

URL: http://svn.apache.org/viewvc?rev=1043256&view=rev
Log:
clean up and comment reducer code
patch by jbellis

Modified:
    cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java

Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1043256&r1=1043255&r2=1043256&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed Dec  8 00:55:29 2010
@@ -49,6 +49,9 @@ import org.apache.hadoop.util.ToolRunner
  * "text" containing a sequence of words.
  *
  * For each word, we output the total number of occurrences across all texts.
+ *
+ * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair,
+ * with a row key equal to the name of the source column we read the words from.
  */
 public class WordCount extends Configured implements Tool
 {
@@ -74,11 +77,17 @@ public class WordCount extends Configure
     {
         private final static IntWritable one = new IntWritable(1);
         private Text word = new Text();
-        private ByteBuffer columnName;
+        private ByteBuffer sourceColumn;
+
+        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException
+        {
+            sourceColumn = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
+        }
 
         public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
         {
-            IColumn column = columns.get(columnName);
+            IColumn column = columns.get(sourceColumn);
             if (column == null)
                 return;
             String value = ByteBufferUtil.string(column.value());
@@ -91,78 +100,48 @@ public class WordCount extends Configure
                 context.write(word, one);
             }
         }
-
-        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
-            throws IOException, InterruptedException
-        {
-            this.columnName = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
-        }
-        
     }
 
     public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
     {
-        private IntWritable result = new IntWritable();
-
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
         {
             int sum = 0;
             for (IntWritable val : values)
-            {
                 sum += val.get();
-            }
-
-            result.set(sum);
-            context.write(key, result);
+            context.write(key, new IntWritable(sum));
         }
     }
 
     public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
     {
-        private List<Mutation> results = new ArrayList<Mutation>();
-        private String columnName;
-
-        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
-        {
-            int sum = 0;
-            for (IntWritable val : values)
-            {
-                sum += val.get();
-            }
-
-            results.add(getMutation(key, sum));
-            context.write(ByteBuffer.wrap(columnName.getBytes()), results);
-            results.clear();
-        }
+        private ByteBuffer outputKey;
 
         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
-            throws IOException, InterruptedException
+        throws IOException, InterruptedException
         {
-            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+            outputKey = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
         }
 
-        private static Mutation getMutation(Text key, int sum)
+        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
         {
-            Mutation m = new Mutation();
-            m.column_or_supercolumn = getCoSC(key, sum);
-            return m;
+            int sum = 0;
+            for (IntWritable val : values)
+                sum += val.get();
+            context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
         }
 
-        private static ColumnOrSuperColumn getCoSC(Text key, int sum)
+        private static Mutation getMutation(Text word, int sum)
         {
-            // Have to convert both the key and the sum to ByteBuffers
-            // for the generalized output format
-            ByteBuffer name = ByteBuffer.wrap(key.getBytes());
-            ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
-
             Column c = new Column();
-            c.name = name;
-            c.value = value;
+            c.name = ByteBuffer.wrap(word.getBytes());
+            c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
             c.timestamp = System.currentTimeMillis() * 1000;
-            c.ttl = 0;
-            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
-            cosc.column = c;
-            return cosc;
+
+            Mutation m = new Mutation();
+            m.column_or_supercolumn = new ColumnOrSuperColumn();
+            m.column_or_supercolumn.column = c;
+            return m;
         }
     }
 
@@ -204,7 +183,7 @@ public class WordCount extends Configure
                 job.setOutputValueClass(List.class);
 
                 job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
-                
+
                 ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
             }
 


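For anyone reading the new reducer in isolation, here is a minimal standalone sketch (not part of the patch) of the {word, count} layout described in the updated class comment. The word "apple" and count 3 are made-up illustration values; the Thrift classes (Column, ColumnOrSuperColumn, Mutation) are the same ones the patched getMutation() builds.

    import java.nio.ByteBuffer;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Mutation;

    public class WordCountOutputSketch
    {
        public static void main(String[] args)
        {
            // Row key: the name of the source column the words were read from
            // ("text" in the word_count example).
            ByteBuffer rowKey = ByteBuffer.wrap("text".getBytes());

            // One {word, count} column/value pair, mirroring ReducerToCassandra:
            // column name = the word, column value = its count as a string.
            Column c = new Column();
            c.name = ByteBuffer.wrap("apple".getBytes());              // hypothetical word
            c.value = ByteBuffer.wrap(String.valueOf(3).getBytes());   // hypothetical count
            c.timestamp = System.currentTimeMillis() * 1000;           // microseconds

            Mutation m = new Mutation();
            m.column_or_supercolumn = new ColumnOrSuperColumn();
            m.column_or_supercolumn.column = c;

            // ColumnFamilyOutputFormat applies each (rowKey, mutation list) pair
            // emitted by the reducer as a write to the output column family,
            // so the row "text" accumulates one column per distinct word.
            System.out.println(rowKey + " -> " + m);
        }
    }

Emitting Collections.singletonList(getMutation(word, sum)) per reduce call keeps the reducer stateless, which is presumably why the reusable results list from the old code could be dropped.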