CRUNCH-12: Create a basic Maven site. Configure Maven site plugin with Markdown support and some basic reports. Add a site descriptor and CSS borrowed from Apache Whirr. Move README to Maven site, fixing Markdown problems on the way.
Signed-off-by: jwills <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit:  http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/15e24a24
Tree:    http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/15e24a24
Diff:    http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/15e24a24

Branch:    refs/heads/master
Commit:    15e24a2475013f773110fe803286f026dc1595c2
Parents:   5d841a4
Author:    Matthias Friedrich <[email protected]>
Authored:  Wed Jul 11 17:45:07 2012 +0200
Committer: jwills <[email protected]>
Committed: Wed Jul 11 11:31:13 2012 -0700

----------------------------------------------------------------------
 README.md                       | 288 ----------------------------------
 crunch/pom.xml                  |  16 --
 pom.xml                         |  73 ++++++---
 src/site/markdown/index.md      | 288 ++++++++++++++++++++++++++++++++++
 src/site/resources/css/site.css |  34 ++++
 src/site/site.xml               |  65 ++++++++
 6 files changed, 438 insertions(+), 326 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
deleted file mode 100644
index e1fdefa..0000000
--- a/README.md
+++ /dev/null
@@ -1,288 +0,0 @@

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
index 1d9af31..8f207d9 100644
--- a/crunch/pom.xml
+++ b/crunch/pom.xml
@@ -150,22 +150,6 @@ under the License.
   <build>
     <plugins>
       <plugin>
-        <groupId>com.github.github</groupId>
-        <artifactId>site-maven-plugin</artifactId>
-        <version>0.5</version>
-        <configuration>
-          <message>Building site for ${project.version}</message>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>site</goal>
-            </goals>
-            <phase>site</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e4bee92..c4d1ebb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,6 +26,7 @@ under the License.
   <packaging>pom</packaging>

   <name>Apache Incubator Crunch</name>
+  <url>http://incubator.apache.org/crunch/</url>

   <modules>
     <module>crunch</module>
@@ -467,6 +468,15 @@ under the License.
     </repository>
   </repositories>

+  <distributionManagement>
+    <site>
+      <id>apache.website</id>
+      <url>
+        scpexe://people.apache.invalid/www/crunch.apache.org/docs/${project.version}
+      </url>
+    </site>
+  </distributionManagement>
+
   <build>
     <plugins>
       <plugin>
@@ -501,32 +511,51 @@ under the License.
         </executions>
       </plugin>
       <plugin>
-        <groupId>com.github.github</groupId>
-        <artifactId>site-maven-plugin</artifactId>
-        <version>0.5</version>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <version>3.1</version>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.wagon</groupId>
+            <artifactId>wagon-ssh</artifactId>
+            <version>1.0</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.maven.doxia</groupId>
+            <artifactId>doxia-module-markdown</artifactId>
+            <version>1.3</version>
+          </dependency>
+        </dependencies>
        <configuration>
-          <message>Building site for ${project.version}</message>
+          <inputEncoding>UTF-8</inputEncoding>
+          <outputEncoding>UTF-8</outputEncoding>
+          <reportPlugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-project-info-reports-plugin</artifactId>
+              <version>2.4</version>
+              <reportSets>
+                <reportSet>
+                  <reports>
+                    <report>cim</report>
+                    <report>issue-tracking</report>
+                    <report>license</report>
+                    <report>mailing-list</report>
+                    <report>project-team</report>
+                    <report>scm</report>
+                  </reports>
+                </reportSet>
+              </reportSets>
+            </plugin>
+            <plugin>
+              <groupId>org.codehaus.mojo</groupId>
+              <artifactId>cobertura-maven-plugin</artifactId>
+              <version>2.5.1</version>
+            </plugin>
+          </reportPlugins>
        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>site</goal>
-            </goals>
-            <phase>site</phase>
-          </execution>
-        </executions>
      </plugin>
    </plugins>
  </build>

-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>cobertura-maven-plugin</artifactId>
-        <version>2.5.1</version>
-      </plugin>
-    </plugins>
-  </reporting>
-
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/src/site/markdown/index.md b/src/site/markdown/index.md
new file mode 100644
index 0000000..f5bae20
--- /dev/null
+++ b/src/site/markdown/index.md
@@ -0,0 +1,288 @@
+# Crunch - Simple and Efficient Java Library for MapReduce Pipelines
+
+## Introduction
+
+Crunch is a Java library for writing, testing, and running MapReduce pipelines, based on
+Google's FlumeJava. Its goal is to make pipelines that are composed of many user-defined
+functions simple to write, easy to test, and efficient to run.
+
+## Build and Installation
+
+Crunch uses Maven for dependency management. The code in the examples/ subdirectory relies
+on the top-level crunch libraries. In order to execute the included WordCount application, run:
+
+    mvn install
+    cd examples/
+    mvn package
+    hadoop jar target/crunch-examples-0.2.0-job.jar com.cloudera.crunch.examples.WordCount <inputfile> <outputdir>
+
+## High Level Concepts
+
+### Data Model and Operators
+
+Crunch is centered around three interfaces that represent distributed datasets: `PCollection<T>`, `PTable<K, V>`, and `PGroupedTable<K, V>`.
+
+A `PCollection<T>` represents a distributed, unordered collection of elements of type T. For example, we represent a text file in Crunch as a
+`PCollection<String>` object. PCollection provides a method, `parallelDo`, that applies a function to each element in a PCollection in parallel,
+and returns a new PCollection as its result.
+
+A `PTable<K, V>` is a sub-interface of PCollection that represents a distributed, unordered multimap of its key type K to its value type V.
+In addition to the parallelDo operation, PTable provides a `groupByKey` operation that aggregates all of the values in the PTable that
+have the same key into a single record. It is the groupByKey operation that triggers the sort phase of a MapReduce job.
+
+The result of a groupByKey operation is a `PGroupedTable<K, V>` object, which is a distributed, sorted map of keys of type K to an Iterable
+collection of values of type V. In addition to parallelDo, the PGroupedTable provides a `combineValues` operation, which allows for
+a commutative and associative aggregation operator to be applied to the values of the PGroupedTable instance on both the map side and the
+reduce side of a MapReduce job.
+
+Finally, PCollection, PTable, and PGroupedTable all support a `union` operation, which takes a series of distinct PCollections and treats
+them as a single, virtual PCollection. The union operator is required for operations that combine multiple inputs, such as cogroups and
+joins.
+
+### Pipeline Building and Execution
+
+Every Crunch pipeline starts with a `Pipeline` object that is used to coordinate building the pipeline and executing the underlying MapReduce
+jobs. For efficiency, Crunch uses lazy evaluation, so it will only construct MapReduce jobs from the different stages of the pipelines when
+the Pipeline object's `run` or `done` methods are called.
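+
+For illustration, here is a minimal sketch of this lazy behaviour (the class name `LazyDemo`
+is made up for this sketch, but every `Pipeline` method it uses appears in the WordCount
+example below):
+
+    import com.cloudera.crunch.PCollection;
+    import com.cloudera.crunch.Pipeline;
+    import com.cloudera.crunch.impl.mr.MRPipeline;
+
+    public class LazyDemo {
+      public static void main(String[] args) throws Exception {
+        Pipeline pipeline = new MRPipeline(LazyDemo.class);
+
+        // Lazy: this only records a reference to the input file.
+        PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+        // Still lazy: the output target is registered, but no job has run.
+        pipeline.writeTextFile(lines, args[1]);
+
+        // Only now are the MapReduce jobs constructed and executed.
+        pipeline.run();
+      }
+    }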
+
+## A Detailed Example
+
+Here is the classic WordCount application using Crunch:
+
+    import com.cloudera.crunch.DoFn;
+    import com.cloudera.crunch.Emitter;
+    import com.cloudera.crunch.PCollection;
+    import com.cloudera.crunch.PTable;
+    import com.cloudera.crunch.Pipeline;
+    import com.cloudera.crunch.impl.mr.MRPipeline;
+    import com.cloudera.crunch.lib.Aggregate;
+    import com.cloudera.crunch.type.writable.Writables;
+
+    public class WordCount {
+      public static void main(String[] args) throws Exception {
+        Pipeline pipeline = new MRPipeline(WordCount.class);
+        PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+        PCollection<String> words = lines.parallelDo("my splitter", new DoFn<String, String>() {
+          public void process(String line, Emitter<String> emitter) {
+            for (String word : line.split("\\s+")) {
+              emitter.emit(word);
+            }
+          }
+        }, Writables.strings());
+
+        PTable<String, Long> counts = Aggregate.count(words);
+
+        pipeline.writeTextFile(counts, args[1]);
+        pipeline.run();
+      }
+    }
+
+Let's walk through the example line by line.
+
+### Step 1: Creating a Pipeline and referencing a text file
+
+The `MRPipeline` implementation of the Pipeline interface compiles the individual stages of a
+pipeline into a series of MapReduce jobs. The MRPipeline constructor takes a class argument
+that is used to tell Hadoop where to find the code that is used in the pipeline execution.
+
+We now need to tell the Pipeline about the inputs it will be consuming. The Pipeline interface
+defines a `readTextFile` method that takes in a String and returns a PCollection of Strings.
+In addition to text files, Crunch supports reading data from SequenceFiles and Avro container files,
+via the `SequenceFileSource` and `AvroFileSource` classes defined in the com.cloudera.crunch.io package.
+
+Note that each PCollection is a _reference_ to a source of data; no data is actually loaded into a
+PCollection on the client machine.
+
+### Step 2: Splitting the lines of text into words
+
+Crunch defines a small set of primitive operations that can be composed in order to build complex data
+pipelines. The first of these primitives is the `parallelDo` function, which applies a function (defined
+by a subclass of `DoFn`) to every record in a PCollection, and returns a new PCollection that contains
+the results.
+
+The first argument to parallelDo is a string that is used to identify this step in the pipeline. When
+a pipeline is composed into a series of MapReduce jobs, it is often the case that multiple stages will
+run within the same Mapper or Reducer. Having a string that identifies each processing step is useful
+for debugging errors that occur in a running pipeline.
+
+The second argument to parallelDo is an anonymous subclass of DoFn. Each DoFn subclass must override
+the `process` method, which takes in a record from the input PCollection and an `Emitter` object that
+may have any number of output values written to it. In this case, our DoFn splits each line up into
+words, using whitespace as a separator, and emits the words from the split to the output PCollection.
+
+The last argument to parallelDo is an instance of the `PType` interface, which specifies how the data
+in the output PCollection is serialized. While Crunch takes advantage of Java Generics to provide
+compile-time type safety, the generic type information is not available at runtime. Crunch needs to know
+how to map the records stored in each PCollection into a Hadoop-supported serialization format in order
+to read and write data to disk. Two serialization implementations are supported in Crunch via the
+`PTypeFamily` interface: a Writable-based system that is defined in the com.cloudera.crunch.type.writable
+package, and an Avro-based system that is defined in the com.cloudera.crunch.type.avro package. Each
+implementation provides convenience methods for working with the common PTypes (Strings, longs, bytes, etc.)
+as well as utility methods for creating PTypes from existing Writable classes or Avro schemas.
+
+### Step 3: Counting the words
+
+Out of Crunch's simple primitive operations, we can build arbitrarily complex chains of operations in order
+to perform higher-level operations, like aggregations and joins, that can work on any type of input data.
+Let's look at the implementation of the `Aggregate.count` function:
+
+    package com.cloudera.crunch.lib;
+
+    import com.cloudera.crunch.CombineFn;
+    import com.cloudera.crunch.MapFn;
+    import com.cloudera.crunch.PCollection;
+    import com.cloudera.crunch.PGroupedTable;
+    import com.cloudera.crunch.PTable;
+    import com.cloudera.crunch.Pair;
+    import com.cloudera.crunch.type.PTypeFamily;
+
+    public class Aggregate {
+
+      private static class Counter<S> extends MapFn<S, Pair<S, Long>> {
+        public Pair<S, Long> map(S input) {
+          return Pair.of(input, 1L);
+        }
+      }
+
+      public static <S> PTable<S, Long> count(PCollection<S> collect) {
+        PTypeFamily tf = collect.getTypeFamily();
+
+        // Create a PTable from the PCollection by mapping each element
+        // to a key of the PTable with the value equal to 1L
+        PTable<S, Long> withCounts = collect.parallelDo("count:" + collect.getName(),
+            new Counter<S>(), tf.tableOf(collect.getPType(), tf.longs()));
+
+        // Group the records of the PTable based on their key.
+        PGroupedTable<S, Long> grouped = withCounts.groupByKey();
+
+        // Sum the 1L values associated with the keys to get the
+        // count of each element in this PCollection, and return it
+        // as a PTable so that it may be processed further or written
+        // out for storage.
+        return grouped.combineValues(CombineFn.<S>SUM_LONGS());
+      }
+    }
+
+First, we get the PTypeFamily that is associated with the PType for the collection. The
+call to parallelDo converts each record in this PCollection into a Pair of the input record
+and the number one by extending the `MapFn` convenience subclass of DoFn, and uses the
+`tableOf` method of the PTypeFamily to specify that the returned PCollection should be a
+PTable instance, with the key being the PType of the PCollection and the value being the Long
+implementation for this PTypeFamily.
+
+The next line features the second of Crunch's four operations, `groupByKey`. The groupByKey
+operation may only be applied to a PTable, and returns an instance of the `PGroupedTable`
+interface, which references the grouping of all of the values in the PTable that have the same key.
+The groupByKey operation is what triggers the reduce phase of a MapReduce within Crunch.
+
+The last line in the function returns the output of the third of Crunch's four operations,
+`combineValues`. The combineValues operator takes a `CombineFn` as an argument, which is a
+specialized subclass of DoFn that operates on an implementation of Java's Iterable interface. The
+use of combineValues (as opposed to parallelDo) signals to Crunch that the CombineFn may be used to
+aggregate values for the same key on the map side of a MapReduce job as well as the reduce side.
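+
+To make the phase boundaries explicit, here is a sketch of the same count flow specialized to a
+`PCollection<String>` of words, using a copy of the `Counter` MapFn from the listing above and
+annotating each primitive with the MapReduce phase it occupies:
+
+    PTypeFamily tf = words.getTypeFamily();
+
+    // Map side: each word becomes a (word, 1L) pair.
+    PTable<String, Long> ones = words.parallelDo("ones",
+        new Counter<String>(), tf.tableOf(words.getPType(), tf.longs()));
+
+    // groupByKey marks the shuffle/sort boundary between map and reduce.
+    PGroupedTable<String, Long> grouped = ones.groupByKey();
+
+    // combineValues lets SUM_LONGS run as a combiner on the map side
+    // and as the final aggregation on the reduce side.
+    PTable<String, Long> counts = grouped.combineValues(CombineFn.<String>SUM_LONGS());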
+
+### Step 4: Writing the output and running the pipeline
+
+The Pipeline object also provides a `writeTextFile` convenience method for indicating that a
+PCollection should be written to a text file. There are also output targets for SequenceFiles and
+Avro container files, available in the com.cloudera.crunch.io package.
+
+After you are finished constructing a pipeline and specifying the output destinations, call the
+pipeline's blocking `run` method in order to compile the pipeline into one or more MapReduce
+jobs and execute them.
+
+## Writing Your Own Pipelines
+
+This section discusses the different steps of creating your own Crunch pipelines in more detail.
+
+### Writing a DoFn
+
+The DoFn class is designed to keep the complexity of the MapReduce APIs out of your way when you
+don't need them while still keeping them accessible when you do.
+
+#### Serialization
+
+First, all DoFn instances are required to be `java.io.Serializable`. This is a key aspect of Crunch's design:
+once a particular DoFn is assigned to the Map or Reduce stage of a MapReduce job, all of the state
+of that DoFn is serialized so that it may be distributed to all of the nodes in the Hadoop cluster that
+will be running that task. There are two important implications of this for developers:
+
+1. All member values of a DoFn must be either serializable or marked as `transient`.
+2. All anonymous DoFn instances must be defined in a static method or in a class that is itself serializable.
+
+Because you will sometimes need to work with non-serializable objects inside of a DoFn, every DoFn provides an
+`initialize` method that is called before the `process` method is ever called, so that any initialization tasks,
+such as creating a non-serializable member variable, can be performed before processing begins. Similarly, all
+DoFn instances have a `cleanup` method that may be called after processing has finished to perform any required
+cleanup tasks.
+
+#### Scale Factor
+
+The DoFn class defines a `scaleFactor` method that can be used to signal to the MapReduce compiler that a particular
+DoFn implementation will yield an output PCollection that is larger (scaleFactor > 1) or smaller (0 < scaleFactor < 1)
+than the input PCollection it is applied to. The compiler may use this information to determine how to optimally
+split processing tasks between the Map and Reduce phases of dependent MapReduce jobs.
+
+#### Other Utilities
+
+The DoFn base class provides convenience methods for accessing the `Configuration` and `Counter` objects that
+are associated with a MapReduce stage, so that they may be accessed during initialization, processing, and cleanup.
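+
+As a concrete sketch of the serialization rules described above (the class is hypothetical, and
+it assumes `initialize` is the no-argument method this section describes):
+
+    import java.security.MessageDigest;
+    import java.security.NoSuchAlgorithmException;
+
+    import com.cloudera.crunch.DoFn;
+    import com.cloudera.crunch.Emitter;
+
+    public class HashFn extends DoFn<String, String> {
+      // MessageDigest is not serializable, so the member is marked
+      // transient and created in initialize() rather than being
+      // shipped with the serialized DoFn.
+      private transient MessageDigest digest;
+
+      public void initialize() {
+        try {
+          digest = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      public void process(String input, Emitter<String> emitter) {
+        digest.reset();
+        byte[] hash = digest.digest(input.getBytes());
+        emitter.emit(new java.math.BigInteger(1, hash).toString(16));
+      }
+    }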
+
+### Performing Cogroups and Joins
+
+In Crunch, cogroups and joins are performed on PTable instances that have the same key type. This section walks through
+the basic flow of a cogroup operation, explaining how this higher-level operation is composed of Crunch's four primitives.
+In general, these common operations are provided as part of the core Crunch library or in extensions; you do not need
+to write them yourself. But it can be useful to understand how they work under the covers.
+
+Assume we have a `PTable<K, U>` named "a" and a different `PTable<K, V>` named "b" that we would like to combine into a
+single `PTable<K, Pair<Collection<U>, Collection<V>>>`. First, we need to apply parallelDo operations to a and b that
+convert them into the same Crunch type, `PTable<K, Pair<U, V>>`:
+
+    // Perform the "tagging" operation as a parallelDo on PTable a
+    PTable<K, Pair<U, V>> aPrime = a.parallelDo("taga", new MapFn<Pair<K, U>, Pair<K, Pair<U, V>>>() {
+      public Pair<K, Pair<U, V>> map(Pair<K, U> input) {
+        return Pair.of(input.first(), Pair.of(input.second(), null));
+      }
+    }, tableOf(a.getKeyType(), pair(a.getValueType(), b.getValueType())));
+
+    // Perform the "tagging" operation as a parallelDo on PTable b
+    PTable<K, Pair<U, V>> bPrime = b.parallelDo("tagb", new MapFn<Pair<K, V>, Pair<K, Pair<U, V>>>() {
+      public Pair<K, Pair<U, V>> map(Pair<K, V> input) {
+        return Pair.of(input.first(), Pair.of(null, input.second()));
+      }
+    }, tableOf(a.getKeyType(), pair(a.getValueType(), b.getValueType())));
+
+Once the input PTables are tagged into a single type, we can apply the union operation to create a single PTable
+reference that includes both of the tagged PTables and then group the unioned PTable by the common key:
+
+    PTable<K, Pair<U, V>> both = aPrime.union(bPrime);
+    PGroupedTable<K, Pair<U, V>> grouped = both.groupByKey();
+
+The grouping operation will create an `Iterable<Pair<U, V>>`, which we can then convert to a
+`Pair<Collection<U>, Collection<V>>`:
+
+    grouped.parallelDo("cogroup", new MapFn<Pair<K, Iterable<Pair<U, V>>>, Pair<K, Pair<Collection<U>, Collection<V>>>>() {
+      public Pair<K, Pair<Collection<U>, Collection<V>>> map(Pair<K, Iterable<Pair<U, V>>> input) {
+        Collection<U> uValues = new ArrayList<U>();
+        Collection<V> vValues = new ArrayList<V>();
+        for (Pair<U, V> pair : input.second()) {
+          if (pair.first() != null) {
+            uValues.add(pair.first());
+          } else {
+            vValues.add(pair.second());
+          }
+        }
+        return Pair.of(input.first(), Pair.of(uValues, vValues));
+      }
+    }, tableOf(grouped.getKeyType(), pair(collections(a.getValueType()), collections(b.getValueType()))));
+
+## Current Limitations and Future Work
+
+This section contains an almost certainly incomplete list of known limitations of Crunch and plans for future work.
+
+* We would like to have easy support for reading and writing data from/to HCatalog.
+* The decision of how to split up processing tasks between dependent MapReduce jobs is very naive right now: we simply
+delegate all of the work to the reduce stage of the predecessor job. We should take advantage of information about the
+expected size of different PCollections to optimize this processing.
+* The Crunch optimizer does not yet merge different groupByKey operations that run over the same input data into a single
+MapReduce job. Implementing this optimization will provide a major performance benefit for a number of problems.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/src/site/resources/css/site.css
----------------------------------------------------------------------
diff --git a/src/site/resources/css/site.css b/src/site/resources/css/site.css
new file mode 100644
index 0000000..8ee551e
--- /dev/null
+++ b/src/site/resources/css/site.css
@@ -0,0 +1,34 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/* Borrowed from Apache Whirr */
+
+#banner {
+  height: 93px;
+  background: none;
+}
+
+#bannerLeft img {
+  height: 90px;
+  margin-left: 30px;
+  margin-top: 4px;
+}
+
+#bannerRight img {
+  margin: 17px;
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/15e24a24/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
new file mode 100644
index 0000000..dc6888d
--- /dev/null
+++ b/src/site/site.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project name="Crunch"
+         xmlns="http://maven.apache.org/DECORATION/1.3.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0
+                             http://maven.apache.org/xsd/decoration-1.3.0.xsd">
+
+  <skin>
+    <groupId>org.apache.maven.skins</groupId>
+    <artifactId>maven-stylus-skin</artifactId>
+    <version>1.4</version>
+  </skin>
+
+  <bannerLeft>
+    <name></name>
+  </bannerLeft>
+
+  <bannerRight>
+    <src>http://www.apache.org/images/asf_logo_wide.png</src>
+    <href>http://www.apache.org/</href>
+  </bannerRight>
+
+  <publishDate position="right"/>
+  <version position="right"/>
+
+  <body>
+    <!-- Note: Breadcrumbs for Doxia's Markdown parser are currently broken,
+         see https://jira.codehaus.org/browse/DOXIA-472 -->
+    <breadcrumbs>
+      <item name="Apache" href="http://www.apache.org/" />
+      <item name="Crunch" href="http://incubator.apache.org/crunch/"/>
+    </breadcrumbs>
+
+    <menu name="Crunch">
+      <item name="About" href="http://incubator.apache.org/crunch/"/>
+      <item name="Wiki" href="https://cwiki.apache.org/confluence/display/CRUNCH/" />
+    </menu>
+
+    <menu name="Project Information">
+      <item name="Mailing Lists" href="mail-lists.html" />
+      <item name="Source Code" href="source-repository.html" />
+      <item name="Issue Tracking" href="issue-tracking.html" />
+      <item name="Continuous Integration" href="integration.html" />
+      <item name="Team" href="team-list.html" />
+      <item name="License" href="license.html" />
+    </menu>
+  </body>
+
+</project>
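
A brief usage sketch, assuming the site configuration above is active in the root pom: the site
can be rendered and published with the standard Maven site lifecycle.

    mvn site          # render src/site/markdown and the configured reports into target/site
    mvn site-deploy   # upload the rendered site to the URL in <distributionManagement>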