Updated Branches:
  refs/heads/master ef10e80bc -> ed7481d9c
Add javadoc to MRPipeline constructors


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/ed7481d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/ed7481d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/ed7481d9

Branch: refs/heads/master
Commit: ed7481d9c88fc9968efa24a18d9b63ce25f0f9d0
Parents: ef10e80
Author: Gabriel Reid <[email protected]>
Authored: Fri Oct 5 14:02:22 2012 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Fri Oct 5 14:02:22 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   58 ++++++++++----
 1 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/ed7481d9/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index f32783a..043f7b1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -58,6 +58,9 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+/**
+ * Pipeline implementation that is executed within Hadoop MapReduce.
+ */
 public class MRPipeline implements Pipeline {
 
   private static final Log LOG = LogFactory.getLog(MRPipeline.class);
@@ -74,18 +77,43 @@ public class MRPipeline implements Pipeline {
 
   private Configuration conf;
 
+  /**
+   * Instantiate with a default Configuration and name.
+   *
+   * @param jarClass Class containing the main driver method for running the pipeline
+   */
   public MRPipeline(Class<?> jarClass) {
     this(jarClass, new Configuration());
   }
 
+  /**
+   * Instantiate with a custom pipeline name. The name will be displayed in the Hadoop JobTracker.
+   *
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param name Display name of the pipeline
+   */
   public MRPipeline(Class<?> jarClass, String name) {
     this(jarClass, name, new Configuration());
   }
 
+  /**
+   * Instantiate with a custom configuration and default naming.
+   *
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+   */
   public MRPipeline(Class<?> jarClass, Configuration conf) {
     this(jarClass, jarClass.getName(), conf);
   }
 
+  /**
+   * Instantiate with a custom name and configuration. The name will be displayed in the Hadoop
+   * JobTracker.
+   *
+   * @param jarClass Class containing the main driver method for running the pipeline
+   * @param name Display name of the pipeline
+   * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+   */
   public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
     this.jarClass = jarClass;
     this.name = name;
@@ -165,8 +193,8 @@ public class MRPipeline implements Pipeline {
     if (pcollection instanceof PGroupedTableImpl) {
       pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
     } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollection = pcollection.parallelDo("UnionCollectionWrapper", (MapFn) IdentityFn.<Object> getInstance(),
-          pcollection.getPType());
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     }
     addOutput((PCollectionImpl<?>) pcollection, target);
   }
@@ -192,17 +220,14 @@ public class MRPipeline implements Pipeline {
   }
 
   /**
-   * Retrieve a ReadableSourceTarget that provides access to the contents of a
-   * {@link PCollection}. This is primarily intended as a helper method to
-   * {@link #materialize(PCollection)}. The underlying data of the
-   * ReadableSourceTarget may not be actually present until the pipeline is run.
+   * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
+   * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The
+   * underlying data of the ReadableSourceTarget may not be actually present until the pipeline is
+   * run.
    *
-   * @param pcollection
-   *          The collection for which the ReadableSourceTarget is to be
-   *          retrieved
+   * @param pcollection The collection for which the ReadableSourceTarget is to be retrieved
    * @return The ReadableSourceTarget
-   * @throws IllegalArgumentException
-   *           If no ReadableSourceTarget can be retrieved for the given
+   * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given
    *           PCollection
    */
   public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
@@ -237,11 +262,10 @@ public class MRPipeline implements Pipeline {
   }
 
   /**
-   * Safely cast a PCollection into a PCollectionImpl, including handling the
-   * case of UnionCollections.
+   * Safely cast a PCollection into a PCollectionImpl, including handling the case of
+   * UnionCollections.
    *
-   * @param pcollection
-   *          The PCollection to be cast/transformed
+   * @param pcollection The PCollection to be cast/transformed
    * @return The PCollectionImpl representation
    */
   private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
@@ -282,8 +306,8 @@ public class MRPipeline implements Pipeline {
   @Override
   public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
     // Ensure that this is a writable pcollection instance.
-    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(),
-        WritableTypeFamily.getInstance().as(pcollection.getPType()));
+    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
+        .getInstance().as(pcollection.getPType()));
     write(pcollection, At.textFile(pathName));
   }
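For reference, a minimal usage sketch of the four constructors documented above. MyDriver and the
pipeline name "my-pipeline" are hypothetical placeholders, not part of this commit; any class
bundled in the job jar can serve as the jarClass argument.

    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    public class MyDriver {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Default Configuration; the pipeline is named after the driver class
        MRPipeline p1 = new MRPipeline(MyDriver.class);

        // Custom display name, shown in the Hadoop JobTracker
        MRPipeline p2 = new MRPipeline(MyDriver.class, "my-pipeline");

        // Custom Configuration used by all MapReduce jobs in the pipeline
        MRPipeline p3 = new MRPipeline(MyDriver.class, conf);

        // Custom name and custom Configuration
        MRPipeline p4 = new MRPipeline(MyDriver.class, "my-pipeline", conf);
      }
    }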
