This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new fc52b63  Update SDF programming guide.
     new 774185b  Merge pull request #13326 from [BEAM-10480] Update SDF programming guide
fc52b63 is described below

commit fc52b633a2c1fe58d98290d131dd9dcb373165a8
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Thu Nov 12 14:54:42 2020 -0800

    Update SDF programming guide.
---
 .../content/en/documentation/programming-guide.md  | 85 ++++++++++++++++++----
 1 file changed, 72 insertions(+), 13 deletions(-)

diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index abd682e..d8b3ec0 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -5188,16 +5188,70 @@ restriction pairs.
 #### 12.1.1. A basic SDF {#a-basic-sdf}
 
 A basic SDF is composed of three parts: a restriction, a restriction provider, and a
-restriction tracker. The restriction is used to represent a subset of work for a given element.
-The restriction provider lets SDF authors override default implementations for splitting, sizing,
-watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+restriction tracker. If you want to control the watermark, especially in a streaming
+pipeline, two more components are needed: a watermark estimator provider and a watermark estimator.
+
+The restriction is a user-defined object that is used to represent a subset of
+work for a given element. For example, we defined `OffsetRange` as a restriction to represent offset
+positions in [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/range/OffsetRange.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRange).
+
+The restriction provider lets SDF authors override default implementations, including the ones for
+splitting and sizing. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html)
 and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
-this is the `DoFn`. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
-has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
-what subset of the restriction has been completed during processing.
+this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.RestrictionProvider)
+has a dedicated `RestrictionProvider` type.
+
+The restriction tracker is responsible for tracking which subset of the restriction has been
+completed during processing. For API details, read the [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker)
+reference documentation.
+
+There are some built-in `RestrictionTracker` implementations defined in Java:
+1. [OffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html)
+2. [GrowableOffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.html)
+3. [ByteKeyRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.html)
+
+The SDF also has a built-in `RestrictionTracker` implementation in Python:
+1. [OffsetRangeTracker](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRestrictionTracker)
+
+The watermark state is a user-defined object which is used to create a `WatermarkEstimator` from a
+`WatermarkEstimatorProvider`. The simplest watermark state could be a `timestamp`.
+
+The watermark estimator provider lets SDF authors define how to initialize the watermark state and
+create a watermark estimator. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html)
+this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.WatermarkEstimatorProvider)
+has a dedicated `WatermarkEstimatorProvider` type.
+
+The watermark estimator tracks the watermark when an element-restriction pair is in progress.
+For API details, read the [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator)
+reference documentation.
+
+There are some built-in `WatermarkEstimator` implementations in Java:
+1. [Manual](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.Manual.html)
+2. [MonotonicallyIncreasing](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.MonotonicallyIncreasing.html)
+3. [WallTime](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.WallTime.html)
+
+Along with the default `WatermarkEstimatorProvider`, there is the same set of built-in
+`WatermarkEstimator` implementations in Python:
+1. [ManualWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.ManualWatermarkEstimator)
+2. [MonotonicWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.MonotonicWatermarkEstimator)
+3. [WalltimeWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.WalltimeWatermarkEstimator)
 
 To define an SDF, you must choose whether the SDF is bounded (default) or
-unbounded and define a way to initialize an initial restriction for an element.
+unbounded and define a way to initialize an initial restriction for an element. The distinction is
+based on how the amount of work is represented:
+* Bounded DoFns are those where the work represented by an element is well-known beforehand and has
+an end. Examples of bounded elements include a file or group of files.
+* Unbounded DoFns are those where the amount of work does not have a specific end or the
+amount of work is not known beforehand. Examples of unbounded elements include a Kafka or a PubSub
+topic.
+
+In Java, you can use [@UnboundedPerElement](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/transforms/DoFn.UnboundedPerElement.html)
+or [@BoundedPerElement](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/transforms/DoFn.BoundedPerElement.html)
+to annotate your `DoFn`. In Python, you can use [@unbounded_per_element](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.unbounded_per_element)
+to annotate the `DoFn`.
 
 {{< highlight java >}}
 {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
@@ -5324,10 +5378,15 @@ resource utilization.
 A runner at any time may attempt to split a restriction while it is being processed. This allows the
 runner to either pause processing of the restriction so that other work may be done (common for
 unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction
-into two pieces, increasing the available parallelism within the system. It is important to author a
-SDF with this in mind since the end of the restriction may change. Thus when writing the
-processing loop, it is important to use the result from trying to claim a piece of the restriction
-instead of assuming one can process till the end.
+into two pieces, increasing the available parallelism within the system. Different runners (e.g.,
+Dataflow, Flink, Spark) have different strategies to issue splits under batch and streaming
+execution.
+
+Author an SDF with this in mind since the end of the restriction may change. When writing the
+processing loop, use the result from trying to claim a piece of the restriction instead of assuming
+you can process until the end.
+
+One incorrect example could be:
 
 {{< highlight java >}}
 {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BadTryClaimLoop >}}
@@ -5390,7 +5449,7 @@ estimate while external clock observing watermark estimators control the waterma
 is not associated to any individual output, such as the local clock of the machine or a clock
 exposed through an external service.
 
-The restriction provider lets you override the default watermark estimation logic and use an existing
+The watermark estimator provider lets you override the default watermark estimation logic and use an existing
 watermark estimator implementation. You can also provide your own watermark estimator implementation.
 
 {{< highlight java >}}
@@ -5431,4 +5490,4 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
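
The second hunk advises using the result of each claim rather than assuming the end of the restriction is fixed. A small, self-contained Python sketch can illustrate why. It does not use Beam: `ToyOffsetTracker`, `try_claim`, `try_split`, and `process_restriction` are hypothetical names made up for this illustration, and the split policy (halving the unclaimed range) is an assumption, not any runner's actual strategy.

```python
class ToyOffsetTracker:
    """Hypothetical stand-in for a restriction tracker over offsets [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, position):
        # A claim succeeds only while the position lies inside the current restriction.
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self):
        # Simulate a runner-initiated split (assumed policy: halve the unclaimed range).
        # The primary keeps the first half; the residual is returned as (start, stop).
        next_unclaimed = self.start if self.last_claimed is None else self.last_claimed + 1
        if next_unclaimed >= self.stop:
            return None  # nothing left to hand off
        split_at = next_unclaimed + (self.stop - next_unclaimed) // 2
        residual = (split_at, self.stop)
        self.stop = split_at  # the end of the restriction moves while we are processing
        return residual


def process_restriction(tracker, split_after=None):
    """Claim each offset before emitting it; stop as soon as a claim fails."""
    outputs = []
    residual = None
    position = tracker.start
    while tracker.try_claim(position):
        outputs.append(position)
        # Stand-in for a split arriving mid-processing, right after `split_after`.
        if split_after is not None and position == split_after:
            residual = tracker.try_split()
        position += 1
    return outputs, residual


if __name__ == "__main__":
    primary, residual = process_restriction(ToyOffsetTracker(0, 10), split_after=3)
    print(primary, residual)
```

With offsets 0 through 9 and a simulated split after offset 3, the primary emits offsets 0 through 6 and the residual `(7, 10)` covers the rest, so each offset is processed exactly once. A loop that iterated to the original end of 10 without checking the claim result would emit offsets 7 through 9 twice once the residual was processed.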