This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc52b63  Update SDF programming guide.
     new 774185b  Merge pull request #13326 from [BEAM-10480] Update SDF programming guide
fc52b63 is described below

commit fc52b633a2c1fe58d98290d131dd9dcb373165a8
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Thu Nov 12 14:54:42 2020 -0800

    Update SDF programming guide.
---
 .../content/en/documentation/programming-guide.md  | 85 ++++++++++++++++++----
 1 file changed, 72 insertions(+), 13 deletions(-)

diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index abd682e..d8b3ec0 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -5188,16 +5188,70 @@ restriction pairs.
 #### 12.1.1. A basic SDF {#a-basic-sdf}
 
 A basic SDF is composed of three parts: a restriction, a restriction provider, and a
-restriction tracker. The restriction is used to represent a subset of work for a given element.
-The restriction provider lets SDF authors override default implementations for splitting, sizing,
-watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+restriction tracker. If you want to control the watermark, especially in a streaming
+pipeline, two more components are needed: a watermark estimator provider and a watermark estimator.
+
+The restriction is a user-defined object that is used to represent a subset of
+work for a given element. For example, Beam defines `OffsetRange` as a restriction to represent offset
+positions in [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/range/OffsetRange.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRange).
+
+The restriction provider lets SDF authors override default implementations, including the ones for
+splitting and sizing. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html)
 and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
-this is the `DoFn`. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
-has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
-what subset of the restriction has been completed during processing.
+this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.RestrictionProvider)
+has a dedicated `RestrictionProvider` type.
+
+The restriction tracker is responsible for tracking which subset of the restriction has been
+completed during processing. For API details, read the [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker)
+reference documentation.
+
+There are some built-in `RestrictionTracker` implementations defined in Java:
+1. [OffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html)
+2. [GrowableOffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.html)
+3. [ByteKeyRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.html)
+
+Python also has a built-in `RestrictionTracker` implementation:
+1. [OffsetRestrictionTracker](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRestrictionTracker)
+
+The watermark state is a user-defined object that is used to create a `WatermarkEstimator` from a
+`WatermarkEstimatorProvider`. The simplest watermark state could be a `timestamp`.
+
+The watermark estimator provider lets SDF authors define how to initialize the watermark state and
+create a watermark estimator. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html)
+this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.WatermarkEstimatorProvider)
+has a dedicated `WatermarkEstimatorProvider` type.
+
+The watermark estimator tracks the watermark while an element-restriction pair is in progress.
+For API details, read the [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.html)
+and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.WatermarkEstimator)
+reference documentation.
+
+There are some built-in `WatermarkEstimator` implementations in Java:
+1. [Manual](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.Manual.html)
+2. [MonotonicallyIncreasing](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.MonotonicallyIncreasing.html)
+3. [WallTime](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.WallTime.html)
+
+Along with the default `WatermarkEstimatorProvider`, Python has the same set of built-in
+`WatermarkEstimator` implementations:
+1. [ManualWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.ManualWatermarkEstimator)
+2. [MonotonicWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.MonotonicWatermarkEstimator)
+3. [WalltimeWatermarkEstimator](https://beam.apache.org/releases/pydoc/current/apache_beam.io.watermark_estimators.html#apache_beam.io.watermark_estimators.WalltimeWatermarkEstimator)
 
 To define an SDF, you must choose whether the SDF is bounded (default) or
-unbounded and define a way to initialize an initial restriction for an element.
+unbounded and define a way to initialize an initial restriction for an element. The distinction is
+based on how the amount of work is represented:
+* Bounded DoFns are those where the work represented by an element is well-known beforehand and has
+an end. Examples of bounded elements include a file or group of files.
+* Unbounded DoFns are those where the amount of work does not have a specific end or the
+amount of work is not known beforehand. Examples of unbounded elements include a Kafka or a Pub/Sub
+topic.
+
+In Java, you can use [@UnboundedPerElement](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/transforms/DoFn.UnboundedPerElement.html)
+or [@BoundedPerElement](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/transforms/DoFn.BoundedPerElement.html)
+to annotate your `DoFn`. In Python, you can use [@unbounded_per_element](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.unbounded_per_element)
+to annotate the `DoFn`.
 
 {{< highlight java >}}
 {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
@@ -5324,10 +5378,15 @@ resource utilization.
 A runner at any time may attempt to split a restriction while it is being processed. This allows the
 runner to either pause processing of the restriction so that other work may be done (common for
 unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction
-into two pieces, increasing the available parallelism within the system. It is important to author a
-SDF with this in mind since the end of the restriction may change. Thus when writing the
-processing loop, it is important to use the result from trying to claim a piece of the restriction
-instead of assuming one can process till the end.
+into two pieces, increasing the available parallelism within the system. Different runners (e.g.,
+Dataflow, Flink, Spark) have different strategies to issue splits under batch and streaming
+execution.
+
+Author an SDF with this in mind since the end of the restriction may change. When writing the
+processing loop, use the result from trying to claim a piece of the restriction instead of assuming
+you can process until the end.
+
+One example of an incorrect processing loop is:
 
 {{< highlight java >}}
 {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BadTryClaimLoop >}}
@@ -5390,7 +5449,7 @@ estimate while external clock observing watermark estimators control the waterma
 is not associated to any individual output, such as the local clock of the machine or a clock exposed
 through an external service.
 
-The restriction provider lets you override the default watermark estimation logic and use an existing
+The watermark estimator provider lets you override the default watermark estimation logic and use an existing
 watermark estimator implementation. You can also provide your own watermark estimator implementation.
 
 {{< highlight java >}}
@@ -5431,4 +5490,4 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
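The updated guide says a processing loop should rely on the result of trying to claim a piece of the restriction, because a runner split can move the restriction's end while processing is in progress. The framework-free Python sketch below illustrates that contract without a runner. It is not Beam code: `SimpleOffsetRange`, `SimpleOffsetTracker`, `process` and the `split_after` parameter are hypothetical names that only mimic the behaviour described in the guide. Beam's own trackers, such as Python's `OffsetRestrictionTracker`, differ in detail.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple


@dataclass(frozen=True)
class SimpleOffsetRange:
    """Hypothetical half-open range [start, stop); not Beam's OffsetRange."""
    start: int
    stop: int


class SimpleOffsetTracker:
    """Hypothetical stand-in for a restriction tracker over SimpleOffsetRange.

    try_claim() returns False once a position falls outside the current
    restriction, whose stop may shrink after a split.
    """

    def __init__(self, restriction: SimpleOffsetRange):
        self.range = restriction
        self.last_claimed: Optional[int] = None

    def try_claim(self, position: int) -> bool:
        if self.last_claimed is not None and position <= self.last_claimed:
            raise ValueError("positions must be claimed in increasing order")
        if position >= self.range.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(
        self, split_at: int
    ) -> Optional[Tuple[SimpleOffsetRange, SimpleOffsetRange]]:
        """Keep [start, split_at) as primary; return [split_at, stop) as residual."""
        # Positions that were already claimed must stay in the primary.
        lowest = self.range.start if self.last_claimed is None else self.last_claimed + 1
        if not lowest <= split_at < self.range.stop:
            return None
        primary = SimpleOffsetRange(self.range.start, split_at)
        residual = SimpleOffsetRange(split_at, self.range.stop)
        self.range = primary
        return primary, residual


def process(
    tracker: SimpleOffsetTracker,
    records: Sequence[str],
    split_after: Optional[int] = None,
) -> Tuple[List[str], Optional[SimpleOffsetRange]]:
    """Processing loop that trusts try_claim() instead of the original stop."""
    output: List[str] = []
    residual: Optional[SimpleOffsetRange] = None
    position = tracker.range.start
    while tracker.try_claim(position):
        output.append(records[position])
        if split_after is not None and position == split_after:
            # Simulates a runner split issued mid-processing; in practice the
            # runner issues splits asynchronously rather than from this loop.
            split = tracker.try_split(position + 1)
            if split is not None:
                residual = split[1]
        position += 1
    return output, residual


records = list("abcdefghij")
print(process(SimpleOffsetTracker(SimpleOffsetRange(0, 10)), records))
print(process(SimpleOffsetTracker(SimpleOffsetRange(0, 10)), records, split_after=3))
```

With `split_after=3`, the tracker's stop shrinks from 10 to 4 after record `d` is claimed. The loop therefore ends there and `[4, 10)` remains as the residual. A loop that iterated over the original `range(0, 10)` without checking `try_claim` would also emit records that now belong to the residual, so those records could be processed twice.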
