Rename packages for the crunch-examples project and add license headers

Signed-off-by: Josh Wills <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2ff7247c Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2ff7247c Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2ff7247c Branch: refs/heads/master Commit: 2ff7247c081e69fe0fe2f04b7848876d25f802d2 Parents: dcbe378 Author: Josh Wills <[email protected]> Authored: Sat Jul 7 12:28:56 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Sat Jul 7 14:21:30 2012 -0700 ---------------------------------------------------------------------- examples/pom.xml | 8 +- .../cloudera/crunch/examples/AverageBytesByIP.java | 131 -------------- .../cloudera/crunch/examples/TotalBytesByIP.java | 103 ----------- .../com/cloudera/crunch/examples/WordCount.java | 73 -------- .../apache/crunch/examples/AverageBytesByIP.java | 134 +++++++++++++++ .../org/apache/crunch/examples/TotalBytesByIP.java | 106 ++++++++++++ .../java/org/apache/crunch/examples/WordCount.java | 76 ++++++++ 7 files changed, 320 insertions(+), 311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 21c27ee..df00ac4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -4,7 +4,7 @@ <groupId>com.cloudera.crunch</groupId> <artifactId>crunch-examples</artifactId> <packaging>jar</packaging> - <version>0.2.0</version> + <version>0.3.0</version> <name>crunch-examples</name> <dependencies> @@ -22,9 +22,9 @@ </dependency> <dependency> - <groupId>com.cloudera.crunch</groupId> + <groupId>org.apache.crunch</groupId> <artifactId>crunch</artifactId> - <version>0.2.0</version> + <version>0.3.0</version> </dependency> </dependencies> @@ -47,7 +47,7 @@ </descriptors> <archive> <manifest> - 
<mainClass>com.cloudera.crunch.examples.WordCount</mainClass> + <mainClass>org.apache.crunch.examples.WordCount</mainClass> </manifest> </archive> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java b/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java deleted file mode 100644 index daa5259..0000000 --- a/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ -package com.cloudera.crunch.examples; - -import java.io.Serializable; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.cloudera.crunch.CombineFn; -import com.cloudera.crunch.DoFn; -import com.cloudera.crunch.Emitter; -import com.cloudera.crunch.MapFn; -import com.cloudera.crunch.PCollection; -import com.cloudera.crunch.PTable; -import com.cloudera.crunch.Pair; -import com.cloudera.crunch.Pipeline; -import com.cloudera.crunch.impl.mr.MRPipeline; -import com.cloudera.crunch.type.writable.Writables; - -@SuppressWarnings("serial") -public class AverageBytesByIP extends Configured implements Tool, Serializable { - static enum COUNTERS { - NO_MATCH, - CORRUPT_SIZE - } - static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; - public int run(String[] args) throws Exception { - if(args.length != 2) { - System.err.println(); - System.err.println("Two and only two arguments are accepted."); - System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); - System.err.println(); - GenericOptionsParser.printGenericCommandUsage(System.err); - return 1; - } - // Create an object to coordinate pipeline creation and execution. - Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf()); - // Reference a given text file as a collection of Strings. 
- PCollection<String> lines = pipeline.readTextFile(args[0]); - - // Combiner used for summing up response size and count - CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = - CombineFn.pairAggregator(CombineFn.SUM_LONGS, CombineFn.SUM_LONGS); - - // Table of (ip, sum(response size), count) - PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines.parallelDo(extractResponseSize, - Writables.tableOf(Writables.strings(), - Writables.pairs(Writables.longs(), Writables.longs()))) - .groupByKey() - .combineValues(stringPairOfLongsSumCombiner); - - // Calculate average response size by ip address - PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage, - Writables.tableOf(Writables.strings(), Writables.doubles())); - - // write the result to a text file - pipeline.writeTextFile(avgs, args[1]); - // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; - } - - // Function to calculate the average response size for a given ip address - // - // Input: (ip, sum(response size), count) - // Output: (ip, average response size) - MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>> calulateAverage = - new MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>>() { - @Override - public Pair<String, Double> map(Pair<String, Pair<Long, Long>> arg) { - Pair<Long, Long> sumCount = arg.second(); - double avg = 0; - if(sumCount.second() > 0) { - avg = (double)sumCount.first() / (double)sumCount.second(); - } - return Pair.of(arg.first(), avg); - } - }; - - // Function to parse apache log records - // Given a standard apache log line, extract the ip address and - // response size. Outputs ip and the response size and a count (1) so that - // a combiner can be used. - // - // Input: 55.1.3.2 ...... 200 512 .... 
- // Output: (55.1.3.2, (512, 1)) - DoFn<String, Pair<String, Pair<Long, Long>>> extractResponseSize = new DoFn<String, Pair<String, Pair<Long, Long>>>() { - transient Pattern pattern; - public void initialize() { - pattern = Pattern.compile(logRegex); - } - public void process(String line, Emitter<Pair<String, Pair<Long, Long>>> emitter) { - Matcher matcher = pattern.matcher(line); - if(matcher.matches()) { - try { - Long responseSize = Long.parseLong(matcher.group(7)); - Pair<Long, Long> sumCount = Pair.of(responseSize, 1L); - String remoteAddr = matcher.group(1); - emitter.emit(Pair.of(remoteAddr, sumCount)); - } catch (NumberFormatException e) { - this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1); - } - } else { - this.getCounter(COUNTERS.NO_MATCH).increment(1); - } - } - }; - - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new AverageBytesByIP(), args); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java b/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java deleted file mode 100644 index 3de1f73..0000000 --- a/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. 
See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.crunch.examples; - -import java.io.Serializable; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.cloudera.crunch.CombineFn; -import com.cloudera.crunch.DoFn; -import com.cloudera.crunch.Emitter; -import com.cloudera.crunch.PCollection; -import com.cloudera.crunch.PTable; -import com.cloudera.crunch.Pair; -import com.cloudera.crunch.Pipeline; -import com.cloudera.crunch.impl.mr.MRPipeline; -import com.cloudera.crunch.type.writable.Writables; - -@SuppressWarnings("serial") -public class TotalBytesByIP extends Configured implements Tool, Serializable { - static enum COUNTERS { - NO_MATCH, - CORRUPT_SIZE - } - static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; - public int run(String[] args) throws Exception { - if(args.length != 2) { - System.err.println(); - System.err.println("Two and only two arguments are accepted."); - System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); - System.err.println(); - GenericOptionsParser.printGenericCommandUsage(System.err); - return 1; - } - // Create an object to coordinate pipeline creation and execution. - Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf()); - // Reference a given text file as a collection of Strings. 
- PCollection<String> lines = pipeline.readTextFile(args[0]); - - // Combiner used for summing up response size - CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS(); - - // Table of (ip, sum(response size)) - PTable<String, Long> ipAddrResponseSize = - lines.parallelDo(extractIPResponseSize, - Writables.tableOf(Writables.strings(),Writables.longs())) - .groupByKey() - .combineValues(longSumCombiner); - - pipeline.writeTextFile(ipAddrResponseSize, args[1]); - // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; - } - - // Function to parse apache log records - // Given a standard apache log line, extract the ip address and - // request size. Outputs the ip and response size. - // - // Input: 55.1.3.2 ...... 200 512 .... - // Output: (55.1.3.2, 512) - DoFn<String, Pair<String, Long>> extractIPResponseSize = new DoFn<String, Pair<String, Long>>() { - transient Pattern pattern; - public void initialize() { - pattern = Pattern.compile(logRegex); - } - public void process(String line, Emitter<Pair<String, Long>> emitter) { - Matcher matcher = pattern.matcher(line); - if(matcher.matches()) { - try { - Long requestSize = Long.parseLong(matcher.group(7)); - String remoteAddr = matcher.group(1); - emitter.emit(Pair.of(remoteAddr, requestSize)); - } catch (NumberFormatException e) { - // corrupt line, we should increment counter - } - } - } - }; - - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new TotalBytesByIP(), args); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java b/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java deleted file mode 100644 index a87b5a2..0000000 --- 
a/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.crunch.examples; - -import com.cloudera.crunch.DoFn; -import com.cloudera.crunch.Emitter; -import com.cloudera.crunch.PCollection; -import com.cloudera.crunch.PTable; -import com.cloudera.crunch.Pipeline; -import com.cloudera.crunch.impl.mr.MRPipeline; -import com.cloudera.crunch.lib.Aggregate; -import com.cloudera.crunch.type.writable.Writables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import java.io.Serializable; - -public class WordCount extends Configured implements Tool, Serializable { - public int run(String[] args) throws Exception { - if(args.length != 3) { - System.err.println(); - System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); - System.err.println(); - GenericOptionsParser.printGenericCommandUsage(System.err); - return 1; - } - // Create an object to coordinate pipeline creation and execution. - Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); - // Reference a given text file as a collection of Strings. 
- PCollection<String> lines = pipeline.readTextFile(args[1]); - - // Define a function that splits each line in a PCollection of Strings into a - // PCollection made up of the individual words in the file. - PCollection<String> words = lines.parallelDo(new DoFn<String, String>() { - public void process(String line, Emitter<String> emitter) { - for (String word : line.split("\\s+")) { - emitter.emit(word); - } - } - }, Writables.strings()); // Indicates the serialization format - - // The count method applies a series of Crunch primitives and returns - // a map of the unique words in the input PCollection to their counts. - // Best of all, the count() function doesn't need to know anything about - // the kind of data stored in the input PCollection. - PTable<String, Long> counts = words.count(); - - // Instruct the pipeline to write the resulting counts to a text file. - pipeline.writeTextFile(counts, args[2]); - // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new WordCount(), args); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java new file mode 100644 index 0000000..930721f --- /dev/null +++ b/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.types.writable.Writables; + +@SuppressWarnings("serial") +public class AverageBytesByIP extends Configured implements Tool, Serializable { + static enum COUNTERS { + NO_MATCH, + CORRUPT_SIZE + } + static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; + public int run(String[] args) throws Exception { + if(args.length != 2) { + System.err.println(); + System.err.println("Two and only two arguments are accepted."); + System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 
1; + } + // Create an object to coordinate pipeline creation and execution. + Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[0]); + + // Combiner used for summing up response size and count + CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = + CombineFn.pairAggregator(CombineFn.SUM_LONGS, CombineFn.SUM_LONGS); + + // Table of (ip, sum(response size), count) + PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines.parallelDo(extractResponseSize, + Writables.tableOf(Writables.strings(), + Writables.pairs(Writables.longs(), Writables.longs()))) + .groupByKey() + .combineValues(stringPairOfLongsSumCombiner); + + // Calculate average response size by ip address + PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage, + Writables.tableOf(Writables.strings(), Writables.doubles())); + + // write the result to a text file + pipeline.writeTextFile(avgs, args[1]); + // Execute the pipeline as a MapReduce. + pipeline.done(); + return 0; + } + + // Function to calculate the average response size for a given ip address + // + // Input: (ip, sum(response size), count) + // Output: (ip, average response size) + MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>> calulateAverage = + new MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>>() { + @Override + public Pair<String, Double> map(Pair<String, Pair<Long, Long>> arg) { + Pair<Long, Long> sumCount = arg.second(); + double avg = 0; + if(sumCount.second() > 0) { + avg = (double)sumCount.first() / (double)sumCount.second(); + } + return Pair.of(arg.first(), avg); + } + }; + + // Function to parse apache log records + // Given a standard apache log line, extract the ip address and + // response size. Outputs ip and the response size and a count (1) so that + // a combiner can be used. + // + // Input: 55.1.3.2 ...... 
200 512 .... + // Output: (55.1.3.2, (512, 1)) + DoFn<String, Pair<String, Pair<Long, Long>>> extractResponseSize = new DoFn<String, Pair<String, Pair<Long, Long>>>() { + transient Pattern pattern; + public void initialize() { + pattern = Pattern.compile(logRegex); + } + public void process(String line, Emitter<Pair<String, Pair<Long, Long>>> emitter) { + Matcher matcher = pattern.matcher(line); + if(matcher.matches()) { + try { + Long responseSize = Long.parseLong(matcher.group(7)); + Pair<Long, Long> sumCount = Pair.of(responseSize, 1L); + String remoteAddr = matcher.group(1); + emitter.emit(Pair.of(remoteAddr, sumCount)); + } catch (NumberFormatException e) { + this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1); + } + } else { + this.getCounter(COUNTERS.NO_MATCH).increment(1); + } + } + }; + + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new AverageBytesByIP(), args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java new file mode 100644 index 0000000..1b168fa --- /dev/null +++ b/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.types.writable.Writables; + +@SuppressWarnings("serial") +public class TotalBytesByIP extends Configured implements Tool, Serializable { + static enum COUNTERS { + NO_MATCH, + CORRUPT_SIZE + } + static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; + public int run(String[] args) throws Exception { + if(args.length != 2) { + System.err.println(); + System.err.println("Two and only two arguments are accepted."); + System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. 
+ Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[0]); + + // Combiner used for summing up response size + CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS(); + + // Table of (ip, sum(response size)) + PTable<String, Long> ipAddrResponseSize = + lines.parallelDo(extractIPResponseSize, + Writables.tableOf(Writables.strings(),Writables.longs())) + .groupByKey() + .combineValues(longSumCombiner); + + pipeline.writeTextFile(ipAddrResponseSize, args[1]); + // Execute the pipeline as a MapReduce. + pipeline.done(); + return 0; + } + + // Function to parse apache log records + // Given a standard apache log line, extract the ip address and + // request size. Outputs the ip and response size. + // + // Input: 55.1.3.2 ...... 200 512 .... + // Output: (55.1.3.2, 512) + DoFn<String, Pair<String, Long>> extractIPResponseSize = new DoFn<String, Pair<String, Long>>() { + transient Pattern pattern; + public void initialize() { + pattern = Pattern.compile(logRegex); + } + public void process(String line, Emitter<Pair<String, Long>> emitter) { + Matcher matcher = pattern.matcher(line); + if(matcher.matches()) { + try { + Long requestSize = Long.parseLong(matcher.group(7)); + String remoteAddr = matcher.group(1); + emitter.emit(Pair.of(remoteAddr, requestSize)); + } catch (NumberFormatException e) { + // corrupt line, we should increment counter + } + } + } + }; + + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new TotalBytesByIP(), args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/crunch/examples/WordCount.java 
b/examples/src/main/java/org/apache/crunch/examples/WordCount.java new file mode 100644 index 0000000..53db14e --- /dev/null +++ b/examples/src/main/java/org/apache/crunch/examples/WordCount.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.examples; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.Serializable; + +public class WordCount extends Configured implements Tool, Serializable { + public int run(String[] args) throws Exception { + if(args.length != 3) { + System.err.println(); + System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. + Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[1]); + + // Define a function that splits each line in a PCollection of Strings into a + // PCollection made up of the individual words in the file. + PCollection<String> words = lines.parallelDo(new DoFn<String, String>() { + public void process(String line, Emitter<String> emitter) { + for (String word : line.split("\\s+")) { + emitter.emit(word); + } + } + }, Writables.strings()); // Indicates the serialization format + + // The count method applies a series of Crunch primitives and returns + // a map of the unique words in the input PCollection to their counts. + // Best of all, the count() function doesn't need to know anything about + // the kind of data stored in the input PCollection. 
+ PTable<String, Long> counts = words.count(); + + // Instruct the pipeline to write the resulting counts to a text file. + pipeline.writeTextFile(counts, args[2]); + // Execute the pipeline as a MapReduce. + pipeline.done(); + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new WordCount(), args); + } +}
