This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit 515cd4ea04eb6f8f4179c036db6914c894fa1a78 Author: melissa <meliss...@google.com> AuthorDate: Tue Feb 20 14:18:14 2018 -0800 Explicitly define section id due to kramdown id generation changes --- src/documentation/programming-guide.md | 195 ++++++++++++++++----------------- 1 file changed, 95 insertions(+), 100 deletions(-) diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index 7f6aea5..6b86743 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -26,12 +26,7 @@ how to implement Beam concepts in your pipelines. </ul> </nav> -**Table of Contents:** -* TOC -{:toc} - - -## 1. Overview +## 1. Overview {#overview} To use Beam, you need to first create a driver program using the classes in one of the Beam SDKs. Your driver program *defines* your pipeline, including all of @@ -94,7 +89,7 @@ objects you've created and transforms that you've applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous "job" (or equivalent) on that back-end. -## 2. Creating a pipeline +## 2. Creating a pipeline {#creating-a-pipeline} The `Pipeline` abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a @@ -122,7 +117,7 @@ Pipeline p = Pipeline.create(options); %} ``` -### 2.1. Configuring pipeline options +### 2.1. Configuring pipeline options {#configuring-pipeline-options} Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific @@ -134,7 +129,7 @@ When you run the pipeline on a runner of your choice, a copy of the PipelineOptions will be available to your code. For example, you can read PipelineOptions from a DoFn's Context. -#### 2.1.1. Setting PipelineOptions from command-line arguments +#### 2.1.1. Setting PipelineOptions from command-line arguments {#pipeline-options-cli} While you can configure your pipeline by creating a `PipelineOptions` object and setting the fields directly, the Beam SDKs include a command-line parser that @@ -167,7 +162,7 @@ a command-line argument. > demonstrates how to set pipeline options at runtime by using command-line > options. -#### 2.1.2. Creating custom options +#### 2.1.2. Creating custom options {#creating-custom-options} You can add your own custom options in addition to the standard `PipelineOptions`. To add your own options, define an interface with getter and @@ -223,7 +218,7 @@ MyOptions options = PipelineOptionsFactory.fromArgs(args) Now your pipeline can accept `--myCustomOption=value` as a command-line argument. -## 3. PCollections +## 3. PCollections {#pcollections} The <span class="language-java">[PCollection]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/PCollection.html)</span> <span class="language-py">`PCollection`</span> abstraction represents a @@ -236,7 +231,7 @@ After you've created your `Pipeline`, you'll need to begin by creating at least one `PCollection` in some form. The `PCollection` you create serves as the input for the first operation in your pipeline. -### 3.1. Creating a PCollection +### 3.1. 
Creating a PCollection {#creating-a-pcollection} You create a `PCollection` by either reading data from an external source using Beam's [Source API](#pipeline-io), or you can create a `PCollection` of data @@ -246,7 +241,7 @@ contain adapters to help you read from external sources like large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging purposes. -#### 3.1.1. Reading from an external source +#### 3.1.1. Reading from an external source {#reading-external-source} To read from an external source, you use one of the [Beam-provided I/O adapters](#pipeline-io). The adapters vary in their exact usage, but all of them @@ -283,7 +278,7 @@ public static void main(String[] args) { See the [section on I/O](#pipeline-io) to learn more about how to read from the various data sources supported by the Beam SDK. -#### 3.1.2. Creating a PCollection from in-memory data +#### 3.1.2. Creating a PCollection from in-memory data {#creating-pcollection-in-memory} {:.language-java} To create a `PCollection` from an in-memory Java `Collection`, you use the @@ -326,14 +321,14 @@ public static void main(String[] args) { %} ``` -### 3.2. PCollection characteristics +### 3.2. PCollection characteristics {#pcollection-characteristics} A `PCollection` is owned by the specific `Pipeline` object for which it is created; multiple pipelines cannot share a `PCollection`. In some respects, a `PCollection` functions like a collection class. However, a `PCollection` can differ in a few key ways: -#### 3.2.1. Element type +#### 3.2.1. Element type {#element-type} The elements of a `PCollection` may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to @@ -342,19 +337,19 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed. -#### 3.2.2. Immutability +#### 3.2.2. Immutability {#immutability} A `PCollection` is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a `PCollection` and generate new pipeline data (as a new `PCollection`), *but it does not consume or modify the original input collection*. -#### 3.2.3. Random access +#### 3.2.3. Random access {#random-access} A `PCollection` does not support random access to individual elements. Instead, Beam Transforms consider every element in a `PCollection` individually. -#### 3.2.4. Size and boundedness +#### 3.2.4. Size and boundedness {#size-and-boundedness} A `PCollection` is a large, immutable "bag" of elements. There is no upper limit on how many elements a `PCollection` can contain; any given `PCollection` might @@ -385,7 +380,7 @@ on a per-window basis — as the data set is generated, they process each `PCollection` as a succession of these finite windows. -#### 3.2.5. Element timestamps +#### 3.2.5. Element timestamps {#element-timestamps} Each element in a `PCollection` has an associated intrinsic **timestamp**. The timestamp for each element is initially assigned by the [Source](#pipeline-io) @@ -407,11 +402,11 @@ source doesn't do it for you. You'll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a "time" field in a server log entry). 
Beam has [Transforms](#transforms) that take a `PCollection` as input and output an -identical `PCollection` with timestamps attached; see [Assigning +identical `PCollection` with timestamps attached; see [Adding Timestamps](#adding-timestamps-to-a-pcollections-elements) for more information about how to do so. -## 4. Transforms +## 4. Transforms {#transforms} Transforms are the operations in your pipeline, and provide a generic processing framework. You provide processing logic in the form of a function @@ -430,7 +425,7 @@ combine one or more of the core transforms in a useful processing pattern, such as counting or combining elements in a collection. You can also define your own more complex composite transforms to fit your pipeline's exact use case. -### 4.1. Applying transforms +### 4.1. Applying transforms {#applying-transforms} To invoke a transform, you must **apply** it to the input `PCollection`. Each transform in the Beam SDKs has a generic `apply` method <span class="language-py">(or pipe operator `|`)</span>. @@ -503,7 +498,7 @@ nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places. -### 4.2. Core Beam transforms +### 4.2. Core Beam transforms {#core-beam-transforms} Beam provides the following core transforms, each of which represents a different processing paradigm: @@ -515,7 +510,7 @@ processing paradigm: * `Flatten` * `Partition` -#### 4.2.1. ParDo +#### 4.2.1. ParDo {#pardo} `ParDo` is a Beam transform for generic parallel processing. The `ParDo` processing paradigm is similar to the "Map" phase of a Map/Shuffle/Reduce-style @@ -553,7 +548,7 @@ processing function. > When you create a subclass of `DoFn`, note that your subclass should adhere > to > the [Requirements for writing user code for Beam > transforms](#requirements-for-writing-user-code-for-beam-transforms). -##### 4.2.1.1. Applying ParDo +##### 4.2.1.1. Applying ParDo {#applying-pardo} Like all Beam transforms, you apply `ParDo` by calling the `apply` method on the input `PCollection` and passing `ParDo` as an argument, as shown in the @@ -666,7 +661,7 @@ following requirements: * Once you output a value using `ProcessContext.output()` or `ProcessContext.sideOutput()`, you should not modify that value in any way. -##### 4.2.1.3. Lightweight DoFns and other abstractions +##### 4.2.1.3. Lightweight DoFns and other abstractions {#lightweight-dofns} If your function is relatively straightforward, you can simplify your use of `ParDo` by providing a lightweight `DoFn` in-line, as @@ -737,7 +732,7 @@ words = ... > **Note:** You can use Java 8 lambda functions with several other Beam > transforms, including `Filter`, `FlatMapElements`, and `Partition`. -#### 4.2.2. GroupByKey +#### 4.2.2. GroupByKey {#groupbykey} `GroupByKey` is a Beam transform for processing collections of key/value pairs. It's a parallel reduction operation, analogous to the Shuffle phase of a @@ -794,7 +789,7 @@ tree, [2] Thus, `GroupByKey` represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values). 
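To make that multimap-to-uni-map picture concrete, here is a minimal sketch using the Java SDK; the input collection `wordsAndLines` (word keys paired with line numbers, as in the example above) is assumed purely for illustration:

```java
// Assumed input: word/line-number pairs such as KV.of("cat", 1), KV.of("dog", 5), ...
PCollection<KV<String, Integer>> wordsAndLines = ...;

// GroupByKey collects every value that shares a key into a single Iterable,
// producing one output element per unique key.
PCollection<KV<String, Iterable<Integer>>> groupedWords =
    wordsAndLines.apply(GroupByKey.<String, Integer>create());
```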
-##### 4.2.2.1 GroupByKey and unbounded PCollections +##### 4.2.2.1 GroupByKey and unbounded PCollections {#groupbykey-and-unbounded-pcollections} If you are using unbounded `PCollection`s, you must use either [non-global windowing](#setting-your-pcollections-windowing-function) or an @@ -820,7 +815,7 @@ If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge `PCollection`s with incompatible windows, Beam generates an IllegalStateException error at pipeline construction time. -#### 4.2.3. CoGroupByKey +#### 4.2.3. CoGroupByKey {#cogroupbykey} `CoGroupByKey` performs a relational join of two or more key/value `PCollection`s that have the same key type. @@ -911,7 +906,7 @@ The formatted data looks like this: {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_formatted_outputs %}``` -#### 4.2.4. Combine +#### 4.2.4. Combine {#combine} <span class="language-java">[`Combine`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Combine.html)</span> <span class="language-py">[`Combine`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span> @@ -934,7 +929,7 @@ function. More complex combination operations might require you to create a subclass of `CombineFn` that has an accumulation type distinct from the input/output type. -##### 4.2.4.1. Simple combinations using simple functions +##### 4.2.4.1. Simple combinations using simple functions {#simple-combines} The following example code shows a simple combine function. @@ -956,7 +951,7 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_bounded_sum %}``` -##### 4.2.4.2. Advanced combinations using CombineFn +##### 4.2.4.2. Advanced combinations using CombineFn {#advanced-combines} For more complex combine functions, you can define a subclass of `CombineFn`. You should use `CombineFn` if the combine function requires a more sophisticated @@ -1034,7 +1029,7 @@ you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy. -##### 4.2.4.3. Combining a PCollection into a single value +##### 4.2.4.3. Combining a PCollection into a single value {#combining-pcollection} Use the global combine to transform all of the elements in a given `PCollection` into a single value, represented in your pipeline as a new `PCollection` @@ -1057,7 +1052,7 @@ pc = ... {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_custom_average_execute %}``` -##### 4.2.4.4. Combine and global windowing +##### 4.2.4.4. Combine and global windowing {#combine-global-windowing} If your input `PCollection` uses the default global windowing, the default behavior is to return a `PCollection` containing one item. That item's value @@ -1080,7 +1075,7 @@ pc = ... sum = pc | beam.CombineGlobally(sum).without_defaults() ``` -##### 4.2.4.5. Combine and non-global windowing +##### 4.2.4.5. Combine and non-global windowing {#combine-non-global-windowing} If your `PCollection` uses any non-global windowing function, Beam does not provide the default behavior. 
You must specify one of the following options when @@ -1094,7 +1089,7 @@ applying `Combine`: the result of your pipeline's `Combine` is to be used as a side input later in the pipeline. -##### 4.2.4.6. Combining values in a keyed PCollection +##### 4.2.4.6. Combining values in a keyed PCollection {#combining-values-in-a-keyed-pcollection} After creating a keyed PCollection (for example, by using a `GroupByKey` transform), a common pattern is to combine the collection of values associated @@ -1140,7 +1135,7 @@ player_accuracies = ... %} ``` -#### 4.2.5. Flatten +#### 4.2.5. Flatten {#flatten} <span class="language-java">[`Flatten`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Flatten.html)</span> <span class="language-py">[`Flatten`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span> and @@ -1170,14 +1165,14 @@ github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets %} ``` -##### 4.2.5.1. Data encoding in merged collections +##### 4.2.5.1. Data encoding in merged collections {#data-encoding-merged-collections} By default, the coder for the output `PCollection` is the same as the coder for the first `PCollection` in the input `PCollectionList`. However, the input `PCollection` objects can each use different coders, as long as they all contain the same data type in your chosen language. -##### 4.2.5.2. Merging windowed collections +##### 4.2.5.2. Merging windowed collections {#merging-windowed-collections} When using `Flatten` to merge `PCollection` objects that have a windowing strategy applied, all of the `PCollection` objects you want to merge must use a @@ -1189,7 +1184,7 @@ If your pipeline attempts to use `Flatten` to merge `PCollection` objects with incompatible windows, Beam generates an `IllegalStateException` error when your pipeline is constructed. -#### 4.2.6. Partition +#### 4.2.6. Partition {#partition} <span class="language-java">[`Partition`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Partition.html)</span> <span class="language-py">[`Partition`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span> @@ -1237,7 +1232,7 @@ students = ... %} ``` -### 4.3. Requirements for writing user code for Beam transforms +### 4.3. Requirements for writing user code for Beam transforms {#requirements-for-writing-user-code-for-beam-transforms} When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your @@ -1261,7 +1256,7 @@ In addition, it's recommended that you make your function object **idempotent**. > with the [Combine](#combine) transform), and `WindowFn` (a function object > used with the [Window](#windowing) transform). -#### 4.3.1. Serializability +#### 4.3.1. Serializability {#user-code-serializability} Any function object you provide to a transform must be **fully serializable**. This is because a copy of the function needs to be serialized and transmitted to @@ -1282,7 +1277,7 @@ Some other serializability factors you should keep in mind are: That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class. -#### 4.3.2. Thread-compatibility +#### 4.3.2. Thread-compatibility {#user-code-thread-compatibility} Your function object should be thread-compatible. 
Each instance of your function object is accessed by a single thread on a worker instance, unless you @@ -1292,7 +1287,7 @@ provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be accessed from different threads. -#### 4.3.3. Idempotence +#### 4.3.3. Idempotence {#user-code-idempotence} It's recommended that you make your function object idempotent--that is, that it can be repeated or retried as often as necessary without causing unintended side @@ -1301,7 +1296,7 @@ user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline's output deterministic, and your transforms' behavior more predictable and easier to debug. -### 4.4. Side inputs +### 4.4. Side inputs {#side-inputs} In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional @@ -1316,7 +1311,7 @@ needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline. -#### 4.4.1. Passing side inputs to ParDo +#### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo} ```java // Pass side inputs to your ParDo transform by invoking .withSideInputs. @@ -1366,7 +1361,7 @@ words = ... ... ``` -#### 4.4.2. Side inputs and windowing +#### 4.4.2. Side inputs and windowing {#side-inputs-windowing} A windowed `PCollection` may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a `PCollectionView` @@ -1395,7 +1390,7 @@ If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger. -### 4.5. Additional outputs +### 4.5. Additional outputs {#additional-outputs} While `ParDo` always produces a main output `PCollection` (as the return value from `apply`), you can also have your `ParDo` produce any number of additional @@ -1403,7 +1398,7 @@ output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together. -#### 4.5.1. Tags for multiple outputs +#### 4.5.1. Tags for multiple outputs {#output-tags} ```java // To emit elements to multiple output PCollections, create a TupleTag object to identify each collection @@ -1468,7 +1463,7 @@ together. {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_tagged_outputs_iter %}``` -#### 4.5.2. Emitting to multiple outputs in your DoFn +#### 4.5.2. Emitting to multiple outputs in your DoFn {#multiple-outputs-dofn} ```java // Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by passing in the @@ -1508,7 +1503,7 @@ together. {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_undeclared_outputs %}``` -### 4.6. Composite transforms +### 4.6. 
Composite transforms {#composite-transforms} Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one `ParDo`, `Combine`, @@ -1521,7 +1516,7 @@ reference pages for a list of transforms: * [Pre-written Beam transforms for Java]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/package-summary.html) * [Pre-written Beam transforms for Python]({{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.transforms.html) -#### 4.6.1. An example composite transform +#### 4.6.1. An example composite transform {#composite-transform-example} The `CountWords` transform in the [WordCount example program]({{ site.baseurl }}/get-started/wordcount-example/) is an example of a composite transform. `CountWords` is a `PTransform` subclass @@ -1567,7 +1562,7 @@ transform's intermediate data changes type multiple times. {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:pipeline_monitoring_composite %}``` -#### 4.6.2. Creating a composite transform +#### 4.6.2. Creating a composite transform {#composite-transform-creation} To create your own composite transform, create a subclass of the `PTransform` class and override the `expand` method to specify the actual processing logic. @@ -1629,14 +1624,14 @@ directly by the user of a transform. Instead, you should call the `apply` method on the `PCollection` itself, with the transform as an argument. This allows transforms to be nested within the structure of your pipeline. -#### 4.6.3. PTransform Style Guide +#### 4.6.3. PTransform Style Guide {#ptransform-style-guide} The [PTransform Style Guide]({{ site.baseurl }}/contribute/ptransform-style-guide/) contains additional information not included here, such as style guidelines, logging and testing guidance, and language-specific considerations. The guide is a useful starting point when you want to write new composite PTransforms. -## 5. Pipeline I/O +## 5. Pipeline I/O {#pipeline-io} When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may @@ -1647,7 +1642,7 @@ to read from or write to a data storage format that isn't supported by the built-in transforms, you can [implement your own read and write transforms]({{site.baseurl }}/documentation/io/io-toc/). -### 5.1. Reading input data +### 5.1. Reading input data {#pipeline-io-reading-data} Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read @@ -1662,7 +1657,7 @@ PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt" lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt') ``` -### 5.2. Writing output data +### 5.2. Writing output data {#pipeline-io-writing-data} Write transforms write the data in a `PCollection` to an external data source. You will most often use write transforms at the end of your pipeline to output @@ -1677,9 +1672,9 @@ output.apply(TextIO.write().to("gs://some/outputData")); output | beam.io.WriteToText('gs://some/outputData') ``` -### 5.3. File-based input and output data +### 5.3. File-based input and output data {#file-based-data} -#### 5.3.1. Reading from multiple locations +#### 5.3.1. 
Reading from multiple locations {#file-based-reading-multiple-locations} Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey @@ -1701,7 +1696,7 @@ To read data from disparate sources into a single `PCollection`, read each one independently and then use the [Flatten](#flatten) transform to create a single `PCollection`. -#### 5.3.2. Writing to multiple output files +#### 5.3.2. Writing to multiple output files {#file-based-writing-multiple-files} For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name @@ -1723,12 +1718,12 @@ records.apply("WriteToText", %} ``` -### 5.4. Beam-provided I/O transforms +### 5.4. Beam-provided I/O transforms {#provided-io-transforms} See the [Beam-provided I/O Transforms]({{site.baseurl }}/documentation/io/built-in/) page for a list of the currently available I/O transforms. -## 6. Data encoding and type safety +## 6. Data encoding and type safety {#data-encoding-and-type-safety} When Beam runners execute your pipeline, they often need to materialize the intermediate data in your `PCollection`s, which requires converting elements to @@ -1762,7 +1757,7 @@ package. > input data that uses BigEndianIntegerCoder, and Integer-typed output data > that > uses VarIntCoder. -### 6.1. Specifying coders +### 6.1. Specifying coders {#specifying-coders} The Beam SDKs require a coder for every `PCollection` in your pipeline. In most cases, the Beam SDK is able to automatically infer a `Coder` for a `PCollection` @@ -1823,7 +1818,7 @@ Python will automatically infer the default `Coder` for the output `PCollection` When using `Create`, the simplest way to ensure that you have the correct coder is by invoking `withCoder` when you apply the `Create` transform. -### 6.2. Default coders and the CoderRegistry +### 6.2. Default coders and the CoderRegistry {#default-coders-and-the-coderregistry} Each Pipeline object has a `CoderRegistry` object, which maps language types to the default coder the pipeline should use for those types. You can use the @@ -1932,7 +1927,7 @@ The following table shows the standard mapping: </tbody> </table> -#### 6.2.1. Looking up a default coder +#### 6.2.1. Looking up a default coder {#default-coder-lookup} {:.language-java} You can use the method `CoderRegistry.getDefaultCoder` to determine the default @@ -1947,7 +1942,7 @@ You can use the method `CoderRegistry.get_coder` to determine the default Coder for a Python type. You can use `coders.registry` to access the `CoderRegistry`. This allows you to determine (or set) the default Coder for a Python type. -#### 6.2.2. Setting the default coder for a type +#### 6.2.2. Setting the default coder for a type {#setting-default-coder} To set the default Coder for a <span class="language-java">Java</span><span class="language-py">Python</span> @@ -1977,7 +1972,7 @@ cr.registerCoder(Integer.class, BigEndianIntegerCoder.class); apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder) ``` -#### 6.2.3. Annotating a custom data type with a default coder +#### 6.2.3. Annotating a custom data type with a default coder {#annotating-custom-type-default-coder} {:.language-java} If your pipeline program defines a custom data type, you can use the @@ -2014,7 +2009,7 @@ The Beam SDK for Python does not support annotating data types with a default coder. 
If you would like to set a default coder, use the method described in the previous section, *Setting the default coder for a type*. -## 7. Windowing +## 7. Windowing {#windowing} Windowing subdivides a `PCollection` according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as @@ -2028,7 +2023,7 @@ windowing strategy for your `PCollection`. Triggers allow you to deal with late-arriving data or to provide early results. See the [triggers](#triggers) section for more information. -### 7.1. Windowing basics +### 7.1. Windowing basics {#windowing-basics} Some Beam transforms, such as `GroupByKey` and `Combine`, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the @@ -2061,7 +2056,7 @@ your unbounded `PCollection` and subsequently use a grouping transform such as `GroupByKey` or `Combine`, your pipeline will generate an error upon construction and your job will fail. -#### 7.1.1. Windowing constraints +#### 7.1.1. Windowing constraints {#windowing-constraints} After you set the windowing function for a `PCollection`, the elements' windows are used the next time you apply a grouping transform to that `PCollection`. @@ -2084,7 +2079,7 @@ windows are not actually used until they're needed for the `GroupByKey`. Subsequent transforms, however, are applied to the result of the `GroupByKey` -- data is grouped by both key and window. -#### 7.1.2. Windowing with bounded PCollections +#### 7.1.2. Windowing with bounded PCollections {#windowing-bounded-collections} You can use windowing with fixed-size data sets in **bounded** `PCollection`s. However, note that windowing considers only the implicit timestamps attached to @@ -2127,7 +2122,7 @@ for that `PCollection`. The `GroupByKey` transform groups the elements of the subsequent `ParDo` transform gets applied multiple times per key, once for each window. -### 7.2. Provided windowing functions +### 7.2. Provided windowing functions {#provided-windowing-functions} You can define different kinds of windows to divide the elements of your `PCollection`. Beam provides several windowing functions, including: @@ -2146,7 +2141,7 @@ overlapping windows wherein a single element can be assigned to multiple windows. -#### 7.2.1. Fixed time windows +#### 7.2.1. Fixed time windows {#fixed-time-windows} The simplest form of windowing is using **fixed time windows**: given a timestamped `PCollection` which might be continuously updating, each window @@ -2164,7 +2159,7 @@ the second window, and so on. **Figure:** Fixed time windows, 30s in duration. -#### 7.2.2. Sliding time windows +#### 7.2.2. Sliding time windows {#sliding-time-windows} A **sliding time window** also represents time intervals in the data stream; however, sliding time windows can overlap. For example, each window might @@ -2184,7 +2179,7 @@ example. **Figure:** Sliding time windows, with 1 minute window duration and 30s window period. -#### 7.2.3. Session windows +#### 7.2.3. Session windows {#session-windows} A **session window** function defines windows that contain elements that are within a certain gap duration of another element. Session windowing applies on a @@ -2199,7 +2194,7 @@ the start of a new window. **Figure:** Session windows, with a minimum gap duration. Note how each data key has different windows, according to its data distribution. -#### 7.2.4. The single global window +#### 7.2.4. 
The single global window {#single-global-window} By default, all data in a `PCollection` is assigned to the single global window, and late data is discarded. If your data set is of a fixed size, you can use the @@ -2213,7 +2208,7 @@ processing, which is not possible with continuously updating data. To perform aggregations on an unbounded `PCollection` that uses global windowing, you should specify a non-default trigger for that `PCollection`. -### 7.3. Setting your PCollection's windowing function +### 7.3. Setting your PCollection's windowing function {#setting-your-pcollections-windowing-function} You can set the windowing function for a `PCollection` by applying the `Window` transform. When you apply the `Window` transform, you must provide a `WindowFn`. @@ -2226,7 +2221,7 @@ and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. See the [triggers](#triggers) section for more information. -#### 7.3.1. Fixed-time windows +#### 7.3.1. Fixed-time windows {#using-fixed-time-windows} The following example code shows how to apply `Window` to divide a `PCollection` into fixed windows, each one minute in length: @@ -2241,7 +2236,7 @@ into fixed windows, each one minute in length: %} ``` -#### 7.3.2. Sliding time windows +#### 7.3.2. Sliding time windows {#using-sliding-time-windows} The following example code shows how to apply `Window` to divide a `PCollection` into sliding time windows. Each window is 30 minutes in length, and a new window @@ -2257,7 +2252,7 @@ begins every five seconds: %} ``` -#### 7.3.3. Session windows +#### 7.3.3. Session windows {#using-session-windows} The following example code shows how to apply `Window` to divide a `PCollection` into session windows, where each session must be separated by a time gap of at @@ -2276,7 +2271,7 @@ least 10 minutes: Note that the sessions are per-key — each key in the collection will have its own session groupings depending on the data distribution. -#### 7.3.4. Single global window +#### 7.3.4. Single global window {#using-single-global-window} If your `PCollection` is bounded (the size is fixed), you can assign all the elements to a single global window. The following example code shows how to set @@ -2292,7 +2287,7 @@ a single global window for a `PCollection`: %} ``` -### 7.4. Watermarks and late data +### 7.4. Watermarks and late data {#watermarks-and-late-data} In any data processing system, there is a certain amount of lag between the time a data event occurs (the "event time", determined by the timestamp on the data @@ -2332,7 +2327,7 @@ a `PCollection`. You can use triggers to decide when each individual window aggregates and reports its results, including how the window emits late elements. -#### 7.4.1. Managing late data +#### 7.4.1. Managing late data {#managing-late-data} > **Note:** Managing late data is not supported in the Beam SDK for Python. @@ -2354,7 +2349,7 @@ propagates forward to any subsequent `PCollection` derived from the first lateness later in your pipeline, you must do so explictly by applying `Window.configure().withAllowedLateness()`. -### 7.5. Adding timestamps to a PCollection's elements +### 7.5. Adding timestamps to a PCollection's elements {#adding-timestamps-to-a-pcollections-elements} An unbounded source provides a timestamp for each element. 
Depending on your unbounded source, you may need to configure how the timestamp is extracted from @@ -2392,7 +2387,7 @@ with a `DoFn` to attach the timestamps to each element in your `PCollection`. %} ``` -## 8. Triggers +## 8. Triggers {#triggers} > **NOTE:** This content applies only to the Beam SDK for Java. The Beam SDK > for > Python does not support triggers. @@ -2400,7 +2395,7 @@ with a `DoFn` to attach the timestamps to each element in your `PCollection`. When collecting and grouping data into windows, Beam uses **triggers** to determine when to emit the aggregated results of each window (referred to as a *pane*). If you use Beam's default windowing configuration and [default -trigger](#the-default-trigger), Beam outputs the aggregated result when it +trigger](#default-trigger), Beam outputs the aggregated result when it [estimates all data has arrived](#watermarks-and-late-data), and discards all subsequent data for that window. @@ -2451,7 +2446,7 @@ you want your pipeline to provide periodic updates on an unbounded data set — for example, a running average of all data provided to the present time, updated every N seconds or every N elements. -### 8.1. Event time triggers +### 8.1. Event time triggers {#event-time-triggers} The `AfterWatermark` trigger operates on *event time*. The `AfterWatermark` trigger emits the contents of a window after the @@ -2483,7 +2478,7 @@ firings: # The Beam SDK for Python does not support triggers. ``` -#### 8.1.1. The default trigger +#### 8.1.1. Default trigger {#default-trigger} The default trigger for a `PCollection` is based on event time, and emits the results of the window when the Beam's watermark passes the end of the window, @@ -2495,7 +2490,7 @@ discarded. This is because the default windowing configuration has an allowed lateness value of 0. See the Handling Late Data section for information about modifying this behavior. -### 8.2. Processing time triggers +### 8.2. Processing time triggers {#processing-time-triggers} The `AfterProcessingTime` trigger operates on *processing time*. For example, the `AfterProcessingTime.pastFirstElementInPane() ` trigger emits a window after @@ -2507,7 +2502,7 @@ The `AfterProcessingTime` trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window. -### 8.3. Data-driven triggers +### 8.3. Data-driven triggers {#data-driven-triggers} Beam provides one data-driven trigger, `AfterPane.elementCountAtLeast()`. This trigger works on an element count; it fires after the current pane has collected @@ -2522,7 +2517,7 @@ triggers](#composite-triggers) to combine multiple conditions. This allows you to specify multiple firing conditions such as “fire either when I receive 50 elements, or every 1 second”. -### 8.4. Setting a trigger +### 8.4. Setting a trigger {#setting-a-trigger} When you set a windowing function for a `PCollection` by using the `Window` transform, you can also specify a trigger. @@ -2546,7 +2541,7 @@ results one minute after the first element in that window has been processed. The last line in the code sample, `.discardingFiredPanes()`, is the window's **accumulation mode**. -#### 8.4.1. Window accumulation modes +#### 8.4.1. Window accumulation modes {#window-accumulation-modes} When you specify a trigger, you must also set the the window's **accumulation mode**. When a trigger fires, it emits the current contents of the window as a @@ -2574,7 +2569,7 @@ we'll assume that the events all arrive in the pipeline in order. 
![Diagram of data events for acculumating mode example]({{ "/images/trigger-accumulation.png" | prepend: site.baseurl }} "Data events for accumulating mode example") -##### 8.4.1.1. Accumulating mode +##### 8.4.1.1. Accumulating mode {#accumulating-mode} If our trigger is set to `.accumulatingFiredPanes`, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every @@ -2587,7 +2582,7 @@ time three elements arrive: ``` -##### 8.4.1.2. Discarding mode +##### 8.4.1.2. Discarding mode {#discarding-mode} If our trigger is set to `.discardingFiredPanes`, the trigger emits the following values on each firing: @@ -2598,7 +2593,7 @@ following values on each firing: Third trigger firing: [9, 13, 10] ``` -#### 8.4.2. Handling late data +#### 8.4.2. Handling late data {#handling-late-data} If you want your pipeline to process data that arrives after the watermark passes the end of the window, you can apply an *allowed lateness* when you set @@ -2626,13 +2621,13 @@ allowed lateness later in your pipeline, you can apply `Window.configure().withAllowedLateness()` again, explicitly. -### 8.5. Composite triggers +### 8.5. Composite triggers {#composite-triggers} You can combine multiple triggers to form **composite triggers**, and can specify a trigger to emit results repeatedly, at most once, or under other custom conditions. -#### 8.5.1. Composite trigger types +#### 8.5.1. Composite trigger types {#composite-trigger-types} Beam includes the following composite triggers: @@ -2656,7 +2651,7 @@ Beam includes the following composite triggers: * `orFinally` can serve as a final condition to cause any trigger to fire one final time and never fire again. -#### 8.5.2. Composition with AfterWatermark.pastEndOfWindow +#### 8.5.2. Composition with AfterWatermark.pastEndOfWindow {#afterwatermark-pastendofwindow} Some of the most useful composite triggers fire a single time when Beam estimates that all the data has arrived (i.e. when the watermark passes the end @@ -2690,7 +2685,7 @@ example, the following example trigger code fires on the following conditions: # The Beam SDK for Python does not support triggers. ``` -#### 8.5.3. Other composite triggers +#### 8.5.3. Other composite triggers {#other-composite-triggers} You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 -- To stop receiving notification emails like this one, please contact mergebot-r...@apache.org.