Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/24eb9127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/24eb9127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/24eb9127

Branch: refs/heads/asf-site
Commit: 24eb91271785ff16d09c3b69959c7c6f8a9d7e20
Parents: 3627a44
Author: Davor Bonaci <da...@google.com>
Authored: Wed Nov 23 22:07:33 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 22:07:33 2016 -0800

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  | 501 ++++++++++++++++++-
 1 file changed, 498 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/24eb9127/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html 
b/content/documentation/programming-guide/index.html
index d941e42..aa97bb6 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -172,6 +172,7 @@
       <li><a href="#transforms-pardo">Using ParDo</a></li>
       <li><a href="#transforms-gbk">Using GroupByKey</a></li>
       <li><a href="#transforms-combine">Using Combine</a></li>
+      <li><a href="#transforms-flatten-partition">Using Flatten and 
Partition</a></li>
       <li><a href="#transforms-usercodereqs">General Requirements for Writing 
User Code for Beam Transforms</a></li>
       <li><a href="#transforms-sideio">Side Inputs and Side Outputs</a></li>
     </ul>
@@ -364,7 +365,7 @@
 </code></pre>
 </div>
 
-<p>The resulting workflow graph from the branching pipeline abouve looks like 
this:</p>
+<p>The resulting workflow graph from the branching pipeline above looks like 
this:</p>
 
 <p>[Branching Graph Graphic]</p>
 
@@ -382,7 +383,7 @@
   <li><code class="highlighter-rouge">ParDo</code></li>
   <li><code class="highlighter-rouge">GroupByKey</code></li>
   <li><code class="highlighter-rouge">Combine</code></li>
-  <li><code class="highlighter-rouge">Flatten</code></li>
+  <li><code class="highlighter-rouge">Flatten</code> and <code 
class="highlighter-rouge">Partition</code></li>
 </ul>
 
 <h4 id="a-nametransforms-pardoapardo"><a name="transforms-pardo"></a>ParDo</h4>
@@ -552,6 +553,270 @@ tree, [2]
 
 <h4 id="a-nametransforms-combineausing-combine"><a 
name="transforms-combine"></a>Using Combine</h4>
 
+<p><span class="language-java"><a 
href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Combine.html"><code
 class="highlighter-rouge">Combine</code></a></span><span 
class="language-python"><a 
href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code
 class="highlighter-rouge">Combine</code></a></span> is a Beam transform for 
combining collections of elements or values in your data. <code 
class="highlighter-rouge">Combine</code> has variants that work on entire <code 
class="highlighter-rouge">PCollection</code>s, and some that combine the values 
for each key in <code class="highlighter-rouge">PCollection</code>s of 
key/value pairs.</p>
+
+<p>When you apply a <code class="highlighter-rouge">Combine</code> transform, 
you must provide the function that contains the logic for combining the 
elements or values. The combining function should be commutative and 
associative, as the function is not necessarily invoked exactly once on all 
values with a given key. Because the input data (including the value 
collection) may be distributed across multiple workers, the combining function 
might be called multiple times to perform partial combining on subsets of the 
value collection. The Beam SDK also provides some pre-built combine functions 
for common numeric combination operations such as sum, min, and max.</p>
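+
+<p>The effect of partial combining can be sketched in plain Python, without the Beam SDK. The helper <code class="highlighter-rouge">combine_in_shards</code> and the shard size are assumptions made for illustration only; they stand in for whatever splitting a runner might actually perform.</p>
+

```python
def combine_in_shards(values, combine_fn, shard_size):
    """Combine each shard separately, then combine the partial results."""
    shards = [values[i:i + shard_size] for i in range(0, len(values), shard_size)]
    partials = [combine_fn(shard) for shard in shards]
    return combine_fn(partials)


def mean(values):
    values = list(values)
    return sum(values) / len(values)


data = [1, 2, 3, 4, 5, 6, 7]

# sum is commutative and associative: sharding does not change the result.
sharded_sum = combine_in_shards(data, sum, shard_size=3)

# mean is not associative: a mean of partial means weights the shards unevenly,
# so the sharded result differs from mean(data).
sharded_mean = combine_in_shards(data, mean, shard_size=3)
```

+
+<p>This is why a mean is usually expressed with a separate accumulator (sum and count) rather than as a simple function.</p>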
+
+<p>Simple combine operations, such as sums, can usually be implemented as a 
simple function. More complex combination operations might require you to 
create a subclass of <code class="highlighter-rouge">CombineFn</code> that has 
an accumulation type distinct from the input/output type.</p>
+
+<h5 id="simple-combinations-using-simple-functions"><strong>Simple 
Combinations Using Simple Functions</strong></h5>
+
+<p>The following example code shows a simple combine function.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Sum a collection of Integer values. 
The function SumInts implements the interface SerializableFunction.</span>
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">SumInts</span> <span 
class="kd">implements</span> <span class="n">SerializableFunction</span><span 
class="o">&lt;</span><span class="n">Iterable</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="n">Iterable</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">input</span><span class="o">)</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">sum</span> <span 
class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="k">for</span> <span class="o">(</span><span 
class="kt">int</span> <span class="n">item</span> <span class="o">:</span> 
<span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">sum</span> <span class="o">+=</span> <span 
class="n">item</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">sum</span><span 
class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># A bounded sum of positive 
integers.</span>
+<span class="k">def</span> <span class="nf">bounded_sum</span><span 
class="p">(</span><span class="n">values</span><span class="p">,</span> <span 
class="n">bound</span><span class="o">=</span><span class="mi">500</span><span 
class="p">):</span>
+  <span class="k">return</span> <span class="nb">min</span><span 
class="p">(</span><span class="nb">sum</span><span class="p">(</span><span 
class="n">values</span><span class="p">),</span> <span 
class="n">bound</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<h5 id="advanced-combinations-using-combinefn"><strong>Advanced Combinations 
using CombineFn</strong></h5>
+
+<p>For more complex combine functions, you can define a subclass of <code 
class="highlighter-rouge">CombineFn</code>. You should use <code 
class="highlighter-rouge">CombineFn</code> if the combine function requires a 
more sophisticated accumulator, must perform additional pre- or 
post-processing, might change the output type, or takes the key into 
account.</p>
+
+<p>A general combining operation consists of four operations. When you create 
a subclass of <code class="highlighter-rouge">CombineFn</code>, you must 
provide four operations by overriding the corresponding methods:</p>
+
+<ol>
+  <li>
+    <p><strong>Create Accumulator</strong> creates a new “local” 
accumulator. In the example case, taking a mean average, a local accumulator 
tracks the running sum of values (the numerator value for our final average 
division) and the number of values summed so far (the denominator value). It 
may be called any number of times in a distributed fashion.</p>
+  </li>
+  <li>
+    <p><strong>Add Input</strong> adds an input element to an accumulator, 
returning the accumulator value. In our example, it would update the sum and 
increment the count. It may also be invoked in parallel.</p>
+  </li>
+  <li>
+    <p><strong>Merge Accumulators</strong> merges several accumulators into a 
single accumulator; this is how data in multiple accumulators is combined 
before the final calculation. In the case of the mean average computation, the 
accumulators representing each portion of the division are merged together. It 
may be called again on its outputs any number of times.</p>
+  </li>
+  <li>
+    <p><strong>Extract Output</strong> performs the final computation. In the 
case of computing a mean average, this means dividing the combined sum of all 
the values by the number of values summed. It is called once on the final, 
merged accumulator.</p>
+  </li>
+</ol>
+
+<p>The following example code shows how to define a <code 
class="highlighter-rouge">CombineFn</code> that computes a mean average:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">AverageFn</span> <span 
class="kd">extends</span> <span class="n">CombineFn</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">AverageFn</span><span class="o">.</span><span 
class="na">Accum</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">Accum</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">sum</span> <span 
class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="kt">int</span> <span class="n">count</span> <span 
class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span 
class="nf">createAccumulator</span><span class="o">()</span> <span 
class="o">{</span> <span class="k">return</span> <span class="k">new</span> 
<span class="n">Accum</span><span class="o">();</span> <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span 
class="nf">addInput</span><span class="o">(</span><span class="n">Accum</span> 
<span class="n">accum</span><span class="o">,</span> <span 
class="n">Integer</span> <span class="n">input</span><span class="o">)</span> 
<span class="o">{</span>
+      <span class="n">accum</span><span class="o">.</span><span 
class="na">sum</span> <span class="o">+=</span> <span 
class="n">input</span><span class="o">;</span>
+      <span class="n">accum</span><span class="o">.</span><span 
class="na">count</span><span class="o">++;</span>
+      <span class="k">return</span> <span class="n">accum</span><span 
class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span 
class="nf">mergeAccumulators</span><span class="o">(</span><span 
class="n">Iterable</span><span class="o">&lt;</span><span 
class="n">Accum</span><span class="o">&gt;</span> <span 
class="n">accums</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">Accum</span> <span class="n">merged</span> <span 
class="o">=</span> <span class="n">createAccumulator</span><span 
class="o">();</span>
+    <span class="k">for</span> <span class="o">(</span><span 
class="n">Accum</span> <span class="n">accum</span> <span class="o">:</span> 
<span class="n">accums</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">merged</span><span class="o">.</span><span 
class="na">sum</span> <span class="o">+=</span> <span 
class="n">accum</span><span class="o">.</span><span class="na">sum</span><span 
class="o">;</span>
+      <span class="n">merged</span><span class="o">.</span><span 
class="na">count</span> <span class="o">+=</span> <span 
class="n">accum</span><span class="o">.</span><span 
class="na">count</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">merged</span><span 
class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Double</span> <span 
class="nf">extractOutput</span><span class="o">(</span><span 
class="n">Accum</span> <span class="n">accum</span><span class="o">)</span> 
<span class="o">{</span>
+    <span class="k">return</span> <span class="o">((</span><span 
class="kt">double</span><span class="o">)</span> <span 
class="n">accum</span><span class="o">.</span><span class="na">sum</span><span 
class="o">)</span> <span class="o">/</span> <span class="n">accum</span><span 
class="o">.</span><span class="na">count</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="n">pc</span> <span class="o">=</span> 
<span class="o">...</span>
+<span class="k">class</span> <span class="nc">AverageFn</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">CombineFn</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">create_accumulator</span><span 
class="p">(</span><span class="bp">self</span><span class="p">):</span>
+    <span class="k">return</span> <span class="p">(</span><span 
class="mf">0.0</span><span class="p">,</span> <span class="mi">0</span><span 
class="p">)</span>
+
+  <span class="k">def</span> <span class="nf">add_input</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span 
class="n">count</span><span class="p">),</span> <span 
class="nb">input</span><span class="p">):</span>
+    <span class="k">return</span> <span class="nb">sum</span> <span 
class="o">+</span> <span class="nb">input</span><span class="p">,</span> <span 
class="n">count</span> <span class="o">+</span> <span class="mi">1</span>
+
+  <span class="k">def</span> <span class="nf">merge_accumulators</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">accumulators</span><span class="p">):</span>
+    <span class="n">sums</span><span class="p">,</span> <span 
class="n">counts</span> <span class="o">=</span> <span 
class="nb">zip</span><span class="p">(</span><span class="o">*</span><span 
class="n">accumulators</span><span class="p">)</span>
+    <span class="k">return</span> <span class="nb">sum</span><span 
class="p">(</span><span class="n">sums</span><span class="p">),</span> <span 
class="nb">sum</span><span class="p">(</span><span class="n">counts</span><span 
class="p">)</span>
+
+  <span class="k">def</span> <span class="nf">extract_output</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span 
class="n">count</span><span class="p">)):</span>
+    <span class="k">return</span> <span class="nb">sum</span> <span 
class="o">/</span> <span class="n">count</span> <span class="k">if</span> <span 
class="n">count</span> <span class="k">else</span> <span 
class="nb">float</span><span class="p">(</span><span 
class="s">'NaN'</span><span class="p">)</span>
+</code></pre>
+</div>
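+
+<p>To see how the four operations fit together, the following plain-Python sketch drives a stand-in class with the same four methods. It does not use the Beam SDK: the <code class="highlighter-rouge">run_combine</code> driver and the way the input is split into bundles are hypothetical, and a real runner is free to schedule these calls differently.</p>
+

```python
class AverageFn:
    """Plain-Python stand-in with the same four methods as the CombineFn above."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('NaN')


def run_combine(combine_fn, bundles):
    """Hypothetical driver: one accumulator per bundle, then merge and extract."""
    partials = []
    for bundle in bundles:
        accumulator = combine_fn.create_accumulator()
        for value in bundle:
            accumulator = combine_fn.add_input(accumulator, value)
        partials.append(accumulator)
    merged = combine_fn.merge_accumulators(partials)
    return combine_fn.extract_output(merged)


# Three bundles, as if the input were spread over three workers.
average = run_combine(AverageFn(), [[1, 2, 3], [4, 5, 6], [7]])
```
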
+
+<p>If you are combining a <code class="highlighter-rouge">PCollection</code> 
of key-value pairs, <a href="#transforms-combine-per-key">per-key combining</a> 
is often enough. If you need the combining strategy to change based on the key 
(for example, MIN for some users and MAX for other users), you can define a 
<code class="highlighter-rouge">KeyedCombineFn</code> to access the key within 
the combining strategy.</p>
+
+<h5 id="combining-a-pcollection-into-a-single-value"><strong>Combining a 
PCollection into a Single Value</strong></h5>
+
+<p>Use the global combine to transform all of the elements in a given <code 
class="highlighter-rouge">PCollection</code> into a single value, represented 
in your pipeline as a new <code class="highlighter-rouge">PCollection</code> 
containing one element. The following example code shows how to apply the 
Beam-provided sum combine function to produce a single sum value for a <code 
class="highlighter-rouge">PCollection</code> of integers.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Sum.SumIntegerFn() combines the 
elements in the input PCollection.</span>
+<span class="c1">// The resulting PCollection, called sum, contains one value: 
the sum of all the elements in the input PCollection.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pc</span> 
<span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> 
<span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+   <span class="n">Combine</span><span class="o">.</span><span 
class="na">globally</span><span class="o">(</span><span class="k">new</span> 
<span class="n">Sum</span><span class="o">.</span><span 
class="na">SumIntegerFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># sum combines the elements in the 
input PCollection.</span>
+<span class="c"># The resulting PCollection, called result, contains one 
value: the sum of all the elements in the input PCollection.</span>
+<span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
+<span class="n">result</span> <span class="o">=</span> <span 
class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombineGlobally</span><span 
class="p">(</span><span class="nb">sum</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<h5 id="global-windowing">Global Windowing:</h5>
+
+<p>If your input <code class="highlighter-rouge">PCollection</code> uses the 
default global windowing, the default behavior is to return a <code 
class="highlighter-rouge">PCollection</code> containing one item. That item’s 
value comes from the accumulator in the combine function that you specified 
when applying <code class="highlighter-rouge">Combine</code>. For example, the 
Beam-provided sum combine function returns a zero value (the sum of an empty 
input), while the min combine function returns a maximal or infinite value.</p>
+
+<p>To have <code class="highlighter-rouge">Combine</code> instead return an 
empty <code class="highlighter-rouge">PCollection</code> if the input is empty, 
specify <code class="highlighter-rouge">.withoutDefaults</code> when you apply 
your <code class="highlighter-rouge">Combine</code> transform, as in the 
following code example:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> 
<span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+  <span class="n">Combine</span><span class="o">.</span><span 
class="na">globally</span><span class="o">(</span><span class="k">new</span> 
<span class="n">Sum</span><span class="o">.</span><span 
class="na">SumIntegerFn</span><span class="o">()).</span><span 
class="na">withoutDefaults</span><span class="o">());</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="n">pc</span> <span class="o">=</span> 
<span class="o">...</span>
+<span class="n">result</span> <span class="o">=</span> <span class="n">pc</span> 
<span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombineGlobally</span><span 
class="p">(</span><span class="nb">sum</span><span class="p">)</span><span 
class="o">.</span><span class="n">without_defaults</span><span 
class="p">()</span>
+
+</code></pre>
+</div>
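+
+<p>The difference between the two behaviors on an empty input can be sketched in plain Python. The helper <code class="highlighter-rouge">combine_globally</code> and its <code class="highlighter-rouge">with_defaults</code> flag are hypothetical names used only for this illustration; the output collection is modeled as a list.</p>
+

```python
def combine_globally(values, combine_fn, with_defaults=True):
    """Hypothetical helper: returns the output 'collection' as a list."""
    values = list(values)
    if not values and not with_defaults:
        # Empty input, no defaults requested: the output is empty too.
        return []
    return [combine_fn(values)]


def min_or_infinity(values):
    # The min of an empty input is modeled here as positive infinity.
    return min(values, default=float('inf'))


empty_sum = combine_globally([], sum)
empty_min = combine_globally([], min_or_infinity)
empty_without_defaults = combine_globally([], sum, with_defaults=False)
non_empty = combine_globally([3, 1, 2], sum, with_defaults=False)
```
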
+
+<h5 id="non-global-windowing">Non-Global Windowing:</h5>
+
+<p>If your <code class="highlighter-rouge">PCollection</code> uses any 
non-global windowing function, Beam does not provide the default behavior. You 
must specify one of the following options when applying <code 
class="highlighter-rouge">Combine</code>:</p>
+
+<ul>
+  <li>Specify <code class="highlighter-rouge">.withoutDefaults</code>, where 
windows that are empty in the input <code 
class="highlighter-rouge">PCollection</code> will likewise be empty in the 
output collection.</li>
+  <li>Specify <code class="highlighter-rouge">.asSingletonView</code>, in 
which the output is immediately converted to a <code 
class="highlighter-rouge">PCollectionView</code>, which will provide a default 
value for each empty window when used as a side input. You’ll generally only 
need to use this option if the result of your pipeline’s <code 
class="highlighter-rouge">Combine</code> is to be used as a side input later in 
the pipeline.</li>
+</ul>
+
+<h5 
id="a-nametransforms-combine-per-keyacombining-values-in-a-key-grouped-collection"><a
 name="transforms-combine-per-key"></a><strong>Combining Values in a 
Key-Grouped Collection</strong></h5>
+
+<p>After creating a key-grouped collection (for example, by using a <code 
class="highlighter-rouge">GroupByKey</code> transform) a common pattern is to 
combine the collection of values associated with each key into a single, merged 
value. Drawing on the previous example from <code 
class="highlighter-rouge">GroupByKey</code>, a key-grouped <code 
class="highlighter-rouge">PCollection</code> called <code 
class="highlighter-rouge">groupedWords</code> looks like this:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>  cat, [1,5,9]
+  dog, [5,2]
+  and, [1,2,6]
+  jump, [3]
+  tree, [2]
+  ...
+</code></pre>
+</div>
+
+<p>In the above <code class="highlighter-rouge">PCollection</code>, each 
element has a string key (for example, “cat”) and an iterable of integers 
for its value (in the first element, containing [1, 5, 9]). If your pipeline’s 
next processing step combines the values (rather than considering them 
individually), you can combine the iterable of integers to create a single, 
merged value to be paired with each key. This pattern of a <code 
class="highlighter-rouge">GroupByKey</code> followed by merging the collection 
of values is equivalent to Beam’s Combine PerKey transform. The combine 
function you supply to Combine PerKey must be an associative reduction function 
or a subclass of <code class="highlighter-rouge">CombineFn</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// PCollection is grouped by key and 
the Double values associated with each key are combined into a Double.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;&gt;</span> <span 
class="n">salesRecords</span> <span class="o">=</span> <span 
class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;&gt;</span> <span 
class="n">totalSalesPerPerson</span> <span class="o">=</span>
+  <span class="n">salesRecords</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Combine</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;</span><span 
class="n">perKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Sum</span><span 
class="o">.</span><span class="na">SumDoubleFn</span><span 
class="o">()));</span>
+
+<span class="c1">// The combined value is of a different type than the 
original collection of values per key.</span>
+<span class="c1">// PCollection has keys of type String and values of type 
Integer, and the combined value is a Double.</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">playerAccuracy</span> <span class="o">=</span> <span 
class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;&gt;</span> <span 
class="n">avgAccuracyPerPlayer</span> <span class="o">=</span>
+  <span class="n">playerAccuracy</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Combine</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Double</span><span class="o">&gt;</span><span 
class="n">perKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="nf">MeanInts</span><span 
class="o">()));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># PCollection is grouped by key and the 
numeric values associated with each key are averaged into a float.</span>
+<span class="n">player_accuracies</span> <span class="o">=</span> <span 
class="o">...</span>
+<span class="n">avg_accuracy_per_player</span> <span class="o">=</span> <span 
class="p">(</span><span class="n">player_accuracies</span>
+                           <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">CombinePerKey</span><span class="p">(</span>
+                               <span class="n">beam</span><span 
class="o">.</span><span class="n">combiners</span><span class="o">.</span><span 
class="n">MeanCombineFn</span><span class="p">()))</span>
+</code></pre>
+</div>
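+
+<p>The equivalence between grouping by key and then merging each value collection, and a single per-key combine, can be sketched in plain Python. The input pairs below are an assumption chosen to reproduce the <code class="highlighter-rouge">groupedWords</code> listing above, and <code class="highlighter-rouge">group_by_key</code> and <code class="highlighter-rouge">combine_per_key</code> are illustrative helpers, not Beam APIs.</p>
+

```python
from collections import defaultdict

# Assumed ungrouped input that yields the grouped listing shown earlier.
pairs = [('cat', 1), ('dog', 5), ('and', 1), ('jump', 3), ('tree', 2),
         ('cat', 5), ('dog', 2), ('and', 2), ('cat', 9), ('and', 6)]


def group_by_key(pairs):
    """Collect the values for each key into a list."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_per_key(pairs, combine_fn):
    """Group by key, then reduce each key's values to a single value."""
    return {key: combine_fn(values)
            for key, values in group_by_key(pairs).items()}


grouped_words = group_by_key(pairs)
totals = combine_per_key(pairs, sum)
```
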
+
+<h4 id="a-nametransforms-flatten-partitionausing-flatten-and-partition"><a 
name="transforms-flatten-partition"></a>Using Flatten and Partition</h4>
+
+<p><span class="language-java"><a 
href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Flatten.html"><code
 class="highlighter-rouge">Flatten</code></a></span><span 
class="language-python"><a 
href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code
 class="highlighter-rouge">Flatten</code></a></span> and <span 
class="language-java"><a 
href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Partition.html"><code
 class="highlighter-rouge">Partition</code></a></span><span 
class="language-python"><a 
href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code
 class="highlighter-rouge">Partition</code></a></span> are Beam transforms for 
<code class="highlighter-rouge">PCollection</code> objects that store the same 
data type. <code class="highlighter-rouge">Flatten</code> merges multiple <code 
class="highlighter-rouge">PCollection</code> objects into a single logical <code 
class="highlighter-rouge">PCollection</code>, and <code 
class="highlighter-rouge">Partition</code> splits a single <code 
class="highlighter-rouge">PCollection</code> into a fixed number of smaller 
collections.</p>
+
+<h5 id="flatten"><strong>Flatten</strong></h5>
+
+<p>The following example shows how to apply a <code 
class="highlighter-rouge">Flatten</code> transform to merge multiple <code 
class="highlighter-rouge">PCollection</code> objects.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Flatten takes a PCollectionList of 
PCollection objects of a given type.</span>
+<span class="c1">// Returns a single PCollection that contains all of the 
elements in the PCollection objects in that list.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">pc1</span> 
<span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">pc2</span> 
<span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">pc3</span> 
<span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollectionList</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">collections</span> <span class="o">=</span> <span 
class="n">PCollectionList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">pc1</span><span 
class="o">).</span><span class="na">and</span><span class="o">(</span><span 
class="n">pc2</span><span class="o">).</span><span class="na">and</span><span 
class="o">(</span><span class="n">pc3</span><span class="o">);</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">merged</span> <span class="o">=</span> <span 
class="n">collections</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Flatten</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">pCollections</span><span class="o">());</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection 
objects.</span>
+<span class="c"># Returns a single PCollection that contains all of the 
elements in the PCollection objects in that tuple.</span>
+<span class="n">merged</span> <span class="o">=</span> <span class="p">(</span>
+    <span class="p">(</span><span class="n">pcoll1</span><span 
class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span 
class="n">pcoll3</span><span class="p">)</span>
+    <span class="c"># A tuple of PCollections can be "piped" directly into a Flatten 
transform.</span>
+    <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Flatten</span><span class="p">())</span>
+</code></pre>
+</div>
+
+<h5 id="data-encoding-in-merged-collections">Data Encoding in Merged 
Collections:</h5>
+
+<p>By default, the coder for the output <code 
class="highlighter-rouge">PCollection</code> is the same as the coder for the 
first <code class="highlighter-rouge">PCollection</code> in the input <code 
class="highlighter-rouge">PCollectionList</code>. However, the input <code 
class="highlighter-rouge">PCollection</code> objects can each use different 
coders, as long as they all contain the same data type in your chosen 
language.</p>
+
+<h5 id="merging-windowed-collections">Merging Windowed Collections:</h5>
+
+<p>When using <code class="highlighter-rouge">Flatten</code> to merge <code 
class="highlighter-rouge">PCollection</code> objects that have a windowing 
strategy applied, all of the <code class="highlighter-rouge">PCollection</code> 
objects you want to merge must use a compatible windowing strategy and window 
sizing. For example, the collections you're merging must all use 
(hypothetically) identical 5-minute fixed windows, or all use identical 4-minute 
sliding windows starting every 30 seconds.</p>
+
+<p>If your pipeline attempts to use <code 
class="highlighter-rouge">Flatten</code> to merge <code 
class="highlighter-rouge">PCollection</code> objects with incompatible windows, 
Beam generates an <code class="highlighter-rouge">IllegalStateException</code> 
error when your pipeline is constructed.</p>
+
+<h5 id="partition"><strong>Partition</strong></h5>
+
+<p><code class="highlighter-rouge">Partition</code> divides the elements of a 
<code class="highlighter-rouge">PCollection</code> according to a partitioning 
function that you provide. The partitioning function contains the logic that 
determines how to split up the elements of the input <code 
class="highlighter-rouge">PCollection</code> into each resulting partition 
<code class="highlighter-rouge">PCollection</code>. The number of partitions 
must be determined at graph construction time. You can, for example, pass the 
number of partitions as a command-line option at runtime (which will then be 
used to build your pipeline graph), but you cannot determine the number of 
partitions in mid-pipeline (based on data calculated after your pipeline graph 
is constructed, for instance).</p>
+
+<p>The following example divides a <code 
class="highlighter-rouge">PCollection</code> into percentile groups.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Provide an int value with the 
desired number of result partitions, and a PartitionFn that represents the 
partitioning function.</span>
+<span class="c1">// In this example, we define the PartitionFn in-line.</span>
+<span class="c1">// Returns a PCollectionList containing each of the resulting 
partitions as individual PCollection objects.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Student</span><span class="o">&gt;</span> <span 
class="n">students</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="c1">// Split students up into 10 partitions, by percentile:</span>
+<span class="n">PCollectionList</span><span class="o">&lt;</span><span 
class="n">Student</span><span class="o">&gt;</span> <span 
class="n">studentsByPercentile</span> <span class="o">=</span>
+    <span class="n">students</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Partition</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">PartitionFn</span><span class="o">&lt;</span><span 
class="n">Student</span><span class="o">&gt;()</span> <span class="o">{</span>
+        <span class="kd">public</span> <span class="kt">int</span> <span 
class="nf">partitionFor</span><span class="o">(</span><span 
class="n">Student</span> <span class="n">student</span><span class="o">,</span> 
<span class="kt">int</span> <span class="n">numPartitions</span><span 
class="o">)</span> <span class="o">{</span>
+            <span class="k">return</span> <span class="n">student</span><span 
class="o">.</span><span class="na">getPercentile</span><span 
class="o">()</span>  <span class="c1">// 0..99</span>
+                 <span class="o">*</span> <span class="n">numPartitions</span> 
<span class="o">/</span> <span class="mi">100</span><span class="o">;</span>
+        <span class="o">}}));</span>
+
+<span class="c1">// You can extract each partition from the PCollectionList 
using the get method, as follows:</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Student</span><span class="o">&gt;</span> <span 
class="n">fortiethPercentile</span> <span class="o">=</span> <span 
class="n">studentsByPercentile</span><span class="o">.</span><span 
class="na">get</span><span class="o">(</span><span class="mi">4</span><span 
class="o">);</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># Provide an int value with the desired 
number of result partitions, and a partitioning function (partition_fn in this 
example).</span>
+<span class="c"># Returns a tuple of PCollection objects containing each of 
the resulting partitions as individual PCollection objects.</span>
+<span class="k">def</span> <span class="nf">partition_fn</span><span 
class="p">(</span><span class="n">student</span><span class="p">,</span> <span 
class="n">num_partitions</span><span class="p">):</span>
+  <span class="k">return</span> <span class="nb">int</span><span 
class="p">(</span><span class="n">get_percentile</span><span 
class="p">(</span><span class="n">student</span><span class="p">)</span> <span 
class="o">*</span> <span class="n">num_partitions</span> <span 
class="o">/</span> <span class="mi">100</span><span class="p">)</span>
+
+<span class="n">by_decile</span> <span class="o">=</span> <span 
class="n">students</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">Partition</span><span class="p">(</span><span 
class="n">partition_fn</span><span class="p">,</span> <span 
class="mi">10</span><span class="p">)</span>
+
+<span class="c"># You can extract each partition from the tuple of PCollection 
objects as follows:</span>
+<span class="n">fortieth_percentile</span> <span class="o">=</span> <span 
class="n">by_decile</span><span class="p">[</span><span 
class="mi">4</span><span class="p">]</span>
+</code></pre>
+</div>
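+
+<p>The bucketing arithmetic in the examples above can be checked outside of a pipeline. The following is a minimal sketch in plain Python, not Beam code: <code class="highlighter-rouge">partition_for</code> is a hypothetical stand-in for the partitioning function, and it assumes that percentiles are integers from 0 to 99.</p>

```python
def partition_for(percentile, num_partitions):
    """Map an integer percentile in 0..99 to a partition index.

    Hypothetical helper: mirrors the arithmetic of the partitioning
    functions in the examples, without using Beam.
    """
    return percentile * num_partitions // 100


# Group every percentile into one of 10 partitions.
num_partitions = 10
partitions = [[] for _ in range(num_partitions)]
for percentile in range(100):
    partitions[partition_for(percentile, num_partitions)].append(percentile)

# Partition 4 holds percentiles 40 through 49.
print(partitions[4][0], partitions[4][-1])
```

+<p>Under these assumptions every returned index falls between 0 and <code class="highlighter-rouge">num_partitions - 1</code>, which is the range a partitioning function is expected to stay within.</p>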
+
 <h4 
id="a-nametransforms-usercodereqsageneral-requirements-for-writing-user-code-for-beam-transforms"><a
 name="transforms-usercodereqs"></a>General Requirements for Writing User Code 
for Beam Transforms</h4>
 
 <p>When you build user code for a Beam transform, you should keep in mind the 
distributed nature of execution. For example, there might be many copies of 
your function running on many different machines in parallel, and those 
copies function independently, without communicating or sharing state with any 
of the other copies. Depending on the Pipeline Runner and processing back-end 
you choose for your pipeline, each copy of your user code function may be 
retried or run multiple times. As such, you should be cautious about making 
your user code depend on state.</p>
@@ -591,10 +856,240 @@ tree, [2]
 
 <p>It's recommended that you make your function object idempotent, that is, 
that it can be repeated or retried as often as necessary without causing 
unintended side effects. The Beam model provides no guarantees as to the number 
of times your user code might be invoked or retried; as such, keeping your 
function object idempotent keeps your pipeline's output deterministic, and 
your transforms' behavior more predictable and easier to debug.</p>
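+
+<p>As a rough illustration of why idempotence matters under retries, the sketch below uses plain Python rather than Beam. Both helper functions and the retry loop are hypothetical; they only contrast an operation that can be repeated safely with one that cannot.</p>

```python
def write_idempotent(store, key, value):
    """Setting a key to a value gives the same result however often it runs."""
    store[key] = value


def write_non_idempotent(log, value):
    """Appending records one extra entry for every retry."""
    log.append(value)


store, log = {}, []
# Simulate the same element being processed three times because of retries.
for _ in range(3):
    write_idempotent(store, "word", 4)
    write_non_idempotent(log, 4)

print(store)     # one entry, regardless of the retry count
print(len(log))  # grows with the number of retries
```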
 
+<h4 id="a-nametransforms-sideioaside-inputs-and-side-outputs"><a 
name="transforms-sideio"></a>Side Inputs and Side Outputs</h4>
+
+<h5 id="side-inputs"><strong>Side Inputs</strong></h5>
+
+<p>In addition to the main input <code 
class="highlighter-rouge">PCollection</code>, you can provide additional inputs 
to a <code class="highlighter-rouge">ParDo</code> transform in the form of side 
inputs. A side input is an additional input that your <code 
class="highlighter-rouge">DoFn</code> can access each time it processes an 
element in the input <code class="highlighter-rouge">PCollection</code>. When 
you specify a side input, you create a view of some other data that can be read 
from within the <code class="highlighter-rouge">ParDo</code> transform's 
<code class="highlighter-rouge">DoFn</code> while processing each element.</p>
+
+<p>Side inputs are useful if your <code class="highlighter-rouge">ParDo</code> 
needs to inject additional data when processing each element in the input <code 
class="highlighter-rouge">PCollection</code>, but the additional data needs to 
be determined at runtime (and not hard-coded). Such values might be determined 
by the input data, or depend on a different branch of your pipeline.</p>
+
+<h5 id="passing-side-inputs-to-pardo">Passing Side Inputs to ParDo:</h5>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  
<span class="c1">// Pass side inputs to your ParDo transform by invoking 
.withSideInputs.</span>
+  <span class="c1">// Inside your DoFn, access the side input by using the 
method DoFn.ProcessContext.sideInput.</span>
+
+  <span class="c1">// The input PCollection to ParDo.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="o">...;</span>
+
+  <span class="c1">// A PCollection of word lengths that we'll combine into a 
single value.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">wordLengths</span> <span class="o">=</span> <span 
class="o">...;</span> <span class="c1">// Singleton PCollection</span>
+
+  <span class="c1">// Create a singleton PCollectionView from wordLengths 
using Combine.globally and asSingletonView.</span>
+  <span class="kd">final</span> <span class="n">PCollectionView</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">maxWordLengthCutOffView</span> <span class="o">=</span>
+     <span class="n">wordLengths</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Combine</span><span class="o">.</span><span 
class="na">globally</span><span class="o">(</span><span class="k">new</span> 
<span class="n">Max</span><span class="o">.</span><span 
class="na">MaxIntFn</span><span class="o">()).</span><span 
class="na">asSingletonView</span><span class="o">());</span>
+
+
+  <span class="c1">// Apply a ParDo that takes maxWordLengthCutOffView as a 
side input.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">wordsBelowCutOff</span> <span class="o">=</span>
+  <span class="n">words</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">ParDo</span><span class="o">.</span><span 
class="na">withSideInputs</span><span class="o">(</span><span 
class="n">maxWordLengthCutOffView</span><span class="o">)</span>
+                    <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@ProcessElement</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+        <span class="n">String</span> <span class="n">word</span> <span 
class="o">=</span> <span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">();</span>
+        <span class="c1">// In our DoFn, access the side input.</span>
+        <span class="kt">int</span> <span class="n">lengthCutOff</span> <span 
class="o">=</span> <span class="n">c</span><span class="o">.</span><span 
class="na">sideInput</span><span class="o">(</span><span 
class="n">maxWordLengthCutOffView</span><span class="o">);</span>
+        <span class="k">if</span> <span class="o">(</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">length</span><span class="o">()</span> <span class="o">&lt;=</span> 
<span class="n">lengthCutOff</span><span class="o">)</span> <span 
class="o">{</span>
+          <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
+        <span class="o">}</span>
+  <span class="o">}}));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># Side inputs are available as extra 
arguments in the DoFn's process method or Map / FlatMap's callable.</span>
+<span class="c"># Optional, positional, and keyword arguments are all 
supported. Deferred arguments are unwrapped into their actual values.</span>
+<span class="c"># For example, using pvalue.AsIter(pcoll) at pipeline 
construction time results in an iterable of the actual elements of pcoll being 
passed into each process invocation.</span>
+<span class="c"># In this example, side inputs are passed to a FlatMap 
transform as extra arguments and consumed by filter_using_length.</span>
+
+<span class="c"># Callable takes additional arguments.</span>
+<span class="k">def</span> <span class="nf">filter_using_length</span><span 
class="p">(</span><span class="n">word</span><span class="p">,</span> <span 
class="n">lower_bound</span><span class="p">,</span> <span 
class="n">upper_bound</span><span class="o">=</span><span 
class="nb">float</span><span class="p">(</span><span 
class="s">'inf'</span><span class="p">)):</span>
+  <span class="k">if</span> <span class="n">lower_bound</span> <span 
class="o">&lt;=</span> <span class="nb">len</span><span class="p">(</span><span 
class="n">word</span><span class="p">)</span> <span class="o">&lt;=</span> 
<span class="n">upper_bound</span><span class="p">:</span>
+    <span class="k">yield</span> <span class="n">word</span>
+
+<span class="c"># Construct a deferred side input.</span>
+<span class="n">avg_word_len</span> <span class="o">=</span> <span 
class="p">(</span><span class="n">words</span>
+                <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">Map</span><span class="p">(</span><span 
class="nb">len</span><span class="p">)</span>
+                <span class="o">|</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">CombineGlobally</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">combiners</span><span class="o">.</span><span 
class="n">MeanCombineFn</span><span class="p">()))</span>
+
+<span class="c"># Call with explicit side inputs.</span>
+<span class="n">small_words</span> <span class="o">=</span> <span 
class="n">words</span> <span class="o">|</span> <span class="s">'small'</span> 
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span 
class="n">filter_using_length</span><span class="p">,</span> <span 
class="mi">0</span><span class="p">,</span> <span class="mi">3</span><span 
class="p">)</span>
+
+<span class="c"># A single deferred side input.</span>
+<span class="n">larger_than_average</span> <span class="o">=</span> <span 
class="p">(</span><span class="n">words</span> <span class="o">|</span> <span 
class="s">'large'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">FlatMap</span><span class="p">(</span>
+    <span class="n">filter_using_length</span><span class="p">,</span>
+    <span class="n">lower_bound</span><span class="o">=</span><span 
class="n">pvalue</span><span class="o">.</span><span 
class="n">AsSingleton</span><span class="p">(</span><span 
class="n">avg_word_len</span><span class="p">)))</span>
+
+<span class="c"># Mix and match.</span>
+<span class="n">small_but_nontrivial</span> <span class="o">=</span> <span 
class="n">words</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">FlatMap</span><span class="p">(</span><span 
class="n">filter_using_length</span><span class="p">,</span>
+                                            <span 
class="n">lower_bound</span><span class="o">=</span><span 
class="mi">2</span><span class="p">,</span>
+                                            <span 
class="n">upper_bound</span><span class="o">=</span><span 
class="n">pvalue</span><span class="o">.</span><span 
class="n">AsSingleton</span><span class="p">(</span>
+                                                <span 
class="n">avg_word_len</span><span class="p">))</span>
+
+
+<span class="c"># We can also pass side inputs to a ParDo transform, which 
will get passed to its process method.</span>
+<span class="c"># The only change is that the first arguments are self and a 
context, rather than the PCollection element itself.</span>
+
+<span class="k">class</span> <span class="nc">FilterUsingLength</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">DoFn</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">process</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">context</span><span class="p">,</span> <span 
class="n">lower_bound</span><span class="p">,</span> <span 
class="n">upper_bound</span><span class="o">=</span><span 
class="nb">float</span><span class="p">(</span><span 
class="s">'inf'</span><span class="p">)):</span>
+    <span class="k">if</span> <span class="n">lower_bound</span> <span 
class="o">&lt;=</span> <span class="nb">len</span><span class="p">(</span><span 
class="n">context</span><span class="o">.</span><span 
class="n">element</span><span class="p">)</span> <span class="o">&lt;=</span> 
<span class="n">upper_bound</span><span class="p">:</span>
+      <span class="k">yield</span> <span class="n">context</span><span 
class="o">.</span><span class="n">element</span>
+
+<span class="n">small_words</span> <span class="o">=</span> <span 
class="n">words</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span 
class="p">(</span><span class="n">FilterUsingLength</span><span 
class="p">(),</span> <span class="mi">0</span><span class="p">,</span> <span 
class="mi">3</span><span class="p">)</span>
+<span class="o">...</span>
+
+</code></pre>
+</div>
+
+<h5 id="side-inputs-and-windowing">Side Inputs and Windowing:</h5>
+
+<p>A windowed <code class="highlighter-rouge">PCollection</code> may be 
infinite and thus cannot be compressed into a single value (or single 
collection class). When you create a <code 
class="highlighter-rouge">PCollectionView</code> of a windowed <code 
class="highlighter-rouge">PCollection</code>, the <code 
class="highlighter-rouge">PCollectionView</code> represents a single entity per 
window (one singleton per window, one list per window, etc.).</p>
+
+<p>Beam uses the window(s) for the main input element to look up the 
appropriate window for the side input element. Beam projects the main input 
element's window into the side input's window set, and then uses the side 
input from the resulting window. If the main input and side inputs have 
identical windows, the projection provides the exact corresponding window. 
However, if the inputs have different windows, Beam uses the projection to 
choose the most appropriate side input window.</p>
+
+<p>For example, if the main input is windowed using fixed-time windows of one 
minute, and the side input is windowed using fixed-time windows of one hour, 
Beam projects the main input window against the side input window set and 
selects the side input value from the appropriate hour-long side input 
window.</p>
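+
+<p>One way to picture this projection is sketched below in plain Python. This is not Beam's implementation: the helper names are hypothetical, and the sketch assumes half-open windows measured in whole seconds, with the side input window chosen as the one containing the last second of the main input window.</p>

```python
MINUTE = 60
HOUR = 60 * MINUTE


def fixed_window(timestamp, size):
    """Return the (start, end) fixed window of `size` seconds containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def project_to_side_window(main_window, side_window_size):
    """Pick the side input window containing the last instant of the main window.

    Assumption for this sketch: windows are half-open [start, end) intervals
    in whole seconds, and the projection uses the main window's last second.
    """
    _, end = main_window
    return fixed_window(end - 1, side_window_size)


# A one-minute main input window that starts 2 minutes into the second hour.
main_window = fixed_window(HOUR + 2 * MINUTE + 15, MINUTE)
side_window = project_to_side_window(main_window, HOUR)
print(main_window, side_window)
```

+<p>In this model, every one-minute window inside a given hour maps to the same hour-long side input window, so all of those main input elements see the same side input value.</p>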
+
+<p>If the main input element exists in more than one window, then <code 
class="highlighter-rouge">processElement</code> gets called multiple times, 
once for each window. Each call to <code 
class="highlighter-rouge">processElement</code> projects the "current" 
window for the main input element, and thus might provide a different view of 
the side input each time.</p>
+
+<p>If the side input has multiple trigger firings, Beam uses the value from 
the latest trigger firing. This is particularly useful if you use a side input 
with a single global window and specify a trigger.</p>
+
+<h5 id="side-outputs"><strong>Side Outputs</strong></h5>
+
+<p>While <code class="highlighter-rouge">ParDo</code> always produces a main 
output <code class="highlighter-rouge">PCollection</code> (as the return value 
from apply), you can also have your <code 
class="highlighter-rouge">ParDo</code> produce any number of additional output 
<code class="highlighter-rouge">PCollection</code>s. If you choose to have 
multiple outputs, your <code class="highlighter-rouge">ParDo</code> returns all 
of the output <code class="highlighter-rouge">PCollection</code>s (including 
the main output) bundled together.</p>
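+
+<p>The idea of one transform feeding several outputs can be modeled without Beam. In the plain-Python sketch below, <code class="highlighter-rouge">route_words</code> and its tag names are hypothetical, and the returned dictionary only stands in for the bundle of output collections keyed by tag.</p>

```python
def route_words(words, cutoff, marker):
    """Route each word to a main output and two tagged side outputs.

    Hypothetical model: the returned dict plays the role of the bundle of
    output collections, keyed by tag.
    """
    outputs = {"main": [], "lengths_above_cutoff": [], "marked": []}
    for word in words:
        if len(word) <= cutoff:
            outputs["main"].append(word)
        else:
            outputs["lengths_above_cutoff"].append(len(word))
        if word.startswith(marker):
            # A single element may be emitted to more than one output.
            outputs["marked"].append(word)
    return outputs


results = route_words(["a", "MARKERxyz", "pipeline"], cutoff=3, marker="MARKER")
print(results)
```

+<p>Note that the outputs need not share an element type: here the main output holds strings while one side output holds integers.</p>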
+
+<h5 id="tags-for-side-outputs">Tags for Side Outputs:</h5>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// To emit elements to a side output 
PCollection, create a TupleTag object to identify each collection that your 
ParDo produces.</span>
+<span class="c1">// For example, if your ParDo produces three output 
PCollections (the main output and two side outputs), you must create three 
TupleTags.</span>
+<span class="c1">// The following example code shows how to create TupleTags 
for a ParDo with a main output and two side outputs:</span>
+
+  <span class="c1">// Input PCollection to our ParDo.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="o">...;</span>
+
+  <span class="c1">// The ParDo will filter words whose length is below a 
cutoff and add them to</span>
+  <span class="c1">// the main output PCollection&lt;String&gt;.</span>
+  <span class="c1">// If a word is above the cutoff, the ParDo will add the 
word length to a side output</span>
+  <span class="c1">// PCollection&lt;Integer&gt;.</span>
+  <span class="c1">// If a word starts with the string "MARKER", the ParDo 
will add that word to a different</span>
+  <span class="c1">// side output PCollection&lt;String&gt;.</span>
+  <span class="kd">final</span> <span class="kt">int</span> <span 
class="n">wordLengthCutOff</span> <span class="o">=</span> <span 
class="mi">10</span><span class="o">;</span>
+
+  <span class="c1">// Create the TupleTags for the main and side 
outputs.</span>
+  <span class="c1">// Main output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">wordsBelowCutOffTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">String</span><span 
class="o">&gt;(){};</span>
+  <span class="c1">// Word lengths side output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">wordLengthsAboveCutOffTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">Integer</span><span 
class="o">&gt;(){};</span>
+  <span class="c1">// "MARKER" words side output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">markedWordsTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span 
class="o">&lt;</span><span class="n">String</span><span 
class="o">&gt;(){};</span>
+
+<span class="c1">// Passing Output Tags to ParDo:</span>
+<span class="c1">// After you specify the TupleTags for each of your ParDo 
outputs, pass the tags to your ParDo by invoking .withOutputTags.</span>
+<span class="c1">// You pass the tag for the main output first, and then the 
tags for any side outputs in a TupleTagList.</span>
+<span class="c1">// Building on our previous example, we pass the three 
TupleTags (one for the main output and two for the side outputs) to our 
ParDo.</span>
+<span class="c1">// Note that all of the outputs (including the main output 
PCollection) are bundled into the returned PCollectionTuple.</span>
+
+  <span class="n">PCollectionTuple</span> <span class="n">results</span> <span 
class="o">=</span>
+      <span class="n">words</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span>
+          <span class="n">ParDo</span>
+          <span class="c1">// Specify the tag for the main output, 
wordsBelowCutOffTag.</span>
+          <span class="o">.</span><span class="na">withOutputTags</span><span 
class="o">(</span><span class="n">wordsBelowCutOffTag</span><span 
class="o">,</span>
+          <span class="c1">// Specify the tags for the two side outputs as a 
TupleTagList.</span>
+                          <span class="n">TupleTagList</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="n">wordLengthsAboveCutOffTag</span><span class="o">)</span>
+                                      <span class="o">.</span><span 
class="na">and</span><span class="o">(</span><span 
class="n">markedWordsTag</span><span class="o">))</span>
+          <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+            <span class="c1">// DoFn continues here.</span>
+            <span class="o">...</span>
+          <span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># To emit elements to a side output 
PCollection, invoke with_outputs() on the ParDo, optionally specifying the 
expected tags for the output.</span>
+<span class="c"># with_outputs() returns a DoOutputsTuple object. Tags 
specified in with_outputs are attributes on the returned DoOutputsTuple 
object.</span>
+<span class="c"># The tags give access to the corresponding output 
PCollections.</span>
+
+<span class="n">results</span> <span class="o">=</span> <span 
class="p">(</span><span class="n">words</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span 
class="p">(</span><span class="n">ProcessWords</span><span class="p">(),</span> 
<span class="n">cutoff_length</span><span class="o">=</span><span 
class="mi">2</span><span class="p">,</span> <span class="n">marker</span><span 
class="o">=</span><span class="s">'x'</span><span class="p">)</span>
+           <span class="o">.</span><span class="n">with_outputs</span><span 
class="p">(</span><span class="s">'above_cutoff_lengths'</span><span 
class="p">,</span> <span class="s">'marked strings'</span><span 
class="p">,</span>
+                         <span class="n">main</span><span 
class="o">=</span><span class="s">'below_cutoff_strings'</span><span 
class="p">))</span>
+<span class="n">below</span> <span class="o">=</span> <span 
class="n">results</span><span class="o">.</span><span 
class="n">below_cutoff_strings</span>
+<span class="n">above</span> <span class="o">=</span> <span 
class="n">results</span><span class="o">.</span><span 
class="n">above_cutoff_lengths</span>
+<span class="n">marked</span> <span class="o">=</span> <span 
class="n">results</span><span class="p">[</span><span class="s">'marked 
strings'</span><span class="p">]</span>  <span class="c"># indexing works as 
well</span>
+
+<span class="c"># The result is also iterable, ordered in the same order that 
the tags were passed to with_outputs(), the main tag (if specified) 
first.</span>
+
+<span class="n">below</span><span class="p">,</span> <span 
class="n">above</span><span class="p">,</span> <span class="n">marked</span> 
<span class="o">=</span> <span class="p">(</span><span class="n">words</span>
+                        <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span 
class="p">(</span>
+                            <span class="n">ProcessWords</span><span 
class="p">(),</span> <span class="n">cutoff_length</span><span 
class="o">=</span><span class="mi">2</span><span class="p">,</span> <span 
class="n">marker</span><span class="o">=</span><span class="s">'x'</span><span 
class="p">)</span>
+                        <span class="o">.</span><span 
class="n">with_outputs</span><span class="p">(</span><span 
class="s">'above_cutoff_lengths'</span><span class="p">,</span>
+                                      <span class="s">'marked 
strings'</span><span class="p">,</span>
+                                      <span class="n">main</span><span 
class="o">=</span><span class="s">'below_cutoff_strings'</span><span 
class="p">))</span>
+</code></pre>
+</div>
+
+<h5 id="emitting-to-side-outputs-in-your-dofn">Emitting to Side Outputs in 
your DoFn:</h5>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Inside your ParDo's DoFn, you can 
emit an element to a side output by using the method 
ProcessContext.sideOutput.</span>
+<span class="c1">// Pass the appropriate TupleTag for the target side output 
collection when you call ProcessContext.sideOutput.</span>
+<span class="c1">// After your ParDo, extract the resulting main and side 
output PCollections from the returned PCollectionTuple.</span>
+<span class="c1">// Based on the previous example, this shows the DoFn 
emitting to the main and side outputs.</span>
+
+  <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+     <span class="nd">@ProcessElement</span>
+     <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+       <span class="n">String</span> <span class="n">word</span> <span 
class="o">=</span> <span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">();</span>
+       <span class="k">if</span> <span class="o">(</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">length</span><span class="o">()</span> <span class="o">&lt;=</span> 
<span class="n">wordLengthCutOff</span><span class="o">)</span> <span 
class="o">{</span>
+         <span class="c1">// Emit this short word to the main output.</span>
+         <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
+       <span class="o">}</span> <span class="k">else</span> <span 
class="o">{</span>
+         <span class="c1">// Emit this long word's length to a side 
output.</span>
+         <span class="n">c</span><span class="o">.</span><span 
class="na">sideOutput</span><span class="o">(</span><span 
class="n">wordLengthsAboveCutOffTag</span><span class="o">,</span> <span 
class="n">word</span><span class="o">.</span><span 
class="na">length</span><span class="o">());</span>
+       <span class="o">}</span>
+       <span class="k">if</span> <span class="o">(</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">startsWith</span><span class="o">(</span><span 
class="s">"MARKER"</span><span class="o">))</span> <span class="o">{</span>
+         <span class="c1">// Emit this word to a different side output.</span>
+         <span class="n">c</span><span class="o">.</span><span 
class="na">sideOutput</span><span class="o">(</span><span 
class="n">markedWordsTag</span><span class="o">,</span> <span 
class="n">word</span><span class="o">);</span>
+       <span class="o">}</span>
+     <span class="o">}}));</span>
+
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre 
class="highlight"><code><span class="c"># Inside your ParDo's DoFn, you can 
emit an element to a side output by wrapping the value and the output tag (str)</span>
+<span class="c"># in the pvalue.SideOutputValue wrapper class.</span>
+<span class="c"># Based on the previous example, this shows the DoFn emitting 
to the main and side outputs.</span>
+
+<span class="k">class</span> <span class="nc">ProcessWords</span><span 
class="p">(</span><span class="n">beam</span><span class="o">.</span><span 
class="n">DoFn</span><span class="p">):</span>
+
+  <span class="k">def</span> <span class="nf">process</span><span 
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span 
class="n">context</span><span class="p">,</span> <span 
class="n">cutoff_length</span><span class="p">,</span> <span 
class="n">marker</span><span class="p">):</span>
+    <span class="k">if</span> <span class="nb">len</span><span 
class="p">(</span><span class="n">context</span><span class="o">.</span><span 
class="n">element</span><span class="p">)</span> <span class="o">&lt;=</span> 
<span class="n">cutoff_length</span><span class="p">:</span>
+      <span class="c"># Emit this short word to the main output.</span>
+      <span class="k">yield</span> <span class="n">context</span><span 
class="o">.</span><span class="n">element</span>
+    <span class="k">else</span><span class="p">:</span>
+      <span class="c"># Emit this long word's length to a side output.</span>
+      <span class="k">yield</span> <span class="n">pvalue</span><span 
class="o">.</span><span class="n">SideOutputValue</span><span class="p">(</span>
+          <span class="s">'above_cutoff_lengths'</span><span 
class="p">,</span> <span class="nb">len</span><span class="p">(</span><span 
class="n">context</span><span class="o">.</span><span 
class="n">element</span><span class="p">))</span>
+    <span class="k">if</span> <span class="n">context</span><span 
class="o">.</span><span class="n">element</span><span class="o">.</span><span 
class="n">startswith</span><span class="p">(</span><span 
class="n">marker</span><span class="p">):</span>
+      <span class="c"># Emit this word to a different side output.</span>
+      <span class="k">yield</span> <span class="n">pvalue</span><span 
class="o">.</span><span class="n">SideOutputValue</span><span 
class="p">(</span><span class="s">'marked strings'</span><span 
class="p">,</span> <span class="n">context</span><span class="o">.</span><span 
class="n">element</span><span class="p">)</span>
+
+
+<span class="c"># Side outputs are also available in Map and FlatMap.</span>
+<span class="c"># Here is an example that uses FlatMap and shows that the tags 
do not need to be specified ahead of time.</span>
+
+<span class="k">def</span> <span class="nf">even_odd</span><span 
class="p">(</span><span class="n">x</span><span class="p">):</span>
+  <span class="k">yield</span> <span class="n">pvalue</span><span 
class="o">.</span><span class="n">SideOutputValue</span><span 
class="p">(</span><span class="s">'odd'</span> <span class="k">if</span> <span 
class="n">x</span> <span class="o">%</span> <span class="mi">2</span> <span 
class="k">else</span> <span class="s">'even'</span><span class="p">,</span> 
<span class="n">x</span><span class="p">)</span>
+  <span class="k">if</span> <span class="n">x</span> <span class="o">%</span> 
<span class="mi">10</span> <span class="o">==</span> <span 
class="mi">0</span><span class="p">:</span>
+    <span class="k">yield</span> <span class="n">x</span>
+
+<span class="n">results</span> <span class="o">=</span> <span 
class="n">numbers</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span 
class="n">FlatMap</span><span class="p">(</span><span 
class="n">even_odd</span><span class="p">)</span><span class="o">.</span><span 
class="n">with_outputs</span><span class="p">()</span>
+
+<span class="n">evens</span> <span class="o">=</span> <span 
class="n">results</span><span class="o">.</span><span class="n">even</span>
+<span class="n">odds</span> <span class="o">=</span> <span 
class="n">results</span><span class="o">.</span><span class="n">odd</span>
+<span class="n">tens</span> <span class="o">=</span> <span 
class="n">results</span><span class="p">[</span><span 
class="bp">None</span><span class="p">]</span>  <span class="c"># the 
undeclared main output</span>
+</code></pre>
+</div>
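
To make the tag routing in the FlatMap example concrete, here is a minimal pure-Python sketch of how yielded values might be grouped by tag. It is not Beam's implementation and does not import Beam: the `SideOutputValue` namedtuple and the `route_outputs` helper are hypothetical stand-ins for `pvalue.SideOutputValue` and `with_outputs()`, and the input range is assumed for illustration.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for pvalue.SideOutputValue: a (tag, value) pair.
SideOutputValue = namedtuple('SideOutputValue', ['tag', 'value'])


def even_odd(x):
    # Same logic as the FlatMap example: every element goes to a tagged
    # output, and multiples of 10 also go to the untagged main output.
    yield SideOutputValue('odd' if x % 2 else 'even', x)
    if x % 10 == 0:
        yield x


def route_outputs(fn, elements):
    """Group everything fn yields by tag; untagged values go under None."""
    outputs = defaultdict(list)
    for element in elements:
        for result in fn(element):
            if isinstance(result, SideOutputValue):
                outputs[result.tag].append(result.value)
            else:
                outputs[None].append(result)
    return dict(outputs)


# Assumed input: the integers 1 through 20.
results = route_outputs(even_odd, range(1, 21))
evens = results['even']
odds = results['odd']
tens = results[None]  # the undeclared main output
```

Because tags are discovered as values are yielded, nothing has to be declared up front, which mirrors the point the FlatMap example makes about undeclared tags.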
+
 <p><a name="io"></a>
 <a name="running"></a>
 <a name="transforms-composite"></a>
-<a name="transforms-sideio"></a>
 <a name="coders"></a>
 <a name="windowing"></a>
 <a name="triggers"></a></p>
