robertwb commented on code in PR #27632: URL: https://github.com/apache/beam/pull/27632#discussion_r1289267358
##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -89,6 +117,13 @@ But if your data is arriving as a stream, then you will want to terminate a
 bundle in order to achieve appropriate latency, so bundles may be just a few
 elements.

+A bundle is the unit of commitment in Beam. If an error is encountered while
+processing a bundle, all the prior outputs of that bundle (including any
+modifications to state or timers) must be discarded and the entire bundle
+retried. Upon successful completion of a bundle, its outputs, together with
+any state/timer modifications and watermark updates, must be committed
+atomically.
+
 #### The DoFn Lifecycle

 While each language's SDK is free to make different decisions, the Python and

Review Comment:
   I mostly left this here because I wasn't sure it was documented elsewhere, but it turns out it is so I'm just pointing to a reference and leaving this more portable-centric. (Agree this could use a much more invasive portable-first refactoring, but given limited time I wanted to focus on getting the basic points out first.)

##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -249,8 +251,9 @@ to group in a way that is consistent with grouping by those bytes, even if you
 have some special knowledge of the types involved.

 The elements you are processing will be key-value pairs, and you'll need to extract
-the keys. For this reason, the format of key-value pairs is standardized and
-shared across all SDKS. See either
+the keys. For this reason, the format of key-value pairs is
+[standardized and shared](https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L838)
+across all SDKS. See either
 [`KvCoder`](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/coders/KvCoder.html)
 in Java or
 [`TupleCoder`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.coders.html#apache_beam.coders.coders.TupleCoder.key_coder)

Review Comment:
   Done.

##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -160,64 +161,51 @@ it from the main input, which is processed one element at a time. The SDK/user
 prepares a `PCollection` adequately, the runner materializes it, and then the
 runner feeds it to the `DoFn`.

-What you will need to implement is to inspect the materialization requested for
-the side input, and prepare it appropriately, and corresponding interactions
-when a `DoFn` reads the side inputs.
-
-The details and available support code vary by language.
-
-**Java**
-
-If you are using one of the above `DoFnRunner` classes, then the interface for
-letting them request side inputs is
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java).
-It is a simple mapping from side input and window to a value. The `DoFnRunner`
-will perform a mapping with the
-[`WindowMappingFn`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java)
-to request the appropriate window so you do not worry about invoking this UDF.
-When using the Fn API, it will be the SDK harness that maps windows as well.
-
-A simple, but not necessarily optimal approach to building a
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java)
-is to use a state backend. In our Java support code, this is called
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-and you can build a
-[`SideInputHandler`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java)
-that will use your `StateInternals` to materialize a `PCollection` into the
-appropriate side input view and then yield the value when requested for a
-particular side input and window.
+Unlike main input data, which is *pushed* by the runner to the `ParDo` (generally
+via the FnApi Data channel), side input data is *pulled* by the `ParDo`
+from the runner (generally over the FnAPI State channel).
+
+A side input is accessed via a specific `access_pattern`.
+There are currently two access patterns enumerated in the
+`StandardSideInputTypes` proto: `beam:side_input:iterable:v1` which indicates
+the runner must return all values in a PCollection corresponding to a specific
+window and `beam:side_input:multimap:v1` which indicates the runner must return
+all values corresponding to a specific key and window.
+Being able to serve these access patterns efficiently may influence how a
+runner materializes this PCollection.
+
+SideInputs can be detected by looking at the `side_inputs` map in the
+`ParDoPayload` of `ParDo` transforms.
+The `ParDo` operation itself is responsible for invoking the
+`window_mapping_fn` (before invoking the runner) and `view_fn` (on the
+runner-returned values), so the runner need not concern itself with these
+fields.

 When a side input is needed but the side input has no data associated with it
 for a given window, elements in that window must be deferred until the side
-input has some data. The aforementioned
+input has some data or the watermark has advances sufficiently such that
+we can be sure there will be no data for that window. The
 [`PushBackSideInputDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java)
-is used to implement this.
-
-**Python**
-
-In Python, [`SideInputMap`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.sideinputs.SideInputMap) maps
-windows to side input values. The `WindowMappingFn` manifests as a simple
-function. See
-[sideinputs.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py).
+is an example of implementing this.

 #### State and Timers

 _Main design document: [https://s.apache.org/beam-state](https://s.apache.org/beam-state)_

 When a `ParDo` includes state and timers, its execution on your runner is usually
-very different. See the full details beyond those covered here.
+very different. In particular, the state must be persisted when the bundle
+completes and retrieved for future bundles. Timers that are set must also be
+injected into future bundles as the watermark advances sufficiently.

-State and timers are partitioned per key and window. You may need or want to
+State and timers are partitioned per key and window, that is, a `DoFn`
+processing a given key must have a consistent view of the state and timers
+across all elements that share this key. You may need or want to
 explicitly shuffle data to support this.
+Once the watermark has passed the end of the window (plus an allowance for
+allowed lateness, if any), state associated with this window can be dropped.

-**Java**
-
-We provide
-[`StatefulDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java)
-to help with state cleanup. The non-user-facing interface
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-is what a runner generally implements, and then the Beam support code can use
-this to implement user-facing state.
+State setting and retrieval is performed on the FnAPI State channel, whereas
+timer setting and firing happens on the FnAPI Data channel.

 #### Splittable DoFn

Review Comment:
   Rewritten.

##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -57,6 +56,19 @@ native environment, this may look like throwing an
 `UnsupportedOperationException`. The Runner API RPCs will make this explicit,
 for cross-language portability.

+### Implementing the Impulse primitive
+
+`Impulse` is a PTransform that takes no inputs and produces exactly one output
+during the lifetime of the pipeline which should be the empty bytes in the
+global window with the minimum timestamp. This has the encoded value of
+`7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00` when encoded with the standard
+windowed value coder.
+
+Though `Impulse` is generally not invoked by a user, it is the only root
+primitive operation, and other root operations (like Reads and `Create`)

Review Comment:
   Here I mention Reads as they are what a general user might think of as a special "root" operation to point out that they are in fact composites and do not need primitive treatment.

##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -399,59 +370,28 @@ fast path as an optimization.
 ### Special mention: the Combine composite

 A composite transform that is almost always treated specially by a runner is
-`Combine` (per key), which applies an associative and commutative operator to
+`CombinePerKey`, which applies an associative and commutative operator to
 the elements of a `PCollection`. This composite is not a primitive.
 It is implemented in terms of `ParDo` and `GroupByKey`, so your runner will work
 without treating it - but it does carry additional information that you
 probably want to use for optimizations: the associative-commutative operator,
 known as a `CombineFn`.

+Generally runners will want to implement this via what is called
+combiner lifting, where a new operation is placed before the `GroupByKey`
+that does partial (within-bundle) combining, which often requires a slight
+modification of what comes after the `GroupByKey` as well.
+An example of this transformation can be found in the

Review Comment:
   Those are indeed very nice comments. Linked.

##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -555,6 +495,10 @@ All runner code should go in it's own package in `apache_beam/runners` directory

 Register the new runner in the `create_runner` function of `runner.py` so that the
 partial name is matched with the correct class to be used.

+Python Runners can also be identified (e.g. when passing the runner parameter)
+by their fully qualified name whether or not they live in the Beam repository.
+
+

Review Comment:
   Done.
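
For readers following the `CombinePerKey` hunk in this review, here is a minimal, Beam-free sketch of the combiner lifting idea it describes: a partial (within-bundle) combine before the grouping step, and a merge-and-extract step after it. Everything here is an illustrative assumption rather than Beam or runner code. `SumFn` is a hypothetical stand-in whose method names merely mirror the usual `CombineFn` shape, and `partial_combine`, `group_by_key`, `merge_and_extract`, and `lifted_combine_per_key` are made-up helper names.

```python
from collections import defaultdict


class SumFn:
    """Hypothetical stand-in for a CombineFn; not the Beam class."""

    def create_accumulator(self):
        return 0

    def add_input(self, acc, value):
        return acc + value

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc


def partial_combine(bundle, fn):
    """Before the GroupByKey: fold one bundle into a single accumulator per key."""
    accs = {}
    for key, value in bundle:
        if key not in accs:
            accs[key] = fn.create_accumulator()
        accs[key] = fn.add_input(accs[key], value)
    return list(accs.items())


def group_by_key(pairs):
    """Toy GroupByKey over an in-memory list of (key, accumulator) pairs."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def merge_and_extract(grouped, fn):
    """After the GroupByKey: merge the per-bundle accumulators, then extract."""
    return {
        key: fn.extract_output(fn.merge_accumulators(accs))
        for key, accs in grouped.items()
    }


def lifted_combine_per_key(bundles, fn):
    # Only one (key, accumulator) pair per key per bundle reaches the shuffle.
    shuffled = [pair for bundle in bundles for pair in partial_combine(bundle, fn)]
    return merge_and_extract(group_by_key(shuffled), fn)


bundles = [[("a", 1), ("b", 2), ("a", 3)], [("a", 4), ("b", 5)]]
result = lifted_combine_per_key(bundles, SumFn())
print(result)
```

In this toy input, five elements enter but only four accumulator pairs cross the grouping step. The savings grow with the number of repeated keys per bundle, which is the motivation the hunk gives for treating this composite specially.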
