Update hadoop_cql3_word_count example; patch by Chandar Pechetty; reviewed by Alex Liu for CASSANDRA-6793
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4f2ff17 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4f2ff17 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4f2ff17 Branch: refs/heads/trunk Commit: b4f2ff17ad59e80c173f95f5f73419b989108c65 Parents: 98ed6a4 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Mar 17 16:34:23 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Mar 17 16:34:23 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop_cql3_word_count/src/WordCount.java | 44 +++------------- .../src/WordCountCounters.java | 16 ++++++ .../src/WordCountSetup.java | 55 ++++---------------- 4 files changed, 35 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 688a759..040af7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Update hadoop_cql3_word_count example (CASSANDRA-6793) * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788) * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865) * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java index c92f047..bc81a53 100644 --- a/examples/hadoop_cql3_word_count/src/WordCount.java +++ b/examples/hadoop_cql3_word_count/src/WordCount.java @@ -45,21 +45,16 @@ import java.nio.charset.CharacterCodingException; /** * 
This counts the occurrences of words in ColumnFamily - * cql3_worldcount ( user_id text, - * category_id text, - * sub_category_id text, - * title text, - * body text, - * PRIMARY KEY (user_id, category_id, sub_category_id)) + * cql3_worldcount ( id uuid, + * line text, + * PRIMARY KEY (id)) * * For each word, we output the total number of occurrences across all body texts. * * When outputting to Cassandra, we write the word counts to column family - * output_words ( row_id1 text, - * row_id2 text, - * word text, + * output_words ( word text, * count_num text, - * PRIMARY KEY ((row_id1, row_id2), word)) + * PRIMARY KEY (word)) * as a {word, count} to columns: word, count_num with a row key of "word sum" */ public class WordCount extends Configured implements Tool @@ -98,14 +93,11 @@ public class WordCount extends Configured implements Tool { for (Entry<String, ByteBuffer> column : columns.entrySet()) { - if (!"body".equalsIgnoreCase(column.getKey())) + if (!"line".equalsIgnoreCase(column.getKey())) continue; String value = ByteBufferUtil.string(column.getValue()); - logger.debug("read {}:{}={} from {}", - new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()}); - StringTokenizer itr = new StringTokenizer(value); while (itr.hasMoreTokens()) { @@ -114,21 +106,6 @@ public class WordCount extends Configured implements Tool } } } - - private String toString(Map<String, ByteBuffer> keys) - { - String result = ""; - try - { - for (ByteBuffer key : keys.values()) - result = result + ByteBufferUtil.string(key) + ":"; - } - catch (CharacterCodingException e) - { - logger.error("Failed to print keys", e); - } - return result; - } } public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> @@ -150,9 +127,6 @@ public class WordCount extends Configured implements Tool throws IOException, InterruptedException { keys = new LinkedHashMap<String, ByteBuffer>(); - String[] partitionKeys = 
context.getConfiguration().get(PRIMARY_KEY).split(","); - keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0])); - keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1])); } public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException @@ -160,13 +134,13 @@ public class WordCount extends Configured implements Tool int sum = 0; for (IntWritable val : values) sum += val.get(); + keys.put("word", ByteBufferUtil.bytes(word.toString())); context.write(keys, getBindVariables(word, sum)); } private List<ByteBuffer> getBindVariables(Text word, int sum) { List<ByteBuffer> variables = new ArrayList<ByteBuffer>(); - keys.put("word", ByteBufferUtil.bytes(word.toString())); variables.add(ByteBufferUtil.bytes(String.valueOf(sum))); return variables; } @@ -223,9 +197,7 @@ public class WordCount extends Configured implements Tool ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3"); - //this is the user defined filter clauses, you can comment it out if you want count all titles - CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'"); job.waitForCompletion(true); return 0; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountCounters.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java index 8454b70..542a473 100644 --- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java +++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java @@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -63,6 +64,7 @@ public class WordCountCounters extends Configured implements Tool public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable> { long sum = -1; + public void map(Map<String, ByteBuffer> key, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException { if (sum < 0) @@ -94,12 +96,26 @@ public class WordCountCounters extends Configured implements Tool } + public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable> + { + long sum = 0; + + public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException + { + for (LongWritable val : values) + sum += val.get(); + context.write(key, new LongWritable(sum)); + } + } + public int run(String[] args) throws Exception { Job job = new Job(getConf(), "wordcountcounters"); job.setJarByClass(WordCountCounters.class); job.setMapperClass(SumMapper.class); + job.setCombinerClass(ReducerToFilesystem.class); + job.setReducerClass(ReducerToFilesystem.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountSetup.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCountSetup.java b/examples/hadoop_cql3_word_count/src/WordCountSetup.java index 0acb8f7..ebf7485 100644 --- a/examples/hadoop_cql3_word_count/src/WordCountSetup.java +++ b/examples/hadoop_cql3_word_count/src/WordCountSetup.java @@ -90,12 +90,9 @@ public class WordCountSetup TException { String query = "CREATE TABLE " + WordCount.KEYSPACE + "." 
+ WordCount.COLUMN_FAMILY + - " ( user_id text," + - " category_id text, " + - " sub_category_id text," + - " title text," + - " body text," + - " PRIMARY KEY (user_id, category_id, sub_category_id) ) "; + " ( id uuid," + + " line text, " + + " PRIMARY KEY (id) ) "; try { @@ -107,22 +104,10 @@ public class WordCountSetup logger.error("failed to create table " + WordCount.KEYSPACE + "." + WordCount.COLUMN_FAMILY, e); } - query = "CREATE INDEX title on " + WordCount.COLUMN_FAMILY + "(title)"; - try - { - logger.info("set up index on title column "); - client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - } - catch (InvalidRequestException e) - { - logger.error("Failed to create index on title", e); - } - query = "CREATE TABLE " + WordCount.KEYSPACE + "." + WordCount.OUTPUT_COLUMN_FAMILY + - " ( row_id text," + - " word text, " + + " ( word text," + " count_num text," + - " PRIMARY KEY (row_id, word) ) "; + " PRIMARY KEY (word) ) "; try { @@ -163,26 +148,19 @@ public class WordCountSetup TException { String query = "INSERT INTO " + WordCount.COLUMN_FAMILY + - "(user_id, category_id, sub_category_id, title, body ) " + - " values (?, ?, ?, ?, ?) "; + "(id, line) " + + " values (?, ?) 
"; CqlPreparedResult result = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE); - String [] title = titleData(); String [] body = bodyData(); - for (int i=1; i<5; i++) + for (int i = 0; i < 5; i++) { - for (int j=1; j<444; j++) + for (int j = 1; j <= 200; j++) { - for (int k=1; k<4; k++) - { List<ByteBuffer> values = new ArrayList<ByteBuffer>(); - values.add(ByteBufferUtil.bytes(String.valueOf(j))); - values.add(ByteBufferUtil.bytes(String.valueOf(i))); - values.add(ByteBufferUtil.bytes(String.valueOf(k))); - values.add(ByteBufferUtil.bytes(title[i])); + values.add(ByteBufferUtil.bytes(UUID.randomUUID())); values.add(ByteBufferUtil.bytes(body[i])); client.execute_prepared_cql3_query(result.itemId, values, ConsistencyLevel.ONE); - } } } } @@ -190,7 +168,6 @@ public class WordCountSetup private static String[] bodyData() { // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94 return new String[]{ - "", "If you can keep your head when all about you", "Are losing theirs and blaming it on you", "If you can trust yourself when all men doubt you,", @@ -198,16 +175,4 @@ public class WordCountSetup "If you can wait and not be tired by waiting," }; } - - private static String[] titleData() - { // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94 - return new String[]{ - "", - "A", - "B", - "C", - "D", - "E" - }; - } }