Author: jbellis Date: Mon Sep 27 22:11:05 2010 New Revision: 1001930 URL: http://svn.apache.org/viewvc?rev=1001930&view=rev Log: Added option for filesystem/cassandra output. patch by Jeremy Hanna; reviewed by Stu Hood for CASSANDRA-1342
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/contrib/word_count/README.txt cassandra/trunk/contrib/word_count/bin/word_count (contents, props changed) cassandra/trunk/contrib/word_count/bin/word_count_setup (contents, props changed) cassandra/trunk/contrib/word_count/src/WordCount.java cassandra/trunk/contrib/word_count/src/WordCountSetup.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Sep 27 22:11:05 2010 @@ -1,5 +1,7 @@ dev * create EndpointSnitchInfo and MBean to expose rack and DC (CASSANDRA-1491) + * added option to contrib/word_count to output results back to Cassandra + (CASSANDRA-1342) 0.7-beta2 Modified: cassandra/trunk/contrib/word_count/README.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/README.txt?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/README.txt (original) +++ cassandra/trunk/contrib/word_count/README.txt Mon Sep 27 22:11:05 2010 @@ -10,14 +10,20 @@ contrib/word_count$ ant contrib/word_count$ bin/word_count_setup contrib/word_count$ bin/word_count -Output will be in /tmp/word_count*. +The output of the word count can now be configured. In the bin/word_count +file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem' +and 'cassandra'. The filesystem option outputs to the /tmp/word_count* +directories. The cassandra option outputs to the 'Standard2' column family. 
+ +In order to view the results in Cassandra, one can use python/pycassa and +perform the following operations: +$ python +>>> import pycassa +>>> con = pycassa.connect('Keyspace1') +>>> cf = pycassa.ColumnFamily(con, 'Standard2') +>>> list(cf.get_range()) Read the code in src/ for more details. *If you want to point wordcount at a real cluster, modify the seed -and listenaddress settings in storage-conf.xml accordingly. - -*For Mac users, the storage-conf.xml uses 127.0.0.2 for the -word_count_setup. Mac OS X doesn't have that address available. -To add it, run this before running bin/word_count_setup: -sudo ifconfig lo0 alias 127.0.0.2 up +and listenaddress settings accordingly. Modified: cassandra/trunk/contrib/word_count/bin/word_count URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/bin/word_count (original) +++ cassandra/trunk/contrib/word_count/bin/word_count Mon Sep 27 22:11:05 2010 @@ -53,5 +53,7 @@ if [ "x$JAVA" = "x" ]; then exit 1 fi +OUTPUT_REDUCER=filesystem + #echo $CLASSPATH -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount +$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER Propchange: cassandra/trunk/contrib/word_count/bin/word_count ------------------------------------------------------------------------------ svn:executable = * Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count_setup?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== (empty) Propchange: cassandra/trunk/contrib/word_count/bin/word_count_setup ------------------------------------------------------------------------------ svn:executable = * Modified: cassandra/trunk/contrib/word_count/src/WordCount.java 
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/src/WordCount.java (original) +++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Sep 27 22:11:05 2010 @@ -17,10 +17,13 @@ */ import java.io.IOException; -import java.util.Arrays; -import java.util.SortedMap; -import java.util.StringTokenizer; +import java.nio.ByteBuffer; +import java.util.*; +import org.apache.cassandra.avro.Column; +import org.apache.cassandra.avro.ColumnOrSuperColumn; +import org.apache.cassandra.avro.Mutation; +import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,9 +55,13 @@ public class WordCount extends Configure static final String KEYSPACE = "Keyspace1"; static final String COLUMN_FAMILY = "Standard1"; - private static final String CONF_COLUMN_NAME = "columnname"; + + static final String OUTPUT_REDUCER_VAR = "output_reducer"; + static final String OUTPUT_COLUMN_FAMILY = "Standard2"; private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count"; + private static final String CONF_COLUMN_NAME = "columnname"; + public static void main(String[] args) throws Exception { // Let ToolRunner handle generic command-line options @@ -92,7 +99,7 @@ public class WordCount extends Configure } - public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> + public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @@ -109,30 +116,109 @@ public class WordCount extends Configure } } + public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> + { + private List<Mutation> results = new ArrayList<Mutation>(); + private String columnName; + + public void reduce(Text key, 
Iterable<IntWritable> values, Context context) throws IOException, InterruptedException + { + int sum = 0; + for (IntWritable val : values) + { + sum += val.get(); + } + + results.add(getMutation(key, sum)); + context.write(ByteBuffer.wrap(columnName.getBytes()), results); + results.clear(); + } + + protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) + throws IOException, InterruptedException + { + this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME); + } + + private static Mutation getMutation(Text key, int sum) + { + Mutation m = new Mutation(); + m.column_or_supercolumn = getCoSC(key, sum); + return m; + } + + private static ColumnOrSuperColumn getCoSC(Text key, int sum) + { + // Have to convert both the key and the sum to ByteBuffers + // for the generalized output format + ByteBuffer name = ByteBuffer.wrap(key.getBytes()); + ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes()); + + Column c = new Column(); + c.name = name; + c.value = value; + c.timestamp = System.currentTimeMillis() * 1000; + c.ttl = 0; + ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); + cosc.column = c; + return cosc; + } + } + public int run(String[] args) throws Exception { + String outputReducerType = "filesystem"; + if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR)) + { + String[] s = args[0].split("="); + if (s != null && s.length == 2) + outputReducerType = s[1]; + } + logger.info("output reducer type: " + outputReducerType); for (int i = 0; i < WordCountSetup.TEST_COUNT; i++) { String columnName = "text" + i; getConf().set(CONF_COLUMN_NAME, columnName); + Job job = new Job(getConf(), "wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); - job.setCombinerClass(IntSumReducer.class); - job.setReducerClass(IntSumReducer.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); + + if (outputReducerType.equalsIgnoreCase("filesystem")) + { + 
job.setCombinerClass(ReducerToFilesystem.class); + job.setReducerClass(ReducerToFilesystem.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); + } + else + { + job.setReducerClass(ReducerToCassandra.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(ByteBuffer.class); + job.setOutputValueClass(List.class); + + job.setOutputFormatClass(ColumnFamilyOutputFormat.class); + + ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); + } job.setInputFormatClass(ColumnFamilyInputFormat.class); - FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); + ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost"); + ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); + + job.waitForCompletion(true); } return 0; Modified: cassandra/trunk/contrib/word_count/src/WordCountSetup.java URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCountSetup.java?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/src/WordCountSetup.java (original) +++ cassandra/trunk/contrib/word_count/src/WordCountSetup.java Mon Sep 27 22:11:05 2010 @@ -97,8 +97,8 @@ public class WordCountSetup private static void setupKeyspace(Cassandra.Iface client) throws TException, InvalidRequestException { List<CfDef> cfDefList = new ArrayList<CfDef>(); - CfDef cfDef = new 
CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); - cfDefList.add(cfDef); + cfDefList.add(new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY)); + cfDefList.add(new CfDef(WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY)); client.system_add_keyspace(new KsDef(WordCount.KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList)); } Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1001930&r1=1001929&r2=1001930&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon Sep 27 22:11:05 2010 @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific * OutputFormat that allows reduce tasks to store keys (and corresponding * values) as Cassandra rows (and respective columns) in a given - * {@link ColumnFamily}. + * ColumnFamily. * * <p> * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the