Repository: beam-site
Updated Branches:
  refs/heads/asf-site 3b3bc65c2 -> 61de614fb


Add windowing section to programming guide


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/ddb60795
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/ddb60795
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/ddb60795

Branch: refs/heads/asf-site
Commit: ddb6079538a1683b798c72644eaa32f5aa7211bb
Parents: 3b3bc65
Author: melissa <meliss...@google.com>
Authored: Wed Mar 1 19:54:17 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 20 14:56:17 2017 -0700

----------------------------------------------------------------------
 src/documentation/programming-guide.md      | 226 ++++++++++++++++++++++-
 src/images/fixed-time-windows.png           | Bin 0 -> 11717 bytes
 src/images/session-windows.png              | Bin 0 -> 16697 bytes
 src/images/sliding-time-windows.png         | Bin 0 -> 16537 bytes
 src/images/unwindowed-pipeline-bounded.png  | Bin 0 -> 9589 bytes
 src/images/windowing-pipeline-bounded.png   | Bin 0 -> 13325 bytes
 src/images/windowing-pipeline-unbounded.png | Bin 0 -> 21890 bytes
 7 files changed, 222 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md 
b/src/documentation/programming-guide.md
index 57b49e8..e91a856 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -197,7 +197,7 @@ A `PCollection` can be either **bounded** or **unbounded** 
in size. A **bounded*
 
 The bounded (or unbounded) nature of your `PCollection` affects how Beam 
processes your data. A bounded `PCollection` can be processed using a batch 
job, which might read the entire data set once, and perform processing in a job 
of finite length. An unbounded `PCollection` must be processed using a 
streaming job that runs continuously, as the entire collection can never be 
available for processing at any one time.
 
-When performing an operation that groups elements in an unbounded 
`PCollection`, Beam requires a concept called **Windowing** to divide a 
continuously updating data set into logical windows of finite size.  Beam 
processes each window as a bundle, and processing continues as the data set is 
generated. These logical windows are determined by some characteristic 
associated with a data element, such as a **timestamp**.
+When performing an operation that groups elements in an unbounded 
`PCollection`, Beam requires a concept called **windowing** to divide a 
continuously updating data set into logical windows of finite size.  Beam 
processes each window as a bundle, and processing continues as the data set is 
generated. These logical windows are determined by some characteristic 
associated with a data element, such as a **timestamp**.
 
 #### <a name="pctimestamps"></a>Element timestamps
 
@@ -1193,7 +1193,7 @@ To set the default Coder for a <span 
class="language-java">Java</span> <span cla
 
 The following example code demonstrates how to set a default Coder, in this 
case `BigEndianIntegerCoder`, for <span class="language-java">Integer</span> 
<span class="language-py">int</span> values for a pipeline.
 
-```java  
+```java
 PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline p = Pipeline.create(options);
 
@@ -1235,7 +1235,225 @@ public class MyCustomDataType {
 {:.language-py}
 The Beam SDK for Python does not support annotating data types with a default 
coder. If you would like to set a default coder, use the method described in 
the previous section, *Setting the default coder for a type*.
 
-<a name="windowing"></a>
-<a name="triggers"></a>
+## <a name="windowing"></a>Working with windowing
+
+Windowing subdivides a `PCollection` according to the timestamps of its 
individual elements. Transforms that aggregate multiple elements, such as 
`GroupByKey` and `Combine`, work implicitly on a per-window basis—that is, 
they process each `PCollection` as a succession of multiple, finite windows, 
though the entire collection itself may be of unbounded size.
+
+A related concept, called **triggers**, determines when to emit the results of 
aggregation as unbounded data arrives. Using a trigger can help to refine the 
windowing strategy for your `PCollection` to deal with late-arriving data or to 
provide early results. See the [triggers](#triggers) section for more 
information.
+
+### Windowing basics
+
+Some Beam transforms, such as `GroupByKey` and `Combine`, group multiple 
elements by a common key. Ordinarily, that grouping operation groups all of the 
elements that have the same key within the entire data set. With an unbounded 
data set, it is impossible to collect all of the elements, since new elements 
are constantly being added and may be infinitely many (e.g. streaming data). If 
you are working with unbounded `PCollection`s, windowing is especially useful.
+
+In the Beam model, any `PCollection` (including unbounded `PCollection`s) can 
be subdivided into logical windows. Each element in a `PCollection` is assigned 
to one or more windows according to the `PCollection`'s windowing function, and 
each individual window contains a finite number of elements. Grouping 
transforms then consider each `PCollection`'s elements on a per-window basis. 
`GroupByKey`, for example, implicitly groups the elements of a `PCollection` by 
_key and window_.
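
To make the per-window grouping concrete, here is a toy Python sketch of grouping elements by _key and window_. This is not Beam's implementation; the fixed window size, the record format, and the function names are all illustrative:

```python
from collections import defaultdict

def assign_window(ts, size=60):
    # Toy fixed-window assignment: each timestamp (in seconds) maps to
    # the half-open interval [start, start + size) that contains it.
    start = ts - (ts % size)
    return (start, start + size)

def group_by_key_and_window(records, size=60):
    # records: an iterable of (key, value, timestamp) triples.
    grouped = defaultdict(list)
    for key, value, ts in records:
        grouped[key, assign_window(ts, size)].append(value)
    return dict(grouped)
```

Note that two values sharing the key `"a"` but falling in different one-minute windows end up in different groups, which is exactly the key-and-window behavior described above.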
+
+**Caution:** The default windowing behavior is to assign all elements of a 
`PCollection` to a single, global window, _even for unbounded `PCollection`s_. 
Before you use a grouping transform such as `GroupByKey` on an unbounded 
`PCollection`, you must do at least one of the following:
+ * Set a non-global windowing function. See [Setting your PCollection's 
windowing function](#setwindowingfunction).
+ * Set a non-default [trigger](#triggers). This allows the global window to 
emit results under other conditions, since the default windowing behavior 
(waiting for all data to arrive) will never occur.
+
+If you don't set a non-global windowing function or a non-default trigger for 
your unbounded `PCollection` and subsequently use a grouping transform such as 
`GroupByKey` or `Combine`, your pipeline will generate an error upon 
construction and your job will fail.
+
+#### Windowing constraints
+
+After you set the windowing function for a `PCollection`, the elements' 
windows are used the next time you apply a grouping transform to that 
`PCollection`. Window grouping occurs on an as-needed basis. If you set a 
windowing function using the `Window` transform, each element is assigned to a 
window, but the windows are not considered until `GroupByKey` or `Combine` 
aggregates across a window and key. This can have different effects on your 
pipeline.
+Consider the example pipeline in the figure below:
+
+![Diagram of pipeline applying windowing]({{ 
"/images/windowing-pipeline-unbounded.png" | prepend: site.baseurl }} "Pipeline 
applying windowing")
+
+**Figure:** Pipeline applying windowing
+
+In the above pipeline, we create an unbounded `PCollection` by reading a set 
of key/value pairs using `KafkaIO`, and then apply a windowing function to that 
collection using the `Window` transform. We then apply a `ParDo` to the 
collection, and then later group the result of that `ParDo` using `GroupByKey`. 
The windowing function has no effect on the `ParDo` transform, because the 
windows are not actually used until they're needed for the `GroupByKey`.
+Subsequent transforms, however, are applied to the result of the `GroupByKey` 
-- data is grouped by both key and window.
+
+#### Using windowing with bounded PCollections
+
+You can use windowing with fixed-size data sets in **bounded** `PCollection`s. 
However, note that windowing considers only the implicit timestamps attached to 
each element of a `PCollection`, and data sources that create fixed data sets 
(such as `TextIO`) assign the same timestamp to every element. This means that 
all the elements are by default part of a single, global window.
+
+To use windowing with fixed data sets, you can assign your own timestamps to 
each element. To assign timestamps to elements, use a `ParDo` transform with a 
`DoFn` that outputs each element with a new timestamp (for example, the 
[WithTimestamps]({{ site.baseurl }}/documentation/sdks/javadoc/{{ 
site.release_latest 
}}/index.html?org/apache/beam/sdk/transforms/WithTimestamps.html) transform in 
the Beam SDK for Java).
+
+To illustrate how windowing with a bounded `PCollection` can affect how your 
pipeline processes data, consider the following pipeline:
+
+![Diagram of GroupByKey and ParDo without windowing, on a bounded 
collection]({{ "/images/unwindowed-pipeline-bounded.png" | prepend: 
site.baseurl }} "GroupByKey and ParDo without windowing, on a bounded 
collection")
+
+**Figure:** `GroupByKey` and `ParDo` without windowing, on a bounded 
collection.
+
+In the above pipeline, we create a bounded `PCollection` by reading a set of 
key/value pairs using `TextIO`. We then group the collection using 
`GroupByKey`, and apply a `ParDo` transform to the grouped `PCollection`. In 
this example, the `GroupByKey` creates a collection of unique keys, and then 
`ParDo` gets applied exactly once per key.
+
+Note that even if you don’t set a windowing function, there is still a 
window -- all elements in your `PCollection` are assigned to a single global 
window.
+
+Now, consider the same pipeline, but using a windowing function:
+
+![Diagram of GroupByKey and ParDo with windowing, on a bounded collection]({{ 
"/images/windowing-pipeline-bounded.png" | prepend: site.baseurl }} "GroupByKey 
and ParDo with windowing, on a bounded collection")
+
+**Figure:** `GroupByKey` and `ParDo` with windowing, on a bounded collection.
+
+As before, the pipeline creates a bounded `PCollection` of key/value pairs. We 
then set a [windowing function](#setwindowingfunction) for that `PCollection`. 
The `GroupByKey` transform groups the elements of the `PCollection` by both key 
and window, based on the windowing function. The subsequent `ParDo` transform 
gets applied multiple times per key, once for each window.
+
+### Windowing functions
+
+You can define different kinds of windows to divide the elements of your 
`PCollection`. Beam provides several windowing functions, including:
+
+*  Fixed Time Windows
+*  Sliding Time Windows
+*  Per-Session Windows
+*  Single Global Window
+*  Calendar-based Windows (not supported by the Beam SDK for Python)
+
+Note that each element can logically belong to more than one window, depending 
on the windowing function you use. Sliding time windowing, for example, creates 
overlapping windows wherein a single element can be assigned to multiple 
windows.
+
+#### Fixed time windows
+
+The simplest form of windowing is using **fixed time windows**: given a 
timestamped `PCollection` which might be continuously updating, each window 
might capture (for example) all elements with timestamps that fall into a five 
minute interval.
+
+A fixed time window represents a consistent-duration, non-overlapping time 
interval in the data stream. Consider windows with a five-minute duration: all 
of the elements in your unbounded `PCollection` with timestamp values from 
0:00:00 up to (but not including) 0:05:00 belong to the first window, elements 
with timestamp values from 0:05:00 up to (but not including) 0:10:00 belong to 
the second window, and so on.
+
+![Diagram of fixed time windows, 30s in duration]({{ 
"/images/fixed-time-windows.png" | prepend: site.baseurl }} "Fixed time 
windows, 30s in duration")
+
+**Figure:** Fixed time windows, 30s in duration.
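
The window-boundary arithmetic behind this is simple modular arithmetic. The following sketch (timestamps and window size in seconds; not Beam's `FixedWindows` implementation) shows which window a given event timestamp falls into:

```python
def fixed_window(event_ts, size=300):
    """Return the half-open [start, end) fixed window containing event_ts.

    Timestamps and the window size are in seconds; size=300 models the
    five-minute windows described above.
    """
    start = event_ts - (event_ts % size)
    return (start, start + size)
```

For example, a timestamp of 0:03:38 (218 seconds) falls in the first five-minute window, `(0, 300)`, while a timestamp of exactly 0:05:00 begins the second.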
+
+#### Sliding time windows
+
+A **sliding time window** also represents time intervals in the data stream; 
however, sliding time windows can overlap. For example, each window might 
capture five minutes' worth of data, but a new window starts every ten seconds. 
The frequency with which sliding windows begin is called the _period_. 
Therefore, our example would have a window _duration_ of five minutes and a 
_period_ of ten seconds.
+
+Because multiple windows overlap, most elements in a data set will belong to 
more than one window. This kind of windowing is useful for taking running 
averages of data; using sliding time windows, you can compute a running average 
of the past five minutes' worth of data, updated every ten seconds, in our 
example.
+
+![Diagram of sliding time windows, with 1 minute window duration and 30s 
window period]({{ "/images/sliding-time-windows.png" | prepend: site.baseurl }} 
"Sliding time windows, with 1 minute window duration and 30s window period")
+
+**Figure:** Sliding time windows, with 1 minute window duration and 30s window 
period.
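
Because windows overlap, a single element belongs to `duration / period` windows. A toy sketch (durations in seconds; illustrative only, not Beam's `SlidingWindows` implementation) that enumerates the windows containing a given timestamp:

```python
def sliding_windows(event_ts, duration=300, period=10):
    """All half-open [start, end) sliding windows containing event_ts.

    Window starts are multiples of `period`; with a five-minute duration
    and a ten-second period, each element lands in 30 windows.
    """
    last_start = event_ts - (event_ts % period)
    return [(start, start + duration)
            for start in range(last_start, last_start - duration, -period)]
```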
+
+#### Session windows
+
+A **session window** function defines windows that contain elements that are 
within a certain gap duration of another element. Session windowing applies on 
a per-key basis and is useful for data that is irregularly distributed with 
respect to time. For example, a data stream representing user mouse activity 
may have long periods of idle time interspersed with high concentrations of 
clicks. If data arrives after the specified minimum gap duration, it begins a 
new window.
+
+![Diagram of session windows with a minimum gap duration]({{ 
"/images/session-windows.png" | prepend: site.baseurl }} "Session windows, with 
a minimum gap duration")
+
+**Figure:** Session windows, with a minimum gap duration. Note how each data 
key has different windows, according to its data distribution.
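
Conceptually, sessionizing one key's timestamps amounts to splitting the sorted sequence wherever the gap between consecutive elements reaches the minimum gap duration. A toy sketch (seconds; not Beam's `Sessions` implementation, which also extends each window by the gap duration and merges overlapping windows):

```python
def sessionize(timestamps, gap=600):
    """Group one key's timestamps into sessions.

    A new session begins whenever the next timestamp is at least `gap`
    seconds after the previous one (gap=600 models a ten-minute gap).
    """
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] >= gap:
            sessions.append(current)
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions
```

Run per key, this produces the different per-key windows shown in the figure.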
+
+#### Single global window
+
+By default, all data in a `PCollection` is assigned to a single global window. 
If your data set is of a fixed size, you can leave the global window default 
for your `PCollection`.
+
+You can use a single global window if you are working with an unbounded data 
set, e.g. from a streaming data source; however, use caution when applying 
aggregating transforms such as `GroupByKey` and `Combine`. A single global 
window with a default trigger generally requires the entire data set to be 
available before processing, which is not possible with continuously updating 
data. To perform aggregations on an unbounded `PCollection` that uses global 
windowing, you should specify a non-default trigger for that `PCollection`.
+
+### <a name="setwindowingfunction"></a>Setting your PCollection's windowing 
function
+
+You can set the windowing function for a `PCollection` by applying the 
`Window` transform. When you apply the `Window` transform, you must provide a 
`WindowFn`. The `WindowFn` determines the windowing function your `PCollection` 
will use for subsequent grouping transforms, such as a fixed or sliding time 
window.
+
+Beam provides pre-defined `WindowFn`s for the basic windowing functions 
described here. You can also define your own `WindowFn` if you have a more 
complex need.
+
+When setting a windowing function, you may also want to set a trigger for your 
`PCollection`. The trigger determines when each individual window is aggregated 
and emitted, and helps refine how the windowing function performs with respect 
to late data and computing early results. See the [triggers](#triggers) section 
for more information.
+
+#### Setting fixed-time windows
+
+The following example code shows how to apply `Window` to divide a 
`PCollection` into fixed windows, each one minute in length:
+```java
+    PCollection<String> items = ...;
+    PCollection<String> fixed_windowed_items = items.apply(
+        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:setting_fixed_windows
+%}
+```
+
+#### Setting sliding time windows
+
+The following example code shows how to apply `Window` to divide a 
`PCollection` into sliding time windows. Each window is 30 minutes in length, 
and a new window begins every five seconds:
+```java
+    PCollection<String> items = ...;
+    PCollection<String> sliding_windowed_items = items.apply(
+        Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));
+```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:setting_sliding_windows
+%}
+```
+
+#### Setting session windows
+
+The following example code shows how to apply `Window` to divide a 
`PCollection` into session windows, where each session must be separated by a 
time gap of at least 10 minutes:
+```java
+    PCollection<String> items = ...;
+    PCollection<String> session_windowed_items = items.apply(
+        Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
+```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:setting_session_windows
+%}
+```
+
+Note that the sessions are per-key -- each key in the collection will have 
its own session groupings depending on the data distribution.
+
+#### Setting a single global window
+
+If your `PCollection` is bounded (the size is fixed), you can assign all the 
elements to a single global window. The following example code shows how to set 
a single global window for a `PCollection`:
+
+```java
+    PCollection<String> items = ...;
+    PCollection<String> batch_items = items.apply(
+        Window.<String>into(new GlobalWindows()));
+```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:setting_global_window
+%}
+```
+
+### Time skew, data lag, and late data
+
+In any data processing system, there is a certain amount of lag between the 
time a data event occurs (the "event time", determined by the timestamp on the 
data element itself) and the time the actual data element gets processed at any 
stage in your pipeline (the "processing time", determined by the clock on the 
system processing the element). In addition, there are no guarantees that data 
events will appear in your pipeline in the same order that they were generated.
+
+For example, let's say we have a `PCollection` that's using fixed-time 
windowing, with windows that are five minutes long. For each window, Beam must 
collect all the data with an _event time_ timestamp in the given window range 
(between 0:00 and 4:59 in the first window, for instance). Data with timestamps 
outside that range (data from 5:00 or later) belong to a different window.
+
+However, data isn't always guaranteed to arrive in a pipeline in time order, 
or to always arrive at predictable intervals. Beam tracks a _watermark_, which 
is the system's notion of when all data in a certain window can be expected to 
have arrived in the pipeline. Data that arrives with a timestamp after the 
watermark is considered **late data**.
+
+Continuing our example, suppose we have a simple watermark that assumes 
approximately 30s of lag time between the data timestamps (the event time) and 
the time the data appears in the pipeline (the processing time). Beam would 
then close the first window at 5:30. If a data record arrives at 5:34, but 
with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then 
that record is late data.
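
Under this simplified model, the lateness decision reduces to comparing an element's event-time timestamp against the watermark at the moment the element arrives (a toy sketch, not Beam's watermark machinery):

```python
def is_late(event_ts, watermark):
    # An element is late if the watermark has already advanced past its
    # event-time timestamp by the time the element arrives.
    return event_ts < watermark

# In the example above, by processing time 5:34 a watermark assuming ~30s
# of lag has passed 5:00, closing the 0:00-4:59 window. A record stamped
# 3:38 (218s) is therefore late; one stamped 5:10 (310s) is not.
```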
+
+Note: For simplicity, we've assumed that we're using a very straightforward 
watermark that estimates the lag time/time skew. In practice, your 
`PCollection`'s data source determines the watermark, and watermarks can be 
more precise or complex.
+
+#### Managing time skew and late data
+
+> **Note:** Managing time skew and late data is not supported in the Beam SDK 
for Python.
+
+You can allow late data by invoking the `.withAllowedLateness` operation when 
you set your `PCollection`'s windowing strategy. The following code example 
demonstrates a windowing strategy that will allow late data up to two days 
after the end of a window.
+```java
+    PCollection<String> items = ...;
+    PCollection<String> fixed_windowed_items = items.apply(
+        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+              .withAllowedLateness(Duration.standardDays(2)));
+```
+
+When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness 
propagates forward to any subsequent `PCollection` derived from the first 
`PCollection` you applied allowed lateness to. If you want to change the 
allowed lateness later in your pipeline, you must do so explicitly by applying 
`Window.withAllowedLateness()` again.
+
+You can also use triggers to help you refine the windowing strategy for a 
`PCollection`. You can use triggers to determine exactly when each individual 
window aggregates and reports its results, including how the window emits late 
elements.
+
+### Adding timestamps to a PCollection's elements
+
+An unbounded source provides a timestamp for each element. Depending on your 
unbounded source, you may need to configure how the timestamp is extracted from 
the raw data stream.
+
+However, bounded sources (such as a file from `TextIO`) do not provide 
timestamps. If you need timestamps, you must add them to your `PCollection`’s 
elements.
+
+You can assign new timestamps to the elements of a `PCollection` by applying a 
[ParDo](#transforms-pardo) transform that outputs new elements with timestamps 
that you set.
+
+An example might be if your pipeline reads log records from an input file, and 
each log record includes a timestamp field; since your pipeline reads the 
records in from a file, the file source doesn't assign timestamps 
automatically. You can parse the timestamp field from each record and use a 
`ParDo` transform with a `DoFn` to attach the timestamps to each element in 
your `PCollection`.
+
+```java
+      PCollection<LogEntry> unstampedLogs = ...;
+      PCollection<LogEntry> stampedLogs =
+          unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              // Extract the timestamp from the log entry we're currently processing.
+              Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
+              // Use ProcessContext.outputWithTimestamp (rather than
+              // ProcessContext.output) to emit the entry with the timestamp attached.
+              c.outputWithTimestamp(c.element(), logTimeStamp);
+            }
+          }));
+```
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:setting_timestamp
+%}
+```
+
+## <a name="triggers"></a>Working with triggers
 
 > **Note:** This guide is still in progress. There is an open issue to finish 
 > the guide ([BEAM-193](https://issues.apache.org/jira/browse/BEAM-193))

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/fixed-time-windows.png
----------------------------------------------------------------------
diff --git a/src/images/fixed-time-windows.png 
b/src/images/fixed-time-windows.png
new file mode 100644
index 0000000..832dc64
Binary files /dev/null and b/src/images/fixed-time-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/session-windows.png
----------------------------------------------------------------------
diff --git a/src/images/session-windows.png b/src/images/session-windows.png
new file mode 100644
index 0000000..3ce844c
Binary files /dev/null and b/src/images/session-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/sliding-time-windows.png
----------------------------------------------------------------------
diff --git a/src/images/sliding-time-windows.png 
b/src/images/sliding-time-windows.png
new file mode 100644
index 0000000..056732b
Binary files /dev/null and b/src/images/sliding-time-windows.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/unwindowed-pipeline-bounded.png
----------------------------------------------------------------------
diff --git a/src/images/unwindowed-pipeline-bounded.png 
b/src/images/unwindowed-pipeline-bounded.png
new file mode 100644
index 0000000..7725f34
Binary files /dev/null and b/src/images/unwindowed-pipeline-bounded.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/windowing-pipeline-bounded.png
----------------------------------------------------------------------
diff --git a/src/images/windowing-pipeline-bounded.png 
b/src/images/windowing-pipeline-bounded.png
new file mode 100644
index 0000000..198ed11
Binary files /dev/null and b/src/images/windowing-pipeline-bounded.png differ

http://git-wip-us.apache.org/repos/asf/beam-site/blob/ddb60795/src/images/windowing-pipeline-unbounded.png
----------------------------------------------------------------------
diff --git a/src/images/windowing-pipeline-unbounded.png 
b/src/images/windowing-pipeline-unbounded.png
new file mode 100644
index 0000000..b5c5ee0
Binary files /dev/null and b/src/images/windowing-pipeline-unbounded.png differ
