Updated Branches: refs/heads/master 374bf3de6 -> 06ef56e60
CRUNCH-124: Additional javadoc details and fix a few generics warnings Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/06ef56e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/06ef56e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/06ef56e6 Branch: refs/heads/master Commit: 06ef56e6017d296242ec05ca9b8ed36b9661ab84 Parents: 8ce493a Author: Josh Wills <[email protected]> Authored: Fri Dec 7 11:45:35 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Dec 7 11:45:35 2012 -0800 ---------------------------------------------------------------------- crunch/src/main/java/org/apache/crunch/io/At.java | 30 ++++++++++- .../src/main/java/org/apache/crunch/io/From.java | 41 ++++++++++++--- crunch/src/main/java/org/apache/crunch/io/To.java | 40 ++++++++++++-- 3 files changed, 97 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/At.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java index d91647b..a6f0782 100644 --- a/crunch/src/main/java/org/apache/crunch/io/At.java +++ b/crunch/src/main/java/org/apache/crunch/io/At.java @@ -33,8 +33,34 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; /** - * Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source} - * and a {@code Target}. 
+ * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source} + * and a {@code Target}.</p> + * + * <p>The {@code At} methods are analogous to the {@link From} and {@link To} factory methods, but are used for + * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The + * {@code SourceTarget} object acts as both a {@code Source} and a {@code Target}, which enables it to provide this + * functionality.</p> + * + * <pre>{@code + * Pipeline pipeline = new MRPipeline(this.getClass()); + * // Create our intermediate storage location + * SourceTarget<String> intermediate = At.textFile("/temptext"); + * ... + * // Write out the output of the first phase of a pipeline. + * pipeline.write(phase1, intermediate); + * + * // Explicitly call run to kick off the pipeline. + * pipeline.run(); + * + * // And then kick off a second phase by consuming the output + * // from the first phase. + * PCollection<String> phase2Input = pipeline.read(intermediate); + * ... + * }</pre> + * + * + * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate + * outputs of a pipeline as well as the final results.</p> */ public class At { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java index 371f934..e4cfb6a 100644 --- a/crunch/src/main/java/org/apache/crunch/io/From.java +++ b/crunch/src/main/java/org/apache/crunch/io/From.java @@ -36,13 +36,38 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** - * Static factory methods for creating common {@link Source} types. 
+ * <p>Static factory methods for creating common {@link Source} types.</p> + * + * <p>The {@code From} class is intended to provide a literate API for creating + * Crunch pipelines from common input file types.</p> + * + * <pre>{@code + * Pipeline pipeline = new MRPipeline(this.getClass()); + * + * // Reference the lines of a text file by wrapping the TextInputFormat class. + * PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles")); + * + * // Reference entries from a sequence file where the key is a LongWritable and the + * // value is a custom Writable class. + * PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile( + * "/path/to/seqfiles", LongWritable.class, MyWritable.class)); + * + * // Reference the records from an Avro file, where MyAvroObject implements Avro's + * // SpecificRecord interface. + * PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles", + * MyAvroObject.class)); + * + * // Reference the key-value pairs from a custom extension of FileInputFormat: + * PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile( + * "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class)); + * }</pre> + * */ public class From { /** * Creates a {@code TableSource<K, V>} for reading data from files that have custom - * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} + * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource} * and {@code Source} factory methods. * * @param pathName The name of the path to the data on the filesystem @@ -52,14 +77,14 @@ public class From { * @return A new {@code TableSource<K, V>} instance */ public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile( - String pathName, Class<? extends FileInputFormat> formatClass, + String pathName, Class<? 
extends FileInputFormat<K, V>> formatClass, Class<K> keyClass, Class<V> valueClass) { return formattedFile(new Path(pathName), formatClass, keyClass, valueClass); } /** * Creates a {@code TableSource<K, V>} for reading data from files that have custom - * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} + * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource} * and {@code Source} factory methods. * * @param The {@code Path} to the data @@ -69,7 +94,7 @@ public class From { * @return A new {@code TableSource<K, V>} instance */ public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile( - Path path, Class<? extends FileInputFormat> formatClass, + Path path, Class<? extends FileInputFormat<K, V>> formatClass, Class<K> keyClass, Class<V> valueClass) { return formattedFile(path, formatClass, Writables.writables(keyClass), Writables.writables(valueClass)); @@ -86,7 +111,8 @@ public class From { * @param valueType The {@code PType} to use for the value * @return A new {@code TableSource<K, V>} instance */ - public static <K, V> TableSource<K, V> formattedFile(String pathName, Class<? extends FileInputFormat> formatClass, + public static <K, V> TableSource<K, V> formattedFile(String pathName, + Class<? extends FileInputFormat<?, ?>> formatClass, PType<K> keyType, PType<V> valueType) { return formattedFile(new Path(pathName), formatClass, keyType, valueType); } @@ -102,7 +128,8 @@ public class From { * @param valueType The {@code PType} to use for the value * @return A new {@code TableSource<K, V>} instance */ - public static <K, V> TableSource<K, V> formattedFile(Path path, Class<? extends FileInputFormat> formatClass, + public static <K, V> TableSource<K, V> formattedFile(Path path, + Class<? 
extends FileInputFormat<?, ?>> formatClass, PType<K> keyType, PType<V> valueType) { PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType); return new FileTableSourceImpl<K, V>(path, tableType, formatClass); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/06ef56e6/crunch/src/main/java/org/apache/crunch/io/To.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java index d7af01b..d62d294 100644 --- a/crunch/src/main/java/org/apache/crunch/io/To.java +++ b/crunch/src/main/java/org/apache/crunch/io/To.java @@ -23,10 +23,39 @@ import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.io.seq.SeqFileTarget; import org.apache.crunch.io.text.TextFileTarget; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** - * Static factory methods for creating common {@link Target} types. + * <p>Static factory methods for creating common {@link Target} types.</p> + * + * <p>The {@code To} class is intended to be used as part of a literate API + * for writing the output of Crunch pipelines to common file types. We can use + * the {@code Target} objects created by the factory methods in the {@code To} + * class with either the {@code write} method on the {@code Pipeline} class or + * the convenience {@code write} method on {@code PCollection} and {@code PTable} + * instances.</p> + * + * <pre>{@code + * Pipeline pipeline = new MRPipeline(this.getClass()); + * ... 
+ * // Write a PCollection<String> to a text file: + * PCollection<String> words = ...; + * pipeline.write(words, To.textFile("/put/my/words/here")); + * + * // Write a PTable<Text, Text> to a sequence file: + * PTable<Text, Text> textToText = ...; + * textToText.write(To.sequenceFile("/words/to/words")); + * + * // Write a PCollection<MyAvroObject> to an Avro data file: + * PCollection<MyAvroObject> objects = ...; + * objects.write(To.avroFile("/my/avro/files")); + * + * // Write a PTable to a custom FileOutputFormat: + * PTable<KeyWritable, ValueWritable> custom = ...; + * pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class)); + * }</pre> + * */ public class To { @@ -35,10 +64,11 @@ public class To { * a custom {@code FileOutputFormat}. * * @param pathName The name of the path to write the data to on the filesystem - * @param formatClass The {@code FileOutputFormat} to write the data to + * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to * @return A new {@code Target} instance */ - public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) { + public static <K extends Writable, V extends Writable> Target formattedFile( + String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) { return formattedFile(new Path(pathName), formatClass); } @@ -50,7 +80,8 @@ public class To { * @param formatClass The {@code FileOutputFormat} to write the data to * @return A new {@code Target} instance */ - public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) { + public static <K extends Writable, V extends Writable> Target formattedFile( + Path path, Class<? extends FileOutputFormat<K, V>> formatClass) { return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme()); } @@ -119,5 +150,4 @@ public class To { public static Target textFile(Path path) { return new TextFileTarget(path); } - }
