Author: jbellis
Date: Mon Sep 27 22:11:05 2010
New Revision: 1001930

URL: http://svn.apache.org/viewvc?rev=1001930&view=rev
Log:
Added option for filesystem/cassandra output.
patch by Jeremy Hanna; reviewed by Stu Hood for CASSANDRA-1342


Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/word_count/README.txt
    cassandra/trunk/contrib/word_count/bin/word_count   (contents, props 
changed)
    cassandra/trunk/contrib/word_count/bin/word_count_setup   (contents, props 
changed)
    cassandra/trunk/contrib/word_count/src/WordCount.java
    cassandra/trunk/contrib/word_count/src/WordCountSetup.java
    
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 27 22:11:05 2010
@@ -1,5 +1,7 @@
 dev
  * create EndpointSnitchInfo and MBean to expose rack and DC (CASSANDRA-1491)
+ * added option to contrib/word_count to output results back to Cassandra
+   (CASSANDRA-1342)
 
 
 0.7-beta2

Modified: cassandra/trunk/contrib/word_count/README.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/README.txt?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/README.txt (original)
+++ cassandra/trunk/contrib/word_count/README.txt Mon Sep 27 22:11:05 2010
@@ -10,14 +10,20 @@ contrib/word_count$ ant
 contrib/word_count$ bin/word_count_setup
 contrib/word_count$ bin/word_count
 
-Output will be in /tmp/word_count*.
+The output of the word count can now be configured. In the bin/word_count
+file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
+and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
+directories. The cassandra option outputs to the 'Standard2' column family.
+
+In order to view the results in Cassandra, one can use python/pycassa and
+perform the following operations:
+$ python
+>>> import pycassa
+>>> con = pycassa.connect('Keyspace1')
+>>> cf = pycassa.ColumnFamily(con, 'Standard2')
+>>> list(cf.get_range())
 
 Read the code in src/ for more details.
 
 *If you want to point wordcount at a real cluster, modify the seed
-and listenaddress settings in storage-conf.xml accordingly.
-
-*For Mac users, the storage-conf.xml uses 127.0.0.2 for the 
-word_count_setup. Mac OS X doesn't have that address available.
-To add it, run this before running bin/word_count_setup:
-sudo ifconfig lo0 alias 127.0.0.2 up
+and listenaddress settings accordingly.

Modified: cassandra/trunk/contrib/word_count/bin/word_count
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/bin/word_count (original)
+++ cassandra/trunk/contrib/word_count/bin/word_count Mon Sep 27 22:11:05 2010
@@ -53,5 +53,7 @@ if [ "x$JAVA" = "x" ]; then
     exit 1
 fi
 
+OUTPUT_REDUCER=filesystem
+
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER

Propchange: cassandra/trunk/contrib/word_count/bin/word_count
------------------------------------------------------------------------------
    svn:executable = *

Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count_setup?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
    (empty)

Propchange: cassandra/trunk/contrib/word_count/bin/word_count_setup
------------------------------------------------------------------------------
    svn:executable = *

Modified: cassandra/trunk/contrib/word_count/src/WordCount.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Sep 27 22:11:05 
2010
@@ -17,10 +17,13 @@
  */
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.SortedMap;
-import java.util.StringTokenizer;
+import java.nio.ByteBuffer;
+import java.util.*;
 
+import org.apache.cassandra.avro.Column;
+import org.apache.cassandra.avro.ColumnOrSuperColumn;
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,9 +55,13 @@ public class WordCount extends Configure
 
     static final String KEYSPACE = "Keyspace1";
     static final String COLUMN_FAMILY = "Standard1";
-    private static final String CONF_COLUMN_NAME = "columnname";
+
+    static final String OUTPUT_REDUCER_VAR = "output_reducer";
+    static final String OUTPUT_COLUMN_FAMILY = "Standard2";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
 
+    private static final String CONF_COLUMN_NAME = "columnname";
+
     public static void main(String[] args) throws Exception
     {
         // Let ToolRunner handle generic command-line options
@@ -92,7 +99,7 @@ public class WordCount extends Configure
         
     }
 
-    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, 
IntWritable>
+    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, 
Text, IntWritable>
     {
         private IntWritable result = new IntWritable();
 
@@ -109,30 +116,109 @@ public class WordCount extends Configure
         }
     }
 
