This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new d852114  Remove managing late data not supported by python sdk note
     new a8af0e1  Merge pull request #10761 from y1chi/fix_doc

d852114 is described below

commit d85211428f5e39ba59be72ec11510455e89e329e
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Mon Feb 3 18:09:45 2020 -0800

    Remove managing late data not supported by python sdk note
---
 website/src/documentation/programming-guide.md | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/website/src/documentation/programming-guide.md b/website/src/documentation/programming-guide.md
index 510d7cd..ca12d67 100644
--- a/website/src/documentation/programming-guide.md
+++ b/website/src/documentation/programming-guide.md
@@ -2577,7 +2577,6 @@ elements.
 #### 7.4.1. Managing late data {#managing-late-data}
-> **Note:** Managing late data is not supported in the Beam SDK for Python.
 You can allow late data by invoking the `.withAllowedLateness` operation when you set your `PCollection`'s windowing strategy. The following code example
@@ -2591,6 +2590,15 @@ the end of a window.
     .withAllowedLateness(Duration.standardDays(2)));
 ```
+```py
+  pc = [Initial PCollection]
+  pc | beam.WindowInto(
+      FixedWindows(60),
+      trigger=trigger_fn,
+      accumulation_mode=accumulation_mode,
+      timestamp_combiner=timestamp_combiner,
+      allowed_lateness=Duration(seconds=2*24*60*60)) # 2 days
+```
 When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed
@@ -2858,7 +2866,6 @@ on each firing:
 #### 8.4.2. Handling late data {#handling-late-data}
-> The Beam SDK for Python does not currently support allowed lateness.
 If you want your pipeline to process data that arrives after the watermark passes the end of the window, you can apply an *allowed lateness* when you set
@@ -2877,7 +2884,13 @@ windowing function:
     .withAllowedLateness(Duration.standardMinutes(30));
 ```
 ```py
-  # The Beam SDK for Python does not currently support allowed lateness.
+  pc = [Initial PCollection]
+  pc | beam.WindowInto(
+      FixedWindows(60),
+      trigger=AfterProcessingTime(60),
+      allowed_lateness=1800) # 30 minutes
+  | ...
+
 ```
 This allowed lateness propagates to all `PCollection`s derived as a result of
@@ -3107,4 +3120,4 @@ public class MyMetricsDoFn extends DoFn<Integer, Integer> {
     context.output(context.element());
   }
 }
-```
\ No newline at end of file
+```
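The text touched by this diff describes the rule behind allowed lateness: data that arrives after the watermark passes the end of its window is still processed as long as it falls within the allowed lateness. The sketch below is a simplified, hypothetical pure-Python model of that rule, not Beam's implementation or API. The helpers `fixed_window`, `is_accepted`, and `classify` are invented for illustration, times are integer seconds, and Beam's millisecond-level window bounds are ignored. The parameters mirror `FixedWindows(60)` and `allowed_lateness=1800` from the diff.

```python
from typing import Tuple


def fixed_window(timestamp: int, size: int) -> Tuple[int, int]:
    """Return the [start, end) fixed window, in seconds, that contains timestamp."""
    start = timestamp - (timestamp % size)
    return start, start + size


def is_accepted(timestamp: int, watermark: int, size: int, allowed_lateness: int) -> bool:
    """Simplified model: keep an element only while the watermark is still
    before the end of its window plus the allowed lateness (all in seconds)."""
    _, end = fixed_window(timestamp, size)
    return watermark < end + allowed_lateness


def classify(timestamp: int, watermark: int, size: int, allowed_lateness: int) -> str:
    """Label an element as on time, late but still accepted, or dropped."""
    _, end = fixed_window(timestamp, size)
    if watermark < end:
        return "on_time"
    if is_accepted(timestamp, watermark, size, allowed_lateness):
        return "late"
    return "dropped"


if __name__ == "__main__":
    # 60-second fixed windows with 30 minutes (1800 s) of allowed lateness.
    # An element at t=125 belongs to the window [120, 180).
    for watermark in (170, 1900, 2000):
        print(watermark, classify(125, watermark, 60, 1800))
    # prints: "170 on_time", "1900 late", "2000 dropped"
```

With `allowed_lateness=0` the model drops any element whose window end the watermark has already reached. This matches the default behavior the changed documentation implies when no allowed lateness is set.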