Repository: beam-site Updated Branches: refs/heads/asf-site 7b3e24f3b -> 8c9a89ebf
Transfer some content from Create Your Pipeline to the Programming Guide. Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/8ea44819 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/8ea44819 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/8ea44819 Branch: refs/heads/asf-site Commit: 8ea448195fdf7ea04c44e53886d23a148dfcadfc Parents: 7b3e24f Author: Hadar Hod <had...@google.com> Authored: Wed May 3 15:08:40 2017 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Thu May 4 00:36:12 2017 -0700 ---------------------------------------------------------------------- .../pipelines/create-your-pipeline.md | 76 +---------- src/documentation/programming-guide.md | 133 ++++++++++++------- 2 files changed, 91 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/8ea44819/src/documentation/pipelines/create-your-pipeline.md ---------------------------------------------------------------------- diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md index 120ec35..b765467 100644 --- a/src/documentation/pipelines/create-your-pipeline.md +++ b/src/documentation/pipelines/create-your-pipeline.md @@ -22,7 +22,7 @@ A Beam program often starts by creating a `Pipeline` object. In the Beam SDKs, each pipeline is represented by an explicit object of type `Pipeline`. Each `Pipeline` object is an independent entity that encapsulates both the data the pipeline operates over and the transforms that get applied to that data. -To create a pipeline, declare a `Pipeline` object, and pass it some configuration options, which are explained in a section below. You pass the configuration options by creating an object of type `PipelineOptions`, which you can build by using the static method `PipelineOptionsFactory.create()`. +To create a pipeline, declare a `Pipeline` object, and pass it some [configuration options]({{ site.baseurl }}/documentation/programming-guide#options). ```java // Start by defining the options for the pipeline. @@ -32,71 +32,6 @@ PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); ``` -### Configuring Pipeline Options - -Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files. - -When you run the pipeline on a runner of your choice, a copy of the PipelineOptions will be available to your code. For example, you can read PipelineOptions from a DoFn's Context. - -#### Setting PipelineOptions from Command-Line Arguments - -While you can configure your pipeline by creating a `PipelineOptions` object and setting the fields directly, the Beam SDKs include a command-line parser that you can use to set fields in `PipelineOptions` using command-line arguments. - -To read options from the command-line, construct your `PipelineOptions` object as demonstrated in the following example code: - -```java -MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); -``` - -This interprets command-line arguments that follow the format: - -```java ---<option>=<value> -``` - -> **Note:** Appending the method `.withValidation` will check for required command-line arguments and validate argument values. - -Building your `PipelineOptions` this way lets you specify any of the options as a command-line argument. - -> **Note:** The [WordCount example pipeline]({{ site.baseurl }}/get-started/wordcount-example) demonstrates how to set pipeline options at runtime by using command-line options. - -#### Creating Custom Options - -You can add your own custom options in addition to the standard `PipelineOptions`. To add your own options, define an interface with getter and setter methods for each option, as in the following example: - -```java -public interface MyOptions extends PipelineOptions { - String getMyCustomOption(); - void setMyCustomOption(String myCustomOption); - } -``` - -You can also specify a description, which appears when a user passes `--help` as a command-line argument, and a default value. - -You set the description and default value using annotations, as follows: - -```java -public interface MyOptions extends PipelineOptions { - @Description("My custom command line argument.") - @Default.String("DEFAULT") - String getMyCustomOption(); - void setMyCustomOption(String myCustomOption); - } -``` - -It's recommended that you register your interface with `PipelineOptionsFactory` and then pass the interface when creating the `PipelineOptions` object. When you register your interface with `PipelineOptionsFactory`, the `--help` can find your custom options interface and add it to the output of the `--help` command. `PipelineOptionsFactory` will also validate that your custom options are compatible with all other registered options. - -The following example code shows how to register your custom options interface with `PipelineOptionsFactory`: - -```java -PipelineOptionsFactory.register(MyOptions.class); -MyOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(MyOptions.class); -``` - -Now your pipeline can accept `--myCustomOption=value` as a command-line argument. - ## Reading Data Into Your Pipeline To create your pipeline's initial `PCollection`, you apply a root transform to your pipeline object. A root transform creates a `PCollection` from either an external data source or some local data you specify. @@ -112,13 +47,7 @@ PCollection<String> lines = p.apply( ## Applying Transforms to Process Pipeline Data -To use transforms in your pipeline, you **apply** them to the `PCollection` that you want to transform. - -To apply a transform, you call the `apply` method on each `PCollection` that you want to process, passing the desired transform object as an argument. - -The Beam SDKs contain a number of different transforms that you can apply to your pipeline's `PCollection`s. These include general-purpose core transforms, such as [ParDo]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo) or [Combine]({{ site.baseurl }}/documentation/programming-guide/#transforms-combine). There are also pre-written [composite transforms]({{ site.baseurl }}/documentation/programming-guide/#transforms-composite) included in the SDKs, which combine one or more of the core transforms in a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline's exact use case. - -In the Beam Java SDK, each transform is a subclass of the base class `PTransform`. When you call `apply` on a `PCollection`, you pass the `PTransform` you want to use as an argument. +You can manipulate your data using the various [transforms]({{ site.baseurl }}/documentation/programming-guide/#transforms) provided in the Beam SDKs. To do this, you **apply** the trannsforms to your pipeline's `PCollection` by calling the `apply` method on each `PCollection` that you want to process and passing the desired transform object as an argument. The following code shows how to `apply` a transform to a `PCollection` of strings. The transform is a user-defined custom transform that reverses the contents of each string and outputs a new `PCollection` containing the reversed strings. @@ -158,4 +87,5 @@ p.run().waitUntilFinish(); ## What's next +* [Programming Guide]({{ site.baseurl }}/documentation/programming-guide) - Learn the details of creating your pipeline, configuring pipeline options, and applying transforms. * [Test your pipeline]({{ site.baseurl }}/documentation/pipelines/test-your-pipeline). http://git-wip-us.apache.org/repos/asf/beam-site/blob/8ea44819/src/documentation/programming-guide.md ---------------------------------------------------------------------- diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index e3cd2d9..11ec86d 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -11,7 +11,6 @@ redirect_from: The **Beam Programming Guide** is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines. - <nav class="language-switcher"> <strong>Adapt for:</strong> <ul> @@ -24,14 +23,10 @@ The **Beam Programming Guide** is intended for Beam users who want to use the Be * [Overview](#overview) * [Creating the Pipeline](#pipeline) + * [Configuring Pipeline Options](#options) * [Working with PCollections](#pcollection) * [Creating a PCollection](#pccreate) * [PCollection Characteristics](#pccharacteristics) - * [Element Type](#pcelementtype) - * [Immutability](#pcimmutability) - * [Random Access](#pcrandomaccess) - * [Size and Boundedness](#pcsizebound) - * [Element Timestamps](#pctimestamps) * [Applying Transforms](#transforms) * [Using ParDo](#transforms-pardo) * [Using GroupByKey](#transforms-gbk) @@ -42,7 +37,6 @@ The **Beam Programming Guide** is intended for Beam users who want to use the Be * [Additional Outputs](#transforms-outputs) * [Composite Transforms](#transforms-composite) * [Pipeline I/O](#io) -* [Running the Pipeline](#running) * [Data Encoding and Type Safety](#coders) * [Working with Windowing](#windowing) * [Working with Triggers](#triggers) @@ -77,30 +71,100 @@ The `Pipeline` abstraction encapsulates all the data and steps in your data proc To use Beam, your driver program must first create an instance of the Beam SDK class `Pipeline` (typically in the `main()` function). When you create your `Pipeline`, you'll also need to set some **configuration options**. You can set your pipeline's configuration options programatically, but it's often easier to set the options ahead of time (or read them from the command line) and pass them to the `Pipeline` object when you create the object. -The pipeline configuration options determine, among other things, the `PipelineRunner` that determines where the pipeline gets executed: locally, or using a distributed back-end of your choice. Depending on where your pipeline gets executed and what your specifed Runner requires, the options can also help you specify other aspects of execution. +```java +// Start by defining the options for the pipeline. +PipelineOptions options = PipelineOptionsFactory.create(); + +// Then create the pipeline. +Pipeline p = Pipeline.create(options); +``` + +```py +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipelines_constructing_creating +%} +``` + +### <a name="options"></a>Configuring Pipeline Options + +Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files. + +When you run the pipeline on a runner of your choice, a copy of the PipelineOptions will be available to your code. For example, you can read PipelineOptions from a DoFn's Context. -To set your pipeline's configuration options and create the pipeline, create an object of type <span class="language-java">[PipelineOptions]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/options/PipelineOptions.html)</span><span class="language-py">[PipelineOptions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/pipeline_options.py)</span> and pass it to `Pipeline.Create()`. The most common way to do this is by parsing arguments from the command-line: +#### Setting PipelineOptions from Command-Line Arguments + +While you can configure your pipeline by creating a `PipelineOptions` object and setting the fields directly, the Beam SDKs include a command-line parser that you can use to set fields in `PipelineOptions` using command-line arguments. + +To read options from the command-line, construct your `PipelineOptions` object as demonstrated in the following example code: ```java -public static void main(String[] args) { - // Will parse the arguments passed into the application and construct a PipelineOptions - // Note that --help will print registered options, and --help=PipelineOptionsClassName - // will print out usage for the specific class. - PipelineOptions options = - PipelineOptionsFactory.fromArgs(args).create(); +MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); +``` - Pipeline p = Pipeline.create(options); +```py +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipelines_constructing_creating +%} +``` + +This interprets command-line arguments that follow the format: + +``` +--<option>=<value> +``` + +> **Note:** Appending the method `.withValidation` will check for required command-line arguments and validate argument values. + +Building your `PipelineOptions` this way lets you specify any of the options as a command-line argument. + +> **Note:** The [WordCount example pipeline]({{ site.baseurl }}/get-started/wordcount-example) demonstrates how to set pipeline options at runtime by using command-line options. + +#### Creating Custom Options + +You can add your own custom options in addition to the standard `PipelineOptions`. To add your own options, define an interface with getter and setter methods for each option, as in the following example: + +```java +public interface MyOptions extends PipelineOptions { + String getMyCustomOption(); + void setMyCustomOption(String myCustomOption); + } ``` ```py -# Will parse the arguments passed into the application and construct a PipelineOptions object. -# Note that --help will print registered options. +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipeline_options_define_custom +%} +``` -{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipelines_constructing_creating +You can also specify a description, which appears when a user passes `--help` as a command-line argument, and a default value. + +You set the description and default value using annotations, as follows: + +```java +public interface MyOptions extends PipelineOptions { + @Description("My custom command line argument.") + @Default.String("DEFAULT") + String getMyCustomOption(); + void setMyCustomOption(String myCustomOption); + } +``` + +```py +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipeline_options_define_custom_with_help_and_default %} ``` -The Beam SDKs contain various subclasses of `PipelineOptions` that correspond to different Runners. For example, `DirectPipelineOptions` contains options for the Direct (local) pipeline runner, while `DataflowPipelineOptions` contains options for using the runner for Google Cloud Dataflow. You can also define your own custom `PipelineOptions` by creating an interface that extends the Beam SDKs' `PipelineOptions` class. +{:.language-java} +It's recommended that you register your interface with `PipelineOptionsFactory` and then pass the interface when creating the `PipelineOptions` object. When you register your interface with `PipelineOptionsFactory`, the `--help` can find your custom options interface and add it to the output of the `--help` command. `PipelineOptionsFactory` will also validate that your custom options are compatible with all other registered options. + +{:.language-java} +The following example code shows how to register your custom options interface with `PipelineOptionsFactory`: + +```java +PipelineOptionsFactory.register(MyOptions.class); +MyOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(MyOptions.class); +``` + +Now your pipeline can accept `--myCustomOption=value` as a command-line argument. ## <a name="pcollection"></a>Working with PCollections @@ -125,6 +189,7 @@ public static void main(String[] args) { PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); + // Create the PCollection 'lines' by applying a 'Read' transform. PCollection<String> lines = p.apply( "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt")); } @@ -214,7 +279,9 @@ You can manually assign timestamps to the elements of a `PCollection` if the sou In the Beam SDKs, **transforms** are the operations in your pipeline. A transform takes a `PCollection` (or more than one `PCollection`) as input, performs an operation that you specify on each element in that collection, and produces a new output `PCollection`. To invoke a transform, you must **apply** it to the input `PCollection`. -In Beam SDK each transform has a generic `apply` method <span class="language-py">(or pipe operator `|`)</span>. Invoking multiple Beam transforms is similar to *method chaining*, but with one slight difference: You apply the transform to the input `PCollection`, passing the transform itself as an argument, and the operation returns the output `PCollection`. This takes the general form: +The Beam SDKs contain a number of different transforms that you can apply to your pipeline's `PCollection`s. These include general-purpose core transforms, such as [ParDo]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo) or [Combine]({{ site.baseurl }}/documentation/programming-guide/#transforms-combine). There are also pre-written [composite transforms]({{ site.baseurl }}/documentation/programming-guide/#transforms-composite) included in the SDKs, which combine one or more of the core transforms in a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline's exact use case. + +Each transform in the Beam SDKs has a generic `apply` method <span class="language-py">(or pipe operator `|`)</span>. Invoking multiple Beam transforms is similar to *method chaining*, but with one slight difference: You apply the transform to the input `PCollection`, passing the transform itself as an argument, and the operation returns the output `PCollection`. This takes the general form: ```java [Output PCollection] = [Input PCollection].apply([Transform]) @@ -1106,29 +1173,6 @@ records.apply("WriteToText", ### Beam-provided I/O Transforms See the [Beam-provided I/O Transforms]({{site.baseurl }}/documentation/io/built-in/) page for a list of the currently available I/O transforms. - -## <a name="running"></a>Running the pipeline - -To run your pipeline, use the `run` method. The program you create sends a specification for your pipeline to a pipeline runner, which then constructs and runs the actual series of pipeline operations. Pipelines are executed asynchronously by default. - -```java -pipeline.run(); -``` - -```py -pipeline.run() -``` - -For blocking execution, append the <span class="language-java">`waitUntilFinish`</span> <span class="language-py">`wait_until_finish`</span> method: - -```java -pipeline.run().waitUntilFinish(); -``` - -```py -pipeline.run().wait_until_finish() -``` - ## <a name="coders"></a>Data encoding and type safety When you create or output pipeline data, you'll need to specify how the elements in your `PCollection`s are encoded and decoded to and from byte strings. Byte strings are used for intermediate storage as well reading from sources and writing to sinks. The Beam SDKs use objects called coders to describe how the elements of a given `PCollection` should be encoded and decoded. @@ -1773,4 +1817,3 @@ You can also build other sorts of composite triggers. The following example code ```py # The Beam SDK for Python does not support triggers. ``` -