+    public static class ReducerToCassandra extends Reducer<Text, IntWritable, 
ByteBuffer, List<Mutation>>
+    {
+        private List<Mutation> results = new ArrayList<Mutation>();
+        private String columnName;
+
+        public void reduce(Text key, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException
+        {
+            int sum = 0;
+            for (IntWritable val : values)
+            {
+                sum += val.get();
+            }
+
+            results.add(getMutation(key, sum));
+            context.write(ByteBuffer.wrap(columnName.getBytes()), results);
+            results.clear();
+        }
+
+        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context 
context)
+            throws IOException, InterruptedException
+        {
+            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+        }
+
+        private static Mutation getMutation(Text key, int sum)
+        {
+            Mutation m = new Mutation();
+            m.column_or_supercolumn = getCoSC(key, sum);
+            return m;
+        }
+
+        private static ColumnOrSuperColumn getCoSC(Text key, int sum)
+        {
+            // Have to convert both the key and the sum to ByteBuffers
+            // for the generalized output format
+            ByteBuffer name = ByteBuffer.wrap(key.getBytes());
+            ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
+
+            Column c = new Column();
+            c.name = name;
+            c.value = value;
+            c.timestamp = System.currentTimeMillis() * 1000;
+            c.ttl = 0;
+            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+            cosc.column = c;
+            return cosc;
+        }
+    }
+
     public int run(String[] args) throws Exception
     {
+        String outputReducerType = "filesystem";
+        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
+        {
+            String[] s = args[0].split("=");
+            if (s != null && s.length == 2)
+                outputReducerType = s[1];
+        }
+        logger.info("output reducer type: " + outputReducerType);
 
         for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
         {
             String columnName = "text" + i;
             getConf().set(CONF_COLUMN_NAME, columnName);
+
             Job job = new Job(getConf(), "wordcount");
             job.setJarByClass(WordCount.class);
             job.setMapperClass(TokenizerMapper.class);
-            job.setCombinerClass(IntSumReducer.class);
-            job.setReducerClass(IntSumReducer.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(IntWritable.class);
+
+            if (outputReducerType.equalsIgnoreCase("filesystem"))
+            {
+                job.setCombinerClass(ReducerToFilesystem.class);
+                job.setReducerClass(ReducerToFilesystem.class);
+                job.setOutputKeyClass(Text.class);
+                job.setOutputValueClass(IntWritable.class);
+                FileOutputFormat.setOutputPath(job, new 
Path(OUTPUT_PATH_PREFIX + i));
+            }
+            else
+            {
+                job.setReducerClass(ReducerToCassandra.class);
+
+                job.setMapOutputKeyClass(Text.class);
+                job.setMapOutputValueClass(IntWritable.class);
+                job.setOutputKeyClass(ByteBuffer.class);
+                job.setOutputValueClass(List.class);
+
+                job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+                
+                ConfigHelper.setOutputColumnFamily(job.getConfiguration(), 
KEYSPACE, OUTPUT_COLUMN_FAMILY);
+            }
 
             job.setInputFormatClass(ColumnFamilyInputFormat.class);
-            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + 
i));
+
 
             ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
             ConfigHelper.setInitialAddress(job.getConfiguration(), 
"localhost");
+            ConfigHelper.setPartitioner(job.getConfiguration(), 
"org.apache.cassandra.dht.RandomPartitioner");
             ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
KEYSPACE, COLUMN_FAMILY);
             SlicePredicate predicate = new 
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
             ConfigHelper.setInputSlicePredicate(job.getConfiguration(), 
predicate);
 
+
+
             job.waitForCompletion(true);
         }
         return 0;

Modified: cassandra/trunk/contrib/word_count/src/WordCountSetup.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCountSetup.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCountSetup.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCountSetup.java Mon Sep 27 
22:11:05 2010
@@ -97,8 +97,8 @@ public class WordCountSetup
     private static void setupKeyspace(Cassandra.Iface client) throws 
TException, InvalidRequestException
     {
         List<CfDef> cfDefList = new ArrayList<CfDef>();
-        CfDef cfDef = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
-        cfDefList.add(cfDef);
+        cfDefList.add(new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY));
+        cfDefList.add(new CfDef(WordCount.KEYSPACE, 
WordCount.OUTPUT_COLUMN_FAMILY));
 
         client.system_add_keyspace(new KsDef(WordCount.KEYSPACE, 
"org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
 Mon Sep 27 22:11:05 2010
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * values) as Cassandra rows (and respective columns) in a given
- * {@link ColumnFamily}.
+ * ColumnFamily.
  * 
  * <p>
 * As is the case with the {@link ColumnFamilyInputFormat}, you need to set 
the


Reply via email to