This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 39613ea  [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema (#15763)
39613ea is described below

commit 39613ea89fee6ae888095fcb5ac9ca5fd6d687f9
Author: Melissa Pashniak <meliss...@google.com>
AuthorDate: Mon Oct 25 11:26:28 2021 -0700

    [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema (#15763)

    * [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema

    * Address most review feedback

    * Fix type in intro list

    * Address remaining two feedback comments
---
 .../www/site/content/en/documentation/basics.md   | 205 ++++++++++++++++-----
 .../content/en/documentation/programming-guide.md |  17 ++
 website/www/site/static/images/aggregation.png    | Bin 0 -> 14065 bytes
 3 files changed, 171 insertions(+), 51 deletions(-)

diff --git a/website/www/site/content/en/documentation/basics.md b/website/www/site/content/en/documentation/basics.md
index bf7f20d..cb79e5d 100644
--- a/website/www/site/content/en/documentation/basics.md
+++ b/website/www/site/content/en/documentation/basics.md
@@ -17,10 +17,9 @@ limitations under the License.

 # Basics of the Beam model

-Suppose you have a data processing engine that can pretty easily process graphs
-of operations. You want to integrate it with the Beam ecosystem to get access
-to other languages, great event time processing, and a library of connectors.
-You need to know the core vocabulary:
+Apache Beam is a unified model for defining both batch and streaming
+data-parallel processing pipelines. To get started with Beam, you'll need to
+understand an important set of core concepts:

  * [_Pipeline_](#pipeline) - A pipeline is a user-constructed graph of
    transformations that defines the desired data processing operations.
@@ -30,16 +29,22 @@ You need to know the core vocabulary:
    data processing operation, or a step, in your pipeline. A transform is
    applied to zero or more `PCollection` objects, and produces zero or more
    `PCollection` objects.
- * _SDK_ - A language-specific library for pipeline authors (we often call them
-   "users" even though we have many kinds of users) to build transforms,
-   construct their pipelines and submit them to a runner
- * _Runner_ - You are going to write a piece of software called a runner that
-   takes a Beam pipeline and executes it using the capabilities of your data
-   processing engine.
-
-These concepts may be very similar to your processing engine's concepts. Since
-Beam's design is for cross-language operation and reusable libraries of
-transforms, there are some special features worth highlighting.
+ * [_Aggregation_](#aggregation) - Aggregation is computing a value from
+   multiple (1 or more) input elements.
+ * [_User-defined function (UDF)_](#user-defined-function-udf) - Some Beam
+   operations allow you to run user-defined code as a way to configure the
+   transform.
+ * [_Schema_](#schema) - A schema is a language-independent type definition for
+   a `PCollection`. The schema for a `PCollection` defines elements of that
+   `PCollection` as an ordered list of named fields.
+ * [_SDK_](/documentation/sdks/java/) - A language-specific library that lets
+   pipeline authors build transforms, construct their pipelines, and submit
+   them to a runner.
+ * [_Runner_](#runner) - A runner runs a Beam pipeline using the capabilities of
+   your chosen data processing engine.
+
+The following sections cover these concepts in more detail and provide links to
+additional documentation.
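As a quick illustration of the first three concepts in the list above (this sketch is not part of the commit), here is a minimal pipeline using the Beam Python SDK; the step labels and element values are invented:

```python
import apache_beam as beam

# A Pipeline is the graph of transformations; each `|` applies a
# PTransform and yields a new PCollection.
with beam.Pipeline() as pipeline:  # uses the local DirectRunner by default
    greetings = pipeline | "Create" >> beam.Create(["hello", "beam"])
    shouted = greetings | "Upper" >> beam.Map(str.upper)
    shouted | "Print" >> beam.Map(print)
```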
 ### Pipeline
@@ -215,45 +220,143 @@ For more information about PCollections, see the following page:

  * [Beam Programming Guide: PCollections](/documentation/programming-guide/#pcollections)

-### User-Defined Functions (UDFs)
-
-Beam has seven varieties of user-defined function (UDF). A Beam pipeline
-may contain UDFs written in a language other than your runner, or even multiple
-languages in the same pipeline (see the [Runner API](#the-runner-api)) so the
-definitions are language-independent (see the [Fn API](#the-fn-api)).
-
-The UDFs of Beam are:
-
- * _DoFn_ - per-element processing function (used in ParDo)
- * _WindowFn_ - places elements in windows and merges windows (used in Window
-   and GroupByKey)
- * _Source_ - emits data read from external sources, including initial and
-   dynamic splitting for parallelism (used in Read)
- * _ViewFn_ - adapts a materialized PCollection to a particular interface (used
-   in side inputs)
- * _WindowMappingFn_ - maps one element's window to another, and specifies
-   bounds on how far in the past the result window will be (used in side
-   inputs)
- * _CombineFn_ - associative and commutative aggregation (used in Combine and
-   state)
- * _Coder_ - encodes user data; some coders have standard formats and are not really UDFs
-
-The various types of user-defined functions will be described further alongside
-the [_PTransforms_](#ptransforms) that use them.
+### Aggregation
+
+Aggregation is computing a value from multiple (1 or more) input elements. In
+Beam, the primary computational pattern for aggregation is to group all elements
+with a common key and window, then combine each group of elements using an
+associative and commutative operation. This is similar to the "Reduce" operation
+in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model, though it is
+enhanced to work with unbounded input streams as well as bounded data sets.
+
+<img src="/images/aggregation.png" alt="Aggregation of elements." width="120px">
+
+*Figure 1: Aggregation of elements. Elements with the same color represent those
+with a common key and window.*
+
+Some simple aggregation transforms include `Count` (computes the count of all
+elements in the aggregation), `Max` (computes the maximum element in the
+aggregation), and `Sum` (computes the sum of all elements in the aggregation).
+
+When elements are grouped and emitted as a bag, the aggregation is known as
+`GroupByKey` (the associative/commutative operation is bag union). In this case,
+the output is no smaller than the input. Often, you will apply an operation such
+as summation, called a `CombineFn`, in which the output is significantly smaller
+than the input. In this case the aggregation is called `CombinePerKey`.
+
+In a real application, you might have millions of keys and/or windows; that is
+why this is still an "embarrassingly parallel" computational pattern. In those
+cases where you have fewer keys, you can add parallelism by adding a
+supplementary key, splitting each of your problem's natural keys into many
+sub-keys. After these sub-keys are aggregated, the results can be further
+combined into a result for the original natural key for your problem. The
+associativity of your aggregation function ensures that this yields the same
+answer, but with more parallelism.
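To make the `GroupByKey` versus `CombinePerKey` distinction above concrete, here is a small sketch in the Beam Python SDK (not part of the commit; the keys and values are invented):

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    scores = pipeline | beam.Create([("red", 1), ("blue", 4), ("red", 3)])

    # GroupByKey emits the whole bag per key: ('red', [1, 3]), ('blue', [4]).
    bags = (scores
            | "Group" >> beam.GroupByKey()
            | "Materialize" >> beam.MapTuple(lambda k, vs: (k, sorted(vs))))

    # CombinePerKey folds each bag with an associative, commutative
    # operation, so the output is one element per key and window:
    # ('red', 4), ('blue', 4).
    totals = scores | "Sum" >> beam.CombinePerKey(sum)

    totals | beam.Map(print)
```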
+
+When your input is unbounded, the computational pattern of grouping elements by
+key and window is roughly the same, but governing when and how to emit the
+results of aggregation involves three concepts:
+
+ * Windowing, which partitions your input into bounded subsets that can be
+   complete.
+ * Watermarks, which estimate the completeness of your input.
+ * Triggers, which govern when and how to emit aggregated results.
+
+For more information about available aggregation transforms, see the following
+pages:
+
+ * [Beam Programming Guide: Core Beam transforms](/documentation/programming-guide/#core-beam-transforms)
+ * Beam Transform catalog
+   ([Java](/documentation/transforms/java/overview/#aggregation),
+   [Python](/documentation/transforms/python/overview/#aggregation))
+
+### User-defined function (UDF)
+
+Some Beam operations allow you to run user-defined code as a way to configure
+the transform. For example, when using `ParDo`, user-defined code specifies what
+operation to apply to every element. For `Combine`, it specifies how values
+should be combined. By using [cross-language transforms](/documentation/patterns/cross-language/),
+a Beam pipeline can contain UDFs written in a different language, or even
+multiple languages in the same pipeline.
+
+Beam has several varieties of UDFs:
+
+ * [_DoFn_](/programming-guide/#pardo) - per-element processing function (used
+   in `ParDo`)
+ * [_WindowFn_](/programming-guide/#setting-your-pcollections-windowing-function) -
+   places elements in windows and merges windows (used in `Window` and
+   `GroupByKey`)
+ * [_ViewFn_](/documentation/programming-guide/#side-inputs) - adapts a
+   materialized `PCollection` to a particular interface (used in side inputs)
+ * [_WindowMappingFn_](/documentation/programming-guide/#side-inputs-windowing) -
+   maps one element's window to another, and specifies bounds on how far in the
+   past the result window will be (used in side inputs)
+ * [_CombineFn_](/documentation/programming-guide/#combine) - associative and
+   commutative aggregation (used in `Combine` and state)
+ * [_Coder_](/documentation/programming-guide/#data-encoding-and-type-safety) -
+   encodes user data; some coders have standard formats and are not really UDFs
+
+Each language SDK has its own idiomatic way of expressing the user-defined
+functions in Beam, but there are common requirements. When you build user code
+for a Beam transform, you should keep in mind the distributed nature of
+execution. For example, there might be many copies of your function running on
+many different machines in parallel, and those copies function independently,
+without communicating or sharing state with any of the other copies. Each copy
+of your user code function might be retried or run multiple times, depending on
+the pipeline runner and the processing backend that you choose for your
+pipeline. Beam also supports stateful processing through the
+[stateful processing API](/blog/stateful-processing/).
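As an illustration of the most common UDF variety, a `DoFn`, here is a sketch in the Beam Python SDK (not from the commit; the class name and input strings are invented). Note how it respects the constraints described above: it keeps no shared state and is safe to retry:

```python
import apache_beam as beam

class SplitWords(beam.DoFn):
    """Per-element UDF: may emit zero or more outputs per input element."""

    def process(self, element):
        # No shared mutable state; idempotent, so retries are safe.
        for word in element.split():
            yield word

with beam.Pipeline() as pipeline:
    (pipeline
     | beam.Create(["the beam model", "user defined functions"])
     | beam.ParDo(SplitWords())
     | beam.Map(print))
```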
+
+For more information about user-defined functions, see the following pages:
+
+ * [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
+ * [Beam Programming Guide: ParDo](/documentation/programming-guide/#pardo)
+ * [Beam Programming Guide: WindowFn](/programming-guide/#setting-your-pcollections-windowing-function)
+ * [Beam Programming Guide: CombineFn](/documentation/programming-guide/#combine)
+ * [Beam Programming Guide: Coder](/documentation/programming-guide/#data-encoding-and-type-safety)
+ * [Beam Programming Guide: Side inputs](/documentation/programming-guide/#side-inputs)
+
+### Schema
+
+A schema is a language-independent type definition for a `PCollection`. The
+schema for a `PCollection` defines elements of that `PCollection` as an ordered
+list of named fields. Each field has a name, a type, and possibly a set of user
+options.
+
+In many cases, the element type in a `PCollection` has a structure that can be
+introspected. Some examples are JSON, Protocol Buffer, Avro, and database row
+objects. All of these formats can be converted to Beam Schemas. Even within an
+SDK pipeline, simple Java POJOs (or equivalent structures in other languages)
+are often used as intermediate types, and these also have a clear structure that
+can be inferred by inspecting the class. By understanding the structure of a
+pipeline’s records, we can provide much more concise APIs for data processing.
+
+Beam provides a collection of transforms that operate natively on schemas. For
+example, [Beam SQL](/documentation/dsls/sql/overview/) is a common transform
+that operates on schemas. These transforms allow selections and aggregations in
+terms of named schema fields. Another advantage of schemas is that they allow
+referencing of element fields by name. Beam provides a selection syntax for
+referencing fields, including nested and repeated fields.
+
+For more information about schemas, see the following pages:
+
+ * [Beam Programming Guide: Schemas](/documentation/programming-guide/#schemas)
+ * [Schema Patterns](/documentation/patterns/schema/)

 ### Runner

-The term "runner" is used for a couple of things. It generally refers to the
-software that takes a Beam pipeline and executes it somehow. Often, this is the
-translation code that you write. It usually also includes some customized
-operators for your data processing engine, and is sometimes used to refer to
-the full stack.
+A Beam runner runs a Beam pipeline on a specific platform. Most runners are
+translators or adapters to massively parallel big data processing systems, such
+as Apache Flink, Apache Spark, Google Cloud Dataflow, and more. For example, the
+Flink runner translates a Beam pipeline into a Flink job. The Direct Runner runs
+pipelines locally so you can test, debug, and validate that your pipeline
+adheres to the Apache Beam model as closely as possible.
+
+For an up-to-date list of Beam runners and which features of the Apache Beam
+model they support, see the runner
+[capability matrix](/documentation/runners/capability-matrix/).

-A runner has just a single method `run(Pipeline)`. From here on, I will often
-use code font for proper nouns in our APIs, whether or not the identifiers
-match across all SDKs.
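Returning to the Schema section in the hunk above: as a sketch of what schema-aware elements look like in practice (not part of the commit), the Beam Python SDK can infer a schema from `beam.Row` elements; the field names and values here are invented:

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    purchases = pipeline | beam.Create([
        beam.Row(item="kettle", price=20.0),  # schema: item (str), price (float)
        beam.Row(item="mug", price=5.5),
    ])
    # With a schema attached, fields are referenced by name rather than
    # by position; schema transforms such as Beam SQL build on this.
    prices = purchases | beam.Map(lambda row: row.price)
    prices | beam.Map(print)
```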
+For more information about runners, see the following pages:
-The `run(Pipeline)` method should be asynchronous and results in a
-PipelineResult which generally will be a job descriptor for your data
-processing engine, providing methods for checking its status, canceling it, and
-waiting for it to terminate.
+
+ * [Choosing a Runner](/documentation/#choosing-a-runner)
+ * [Beam Capability Matrix](/documentation/runners/capability-matrix/)
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 9d3cac9..bde55f6 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -526,6 +526,10 @@ workers across a cluster may execute instances of your user code in parallel.
 The user code running on each worker generates the output elements that are
 ultimately added to the final output `PCollection` that the transform produces.

+> Aggregation is an important concept to understand when learning about Beam's
+> transforms. For an introduction to aggregation, see the Basics of the Beam
+> model [Aggregation section](/documentation/basics/#aggregation).
+
 The Beam SDKs contain a number of different transforms that you can apply to
 your pipeline's `PCollection`s. These include general-purpose core transforms,
 such as [ParDo](#pardo) or [Combine](#combine). There are also pre-written
@@ -1274,6 +1278,19 @@ function. More complex combination operations might require you to create a
 <span class="language-java language-py">subclass of</span> `CombineFn` that has
 an accumulation type distinct from the input/output type.

+The associativity and commutativity of a `CombineFn` allow runners to
+automatically apply some optimizations:
+
+ * **Combiner lifting**: This is the most significant optimization. Input
+   elements are combined per key and window before they are shuffled, so the
+   volume of data shuffled might be reduced by many orders of magnitude. Another
+   term for this optimization is "mapper-side combine."
+ * **Incremental combining**: When you have a `CombineFn` that significantly
+   reduces the data size, it is useful to combine elements as they emerge from a
+   streaming shuffle. This spreads out the cost of doing combines over the time
+   that your streaming computation might be idle. Incremental combining also
+   reduces the storage of intermediate accumulators.
+
 ##### 4.2.4.1. Simple combinations using simple functions {#simple-combines}

 The following example code shows a simple combine function.
diff --git a/website/www/site/static/images/aggregation.png b/website/www/site/static/images/aggregation.png
new file mode 100755
index 0000000..c26cc9f
Binary files /dev/null and b/website/www/site/static/images/aggregation.png differ
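To ground the `CombineFn` discussion in the programming-guide hunk above, here is a sketch (not from the commit) of a `CombineFn` whose accumulator type differs from its input/output type, written with the Beam Python SDK; the class name and data are invented. Because it is associative and commutative, a runner is free to lift it ahead of the shuffle or apply it incrementally:

```python
import apache_beam as beam

class MeanFn(beam.CombineFn):
    """Averages values; the accumulator is a (sum, count) pair."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        # Merging partial accumulators is what makes combiner lifting
        # possible: partial sums can be combined after the shuffle.
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")

with beam.Pipeline() as pipeline:
    (pipeline
     | beam.Create([("a", 1.0), ("a", 3.0), ("b", 5.0)])
     | beam.CombinePerKey(MeanFn())  # ('a', 2.0), ('b', 5.0)
     | beam.Map(print))
```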