This is an automated email from the ASF dual-hosted git repository. git-site-role pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push: new fead510 Publishing website 2021/10/04 18:03:25 at commit 2a3d887 fead510 is described below commit fead510df4a26d24cdbd06c10cb2af8c252bead6 Author: jenkins <bui...@apache.org> AuthorDate: Mon Oct 4 18:03:26 2021 +0000 Publishing website 2021/10/04 18:03:25 at commit 2a3d887 --- website/generated-content/documentation/index.xml | 150 ++++++++++++++++++--- .../io/developing-io-python/index.html | 4 +- .../patterns/cross-language/index.html | 4 +- .../documentation/programming-guide/index.html | 96 +++++++++---- website/generated-content/get-started/index.xml | 8 +- .../get-started/mobile-gaming-example/index.html | 8 +- website/generated-content/sitemap.xml | 2 +- 7 files changed, 215 insertions(+), 57 deletions(-) diff --git a/website/generated-content/documentation/index.xml b/website/generated-content/documentation/index.xml index 56112c0..c4ca3be 100644 --- a/website/generated-content/documentation/index.xml +++ b/website/generated-content/documentation/index.xml @@ -978,7 +978,7 @@ to <code>_CountingSource</code>. Then, create the wrapper <code>PTransf </a> <pre><code>class ReadFromCountingSource(PTransform): def __init__(self, count): -super(ReadFromCountingSource, self).__init__() +super().__init__() self._count = count def expand(self, pcoll): return pcoll | iobase.Read(_CountingSource(self._count))</code></pre> @@ -1003,7 +1003,7 @@ numbers = pipeline | &#39;ProduceNumbers&#39; &gt;&gt; ReadFromC <pre><code>class WriteToKVSink(PTransform): def __init__(self, simplekv, url, final_table_name): self._simplekv = simplekv -super(WriteToKVSink, self).__init__() +super().__init__() self._url = url self._final_table_name = final_table_name def expand(self, pcoll): @@ -5509,7 +5509,8 @@ the pipeline.</li> </ul> </span> <p class="language-go">If your <code>PCollection</code> uses any non-global windowing function, the Beam Go SDK -behaves the same way as with global windowing.</p> +behaves the same way as with global windowing. Windows that are empty in the input +<code>PCollection</code> will likewise be empty in the output collection.</p> <h5 id="combining-values-in-a-keyed-pcollection">4.2.4.6. Combining values in a keyed PCollection</h5> <p>After creating a keyed PCollection (for example, by using a <code>GroupByKey</code> transform), a common pattern is to combine the collection of values associated @@ -5724,7 +5725,7 @@ Non-idempotent functions are supported by Beam, but require additional thought to ensure correctness when there are external side effects.</p> <span class="language-java language-py"> <blockquote> -<p><strong>Note:</strong> These requirements apply to subclasses of <code>DoFn</code></span> (a function object +<p><strong>Note:</strong> These requirements apply to subclasses of <code>DoFn</code>(a function object used with the <a href="#pardo">ParDo</a> transform), <code>CombineFn</code> (a function object used with the <a href="#combine">Combine</a> transform), and <code>WindowFn</code> (a function object used with the <a href="#windowing">Window</a> transform).</p> @@ -5732,7 +5733,7 @@ used with the <a href="#windowing">Window</a> transform).</p> </span> <span class="language-go"> <blockquote> -<p><strong>Note:</strong> These requirements apply to <code>DoFn</code>s</span> (a function object +<p><strong>Note:</strong> These requirements apply to <code>DoFn</code>s (a function object used with the <a href="#pardo">ParDo</a> transform), <code>CombineFn</code>s (a function object used with the <a href="#combine">Combine</a> transform), and <code>WindowFn</code>s (a function object used with the <a href="#windowing">Window</a> transform).</p> @@ -8558,7 +8559,7 @@ window.</p> <li>Sliding Time Windows</li> <li>Per-Session Windows</li> <li>Single Global Window</li> -<li>Calendar-based Windows (not supported by the Beam SDK for Python)</li> +<li>Calendar-based Windows (not supported by the Beam SDK for Python or Go)</li> </ul> <p>You can also define your own <code>WindowFn</code> if you have a more complex need.</p> <p>Note that each element can logically belong to more than one window, depending @@ -8648,6 +8649,16 @@ into fixed windows, each 60 seconds in length:</p> <span class="n">items</span> <span class="o">|</span> <span class="s1">&#39;window&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">FixedWindows</span><span class="p">(</span><span class="mi">60</span><span class="p">)))< [...] </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">fixedWindowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="mi">60</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Second</span><span class="p">),</span> +<span class="nx">items</span><span class="p">)</span></code></pre></div> +</div> +</div> <h4 id="using-sliding-time-windows">8.3.2. Sliding time windows</h4> <p>The following example code shows how to apply <code>Window</code> to divide a <code>PCollection</code> into sliding time windows. Each window is 30 seconds in length, and a new window @@ -8672,6 +8683,16 @@ begins every five seconds:</p> <span class="n">items</span> <span class="o">|</span> <span class="s1">&#39;window&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">SlidingWindows</span><span class="p">(</span><span class="mi">30</span><span class="p">,< [...] </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">slidingWindowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewSlidingWindows</span><span class="p">(</span><span class="mi">5</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Second</span><span class="p">,</span> <span class="mi">30</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Second</ [...] +<span class="nx">items</span><span class="p">)</span></code></pre></div> +</div> +</div> <h4 id="using-session-windows">8.3.3. Session windows</h4> <p>The following example code shows how to apply <code>Window</code> to divide a <code>PCollection</code> into session windows, where each session must be separated by a time gap of at @@ -8696,6 +8717,16 @@ least 10 minutes (600 seconds):</p> <span class="n">items</span> <span class="o">|</span> <span class="s1">&#39;window&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">Sessions</span><span class="p">(</span><span class="mi">10</span> <span class="o">*</spa [...] </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">sessionWindowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewSessions</span><span class="p">(</span><span class="mi">600</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Second</span><span class="p">),</span> +<span class="nx">items</span><span class="p">)</span></code></pre></div> +</div> +</div> <p>Note that the sessions are per-key — each key in the collection will have its own session groupings depending on the data distribution.</p> <h4 id="using-single-global-window">8.3.4. Single global window</h4> @@ -8722,6 +8753,16 @@ a single global window for a <code>PCollection</code>:</p> <span class="n">items</span> <span class="o">|</span> <span class="s1">&#39;window&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">window</span><span class="o">.</span><span class="n">GlobalWindows</span><span class="p">()))</span></code></pre></div> </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">globalWindowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewGlobalWindows</span><span class="p">(),</span> +<span class="nx">items</span><span class="p">)</span></code></pre></div> +</div> +</div> <h3 id="watermarks-and-late-data">8.4. Watermarks and late data</h3> <p>In any data processing system, there is a certain amount of lag between the time a data event occurs (the &ldquo;event time&rdquo;, determined by the timestamp on the data @@ -8786,6 +8827,17 @@ the end of a window.</p> <span class="n">allowed_lateness</span><span class="o">=</span><span class="n">Duration</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="mi">2</span><span class="o">*</span><span class="mi">24</span><span class="o">*</span><span class="mi">60</span><span class="o">*</span><span class="mi">60</span><span class="p">))</span> <span class="c1"># 2 days</sp [...] </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">windowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> <span class="nx">items</span><span class="p">,</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">AllowedLateness</span><span class="p">(</span><span class="mi">2</span><span class="o">*</span><span class="mi">24</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Hour</span><span class="p">),</span> <span class="c1">// 2 days +</span><span class="c1"></span><span class="p">)</span></code></pre></div> +</div> +</div> <p>When you set <code>.withAllowedLateness</code> on a <code>PCollection</code>, that allowed lateness propagates forward to any subsequent <code>PCollection</code> derived from the first <code>PCollection</code> you applied allowed lateness to. If you want to change the allowed @@ -8840,7 +8892,29 @@ with a <code>DoFn</code> to attach the timestamps to each element in your <span class="n">timestamped_items</span> <span class="o">=</span> <span class="n">items</span> <span class="o">|</span> <span class="s1">&#39;timestamp&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">AddTimestampDoFn</span><span class="p">())</span></code></pre></div> </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="c1">// AddTimestampDoFn extracts an event time from a LogEntry. +</span><span class="c1"></span><span class="kd">func</span> <span class="nf">AddTimestampDoFn</span><span class="p">(</span><span class="nx">element</span> <span class="nx">LogEntry</span><span class="p">,</span> <span class="nx">emit</span> <span class="kd">func</span><span class="p">(</span><span class="nx">beam</span><span class="p">.</span><span class="nx">EventTime</span><span class="p">,</span> & [...] +<span class="nx">et</span> <span class="o">:=</span> <span class="nf">extractEventTime</span><span class="p">(</span><span class="nx">element</span><span class="p">)</span> +<span class="c1">// Defining an emitter with beam.EventTime as the first parameter +</span><span class="c1"></span> <span class="c1">// allows the DoFn to set the event time for the emitted element. +</span><span class="c1"></span> <span class="nf">emit</span><span class="p">(</span><span class="nx">mtime</span><span class="p">.</span><span class="nf">FromTime</span><span class="p">(</span><span class="nx">et</span><span class="p">),</span> <span class="nx">element</span><span class="p">)</span> +<span class="p">}</span> +<span class="c1">// Use the DoFn with ParDo as normal. +</span><span class="c1"></span> +<span class="nx">stampedLogs</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">ParDo</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> <span class="nx">AddTimestampDoFn</span><span class="p">,</span> <span class="nx">unstampedLogs</span><span class="p">)</span></code></pre></div> +</div> +</div> <h2 id="triggers">9. Triggers</h2> +<span class="language-go"> +<blockquote> +<p><strong>Note:</strong> The Trigger API in the Beam SDK for Go is currently experimental and subject to change.</p> +</blockquote> +</span> <p>When collecting and grouping data into windows, Beam uses <strong>triggers</strong> to determine when to emit the aggregated results of each window (referred to as a <em>pane</em>). If you use Beam&rsquo;s default windowing configuration and <a href="#default-trigger">default @@ -8936,8 +9010,10 @@ firings:</p> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> <img src="/images/copy-icon.svg"/> </a> -<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"> -<span class="nx">trigger</span> <span class="o">:=</span> <span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterEndOfWindow</span><span class="p">().</span><span class="nf">EarlyFiring</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterProcessingTime</span><span class="p">(</span><span class="mi">60000</span><span clas [...] +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">trigger</span> <span class="o">:=</span> <span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterEndOfWindow</span><span class="p">().</span> +<span class="nf">EarlyFiring</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterProcessingTime</span><span class="p">().</span> +<span class="nf">PlusDelay</span><span class="p">(</span><span class="mi">60</span> <span class="o">*</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Second</span><span class="p">)).</span> +<span class="nf">LateFiring</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerRepeat</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterCount</span><span class="p">(</span><span class="mi">1</span><span class="p">)))</span></code></pre></div> </div> </div> <h4 id="default-trigger">9.1.1. Default trigger</h4> @@ -8992,9 +9068,10 @@ trigger for a <code>PCollection</code>, which emits results one minute aft element in that window has been processed. The <code>accumulation_mode</code> parameter sets the window&rsquo;s <strong>accumulation mode</strong>.</p> <p class="language-go">You set the trigger(s) for a <code>PCollection</code> by passing in the <code>beam.Trigger</code> parameter -when you use the <code>beam.WindowInto</code> transform. This code sample sets an Always -trigger for a <code>PCollection</code>, which emits results every time an element in that window has been processed. The <code>beam.AccumulationMode</code> parameter -sets the window&rsquo;s <strong>accumulation mode</strong>.</p> +when you use the <code>beam.WindowInto</code> transform. This code sample sets a time-based +trigger for a <code>PCollection</code>, which emits results one minute after the first +element in that window has been processed. +The <code>beam.AccumulationMode</code> parameter sets the window&rsquo;s <strong>accumulation mode</strong>.</p> <div class='language-java snippet'> <div class="notebook-skip code-snippet"> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> @@ -9023,10 +9100,13 @@ sets the window&rsquo;s <strong>accumulation mode</strong>.</p> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> <img src="/images/copy-icon.svg"/> </a> -<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"> -<span class="nx">windowSize</span> <span class="o">:=</span> <span class="mi">10</span> <span class="o">*</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Second</span> -<span class="nx">trigger</span> <span class="o">:=</span> <span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAlways</span><span class="p">()</span> -<span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> <span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="nx">windowSize</span><span class="p">),</span> <span class="nx">pCollection</span><span class="p">,</span> <span cl [...] +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">windowedItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> <span class="nx">pcollection</span><span class="p">,</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">Trigger</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterProcessingTime</span><span class="p">().</span> +<span class="nf">PlusDelay</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">)),</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">AllowedLateness</span><span class="p">(</span><span class="mi">30</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">PanesDiscard</span><span class="p">(),</span> +<span class="p">)</span></code></pre></div> </div> </div> <h4 id="window-accumulation-modes">9.4.1. Window accumulation modes</h4> @@ -9079,8 +9159,10 @@ passes the end of the window, you can apply an <em>allowed lateness</em> w your windowing configuration. This gives your trigger the opportunity to react to the late data. If allowed lateness is set, the default trigger will emit new results immediately whenever late data arrives.</p> -<p>You set the allowed lateness by using <code>.withAllowedLateness()</code> when you set your -windowing function:</p> +<p>You set the allowed lateness by using <span class="language-java"><code>.withAllowedLateness()</code></span> +<span class="language-py"><code>allowed_lateness</code></span> +<span class="language-go"><code>beam.AllowedLateness()</code></span> +when you set your windowing function:</p> <div class='language-java snippet'> <div class="notebook-skip code-snippet"> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> @@ -9106,10 +9188,26 @@ windowing function:</p> <span class="o">|</span> <span class="o">...</span></code></pre></div> </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">allowedToBeLateItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> <span class="nx">pcollection</span><span class="p">,</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">Trigger</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterProcessingTime</span><span class="p">().</span> +<span class="nf">PlusDelay</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">)),</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">AllowedLateness</span><span class="p">(</span><span class="mi">30</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> +<span class="p">)</span></code></pre></div> +</div> +</div> <p>This allowed lateness propagates to all <code>PCollection</code>s derived as a result of applying transforms to the original <code>PCollection</code>. If you want to change the allowed lateness later in your pipeline, you can apply -<code>Window.configure().withAllowedLateness()</code> again, explicitly.</p> +<span class="language-java"><code>Window.configure().withAllowedLateness()</code></span> +<span class="language-py"><code>allowed_lateness</code></span> +<span class="language-go"><code>beam.AllowedLateness()</code></span> +again, explicitly.</p> <h3 id="composite-triggers">9.5. Composite triggers</h3> <p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can specify a trigger to emit results repeatedly, at most once, or under other @@ -9193,6 +9291,20 @@ trigger stops executing</li> <span class="n">accumulation_mode</span><span class="o">=</span><span class="n">AccumulationMode</span><span class="o">.</span><span class="n">DISCARDING</span><span class="p">)</span></code></pre></div> </div> </div> +<div class='language-go snippet'> +<div class="notebook-skip code-snippet"> +<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> +<img src="/images/copy-icon.svg"/> +</a> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="nx">compositeTriggerItems</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">WindowInto</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> +<span class="nx">window</span><span class="p">.</span><span class="nf">NewFixedWindows</span><span class="p">(</span><span class="mi">1</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">),</span> <span class="nx">pcollection</span><span class="p">,</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">Trigger</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterEndOfWindow</span><span class="p">().</span> +<span class="nf">LateFiring</span><span class="p">(</span><span class="nx">window</span><span class="p">.</span><span class="nf">TriggerAfterProcessingTime</span><span class="p">().</span> +<span class="nf">PlusDelay</span><span class="p">(</span><span class="mi">10</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Minute</span><span class="p">))),</span> +<span class="nx">beam</span><span class="p">.</span><span class="nf">AllowedLateness</span><span class="p">(</span><span class="mi">2</span><span class="o">*</span><span class="mi">24</span><span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Hour</span><span class="p">),</span> +<span class="p">)</span></code></pre></div> +</div> +</div> <h4 id="other-composite-triggers">9.5.3. Other composite triggers</h4> <p>You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 @@ -14000,7 +14112,7 @@ limitations under the License. <span class="nd">@ptransform.PTransform.register_urn</span><span class="p">(</span><span class="n">URN</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="k">class</span> <span class="nc">PythonTransform</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> -<span class="nb">super</span><span class="p">(</span><span class="n">PythonTransform</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> +<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> <span class="k">return</span> <span class="p">(</span><span class="n">pcoll</span> <span class="o">|</span> <span class="s2">&#34;Input preparation&#34;</span> diff --git a/website/generated-content/documentation/io/developing-io-python/index.html b/website/generated-content/documentation/io/developing-io-python/index.html index bbd41ee..f95205c 100644 --- a/website/generated-content/documentation/io/developing-io-python/index.html +++ b/website/generated-content/documentation/io/developing-io-python/index.html @@ -126,7 +126,7 @@ that they are not exposed to end-users. For the source, rename <code>CountingSou to <code>_CountingSource</code>. Then, create the wrapper <code>PTransform</code>, called <code>ReadFromCountingSource</code>:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>class ReadFromCountingSource(PTransform): def __init__(self, count): - super(ReadFromCountingSource, self).__init__() + super().__init__() self._count = count def expand(self, pcoll): @@ -134,7 +134,7 @@ to <code>_CountingSource</code>. Then, create the wrapper <code>PTransform</code numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)</code></pre></div></div><p>For the sink, rename <code>SimpleKVSink</code> to <code>_SimpleKVSink</code>. Then, create the wrapper <code>PTransform</code>, called <code>WriteToKVSink</code>:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a>< [...] def __init__(self, simplekv, url, final_table_name): self._simplekv = simplekv - super(WriteToKVSink, self).__init__() + super().__init__() self._url = url self._final_table_name = final_table_name diff --git a/website/generated-content/documentation/patterns/cross-language/index.html b/website/generated-content/documentation/patterns/cross-language/index.html index ae8afd8..b171fdb 100644 --- a/website/generated-content/documentation/patterns/cross-language/index.html +++ b/website/generated-content/documentation/patterns/cross-language/index.html @@ -53,7 +53,7 @@ function openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi <span class=nd>@ptransform.PTransform.register_urn</span><span class=p>(</span><span class=n>URN</span><span class=p>,</span> <span class=bp>None</span><span class=p>)</span> <span class=k>class</span> <span class=nc>PythonTransform</span><span class=p>(</span><span class=n>ptransform</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span> <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>):</span> - <span class=nb>super</span><span class=p>(</span><span class=n>PythonTransform</span><span class=p>,</span> <span class=bp>self</span><span class=p>)</span><span class=o>.</span><span class=fm>__init__</span><span class=p>()</span> + <span class=nb>super</span><span class=p>()</span><span class=o>.</span><span class=fm>__init__</span><span class=p>()</span> <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span> <span class=k>return</span> <span class=p>(</span><span class=n>pcoll</span> @@ -96,7 +96,7 @@ function openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi <span class=n>logging</span><span class=o>.</span><span class=n>getLogger</span><span class=p>()</span><span class=o>.</span><span class=n>setLevel</span><span class=p>(</span><span class=n>logging</span><span class=o>.</span><span class=n>INFO</span><span class=p>)</span> <span class=n>main</span><span class=p>(</span><span class=n>sys</span><span class=o>.</span><span class=n>argv</span><span class=p>)</span></code></pre></div></div></div><h2 id=how-to-run-the-cross-language-pipeline>How to run the cross-language pipeline?</h2><p>In this section, the steps to run a cross-language pipeline are set out:</p><ol><li><p>Start the <strong>expansion service</strong> with your Python transforms: <code>python expansion_service.py -p 9097</code></p></li><li><p>S [...] <code>./gradlew :runners:spark:job-server:runShadow</code></li><li>Using the pre-build Docker container: -<code>docker run -net=host apache/beam_spark_job_server</code></li></ul></li><li><p><strong>Run pipeline</strong>: <code>mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \ -Pportable-runner \ -Dexec.args=" \ --runner=PortableRunner \ --jobEndpoint=localhost:8099 \ --useExternal=true \ --expansionServiceURL=localhost:9097 \ --experiments=beam_fn_api"</code></p></li></ol><div class=feedback><p class=update>Last updated on 2020/12/25</p><h3>Have you found everything you were looking for [...] +<code>docker run -net=host apache/beam_spark_job_server</code></li></ul></li><li><p><strong>Run pipeline</strong>: <code>mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \ -Pportable-runner \ -Dexec.args=" \ --runner=PortableRunner \ --jobEndpoint=localhost:8099 \ --useExternal=true \ --expansionServiceURL=localhost:9097 \ --experiments=beam_fn_api"</code></p></li></ol><div class=feedback><p class=update>Last updated on 2021/10/04</p><h3>Have you found everything you were looking for [...] <a href=http://www.apache.org>The Apache Software Foundation</a> | <a href=/privacy_policy>Privacy Policy</a> | <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div></div></div></footer></body></html> \ No newline at end of file diff --git a/website/generated-content/documentation/programming-guide/index.html b/website/generated-content/documentation/programming-guide/index.html index 9cc9792..c092a4c 100644 --- a/website/generated-content/documentation/programming-guide/index.html +++ b/website/generated-content/documentation/programming-guide/index.html @@ -1098,7 +1098,8 @@ applying <code>Combine</code>:</p><ul><li>Specify <code>.withoutDefaults</code>, when used as a side input. You’ll generally only need to use this option if the result of your pipeline’s <code>Combine</code> is to be used as a side input later in the pipeline.</li></ul></span><p class=language-go>If your <code>PCollection</code> uses any non-global windowing function, the Beam Go SDK -behaves the same way as with global windowing.</p><h5 id=combining-values-in-a-keyed-pcollection>4.2.4.6. Combining values in a keyed PCollection</h5><p>After creating a keyed PCollection (for example, by using a <code>GroupByKey</code> +behaves the same way as with global windowing. Windows that are empty in the input +<code>PCollection</code> will likewise be empty in the output collection.</p><h5 id=combining-values-in-a-keyed-pcollection>4.2.4.6. Combining values in a keyed PCollection</h5><p>After creating a keyed PCollection (for example, by using a <code>GroupByKey</code> transform), a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from <code>GroupByKey</code>, a key-grouped <code>PCollection</code> called <code>groupedWords</code> looks like this:</p><pre><code> cat, [1,5,9] @@ -1230,10 +1231,10 @@ run multiple times. As such, you should be cautious about including things like state dependency in your user code.</p><p>In general, your user code must fulfill at least these requirements:</p><ul><li>Your function object must be <strong>serializable</strong>.</li><li>Your function object must be <strong>thread-compatible</strong>, and be aware that <em>the Beam SDKs are not thread-safe</em>.</li></ul><p>In addition, it’s recommended that you make your function object <strong>idempotent</strong>. Non-idempotent functions are supported by Beam, but require additional -thought to ensure correctness when there are external side effects.</p><span class="language-java language-py"><blockquote><p><strong>Note:</strong> These requirements apply to subclasses of <code>DoFn</code></span> (a function object +thought to ensure correctness when there are external side effects.</p><span class="language-java language-py"><blockquote><p><strong>Note:</strong> These requirements apply to subclasses of <code>DoFn</code>(a function object used with the <a href=#pardo>ParDo</a> transform), <code>CombineFn</code> (a function object used with the <a href=#combine>Combine</a> transform), and <code>WindowFn</code> (a function object -used with the <a href=#windowing>Window</a> transform).</p></blockquote></span><span class=language-go><blockquote><p><strong>Note:</strong> These requirements apply to <code>DoFn</code>s</span> (a function object +used with the <a href=#windowing>Window</a> transform).</p></blockquote></span><span class=language-go><blockquote><p><strong>Note:</strong> These requirements apply to <code>DoFn</code>s (a function object used with the <a href=#pardo>ParDo</a> transform), <code>CombineFn</code>s (a function object used with the <a href=#combine>Combine</a> transform), and <code>WindowFn</code>s (a function object used with the <a href=#windowing>Window</a> transform).</p></blockquote></span><h4 id=user-code-serializability>4.3.1. Serializability</h4><p>Any function object you provide to a transform must be <strong>fully serializable</strong>. @@ -2546,7 +2547,7 @@ for that <code>PCollection</code>. The <code>GroupByKey</code> transform groups <code>PCollection</code> by both key and window, based on the windowing function. The subsequent <code>ParDo</code> transform gets applied multiple times per key, once for each window.</p><h3 id=provided-windowing-functions>8.2. Provided windowing functions</h3><p>You can define different kinds of windows to divide the elements of your -<code>PCollection</code>. Beam provides several windowing functions, including:</p><ul><li>Fixed Time Windows</li><li>Sliding Time Windows</li><li>Per-Session Windows</li><li>Single Global Window</li><li>Calendar-based Windows (not supported by the Beam SDK for Python)</li></ul><p>You can also define your own <code>WindowFn</code> if you have a more complex need.</p><p>Note that each element can logically belong to more than one window, depending +<code>PCollection</code>. Beam provides several windowing functions, including:</p><ul><li>Fixed Time Windows</li><li>Sliding Time Windows</li><li>Per-Session Windows</li><li>Single Global Window</li><li>Calendar-based Windows (not supported by the Beam SDK for Python or Go)</li></ul><p>You can also define your own <code>WindowFn</code> if you have a more complex need.</p><p>Note that each element can logically belong to more than one window, depending on the windowing function you use. Sliding time windowing, for example, creates overlapping windows wherein a single element can be assigned to multiple windows.</p><h4 id=fixed-time-windows>8.2.1. Fixed time windows</h4><p>The simplest form of windowing is using <strong>fixed time windows</strong>: given a @@ -2594,26 +2595,34 @@ into fixed windows, each 60 seconds in length:</p><div class="language-java snip <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>fixedWindowedItems</span> <span class=o>=</span> <span class=n>items</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span> <span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardSeconds</span><span class=o>(</span><span class=n>60</span><span class=o>))));</span></code></pre></div></div></div><div class="language-py snippet"><div cla [...] <span class=n>fixed_windowed_items</span> <span class=o>=</span> <span class=p>(</span> - <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span><span class=p>)))</span></code></pre></div></div></div><h4 id=using-sliding-time-windows>8.3.2. Sliding time windows</h4><p>The following exa [...] + <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span><span class=p>)))</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=mi>60</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Second</span><span class=p>),</span> + <span class=nx>items</span><span class=p>)</span></code></pre></div></div></div><h4 id=using-sliding-time-windows>8.3.2. Sliding time windows</h4><p>The following example code shows how to apply <code>Window</code> to divide a <code>PCollection</code> into sliding time windows. Each window is 30 seconds in length, and a new window begins every five seconds:</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>items</span> <span class=o>=</span> <span cl [...] <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>slidingWindowedItems</span> <span class=o>=</span> <span class=n>items</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span> <span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=n>SlidingWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardSeconds</span><span class=o>(</span><span class=n>30</span><span class=o>)).</span><span class=na>every</span><span class=o>(</span><span class=n>Duration< [...] <span class=n>sliding_windowed_items</span> <span class=o>=</span> <span class=p>(</span> - <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>SlidingWindows</span><span class=p>(</span><span class=mi>30</span><span class=p>,</span> <span class=mi>5</span><span class=p>)))</span></code></pre></div></div></div><h4 id=using-session-windows>8.3.3. Se [...] + <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>SlidingWindows</span><span class=p>(</span><span class=mi>30</span><span class=p>,</span> <span class=mi>5</span><span class=p>)))</span></code></pre></div></div></div><div class="language-go snippet"><div [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewSlidingWindows</span><span class=p>(</span><span class=mi>5</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Second</span><span class=p>,</span> <span class=mi>30</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Second</span><span class=p>),</span> + <span class=nx>items</span><span class=p>)</span></code></pre></div></div></div><h4 id=using-session-windows>8.3.3. Session windows</h4><p>The following example code shows how to apply <code>Window</code> to divide a <code>PCollection</code> into session windows, where each session must be separated by a time gap of at least 10 minutes (600 seconds):</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>items</span> <span class=o>=</span> <sp [...] <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>sessionWindowedItems</span> <span class=o>=</span> <span class=n>items</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span> <span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=n>Sessions</span><span class=o>.</span><span class=na>withGapDuration</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardSeconds</span><span class=o>(</span><span class=n>600</span><span class=o>))));</span></code></pre></div></div></div><div class="language-py snippet [...] <span class=n>session_windowed_items</span> <span class=o>=</span> <span class=p>(</span> - <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>Sessions</span><span class=p>(</span><span class=mi>10</span> <span class=o>*</span> <span class=mi>60</span><span class=p>)))</span></code></pre></div></div></div><p>Note that the sessions are per-key — ea [...] + <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>Sessions</span><span class=p>(</span><span class=mi>10</span> <span class=o>*</span> <span class=mi>60</span><span class=p>)))</span></code></pre></div></div></div><div class="language-go snippet"><div clas [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewSessions</span><span class=p>(</span><span class=mi>600</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Second</span><span class=p>),</span> + <span class=nx>items</span><span class=p>)</span></code></pre></div></div></div><p>Note that the sessions are per-key — each key in the collection will have its own session groupings depending on the data distribution.</p><h4 id=using-single-global-window>8.3.4. Single global window</h4><p>If your <code>PCollection</code> is bounded (the size is fixed), you can assign all the elements to a single global window. The following example code shows how to set a single global window for a <code>PCollection</code>:</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>items</span> <sp [...] <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>batchItems</span> <span class=o>=</span> <span class=n>items</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span> <span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=k>new</span> <span class=n>GlobalWindows</span><span class=o>()));</span></code></pre></div></div></div><div class="language-py snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div cla [...] <span class=n>global_windowed_items</span> <span class=o>=</span> <span class=p>(</span> - <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>GlobalWindows</span><span class=p>()))</span></code></pre></div></div></div><h3 id=watermarks-and-late-data>8.4. Watermarks and late data</h3><p>In any data processing system, there is a certain amount of l [...] + <span class=n>items</span> <span class=o>|</span> <span class=s1>'window'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>GlobalWindows</span><span class=p>()))</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs- [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewGlobalWindows</span><span class=p>(),</span> + <span class=nx>items</span><span class=p>)</span></code></pre></div></div></div><h3 id=watermarks-and-late-data>8.4. Watermarks and late data</h3><p>In any data processing system, there is a certain amount of lag between the time a data event occurs (the “event time”, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the “processing time”, determined by the clock on the system @@ -2652,7 +2661,10 @@ the end of a window.</p><div class="language-java snippet"><div class="notebook- <span class=n>trigger</span><span class=o>=</span><span class=n>trigger_fn</span><span class=p>,</span> <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>accumulation_mode</span><span class=p>,</span> <span class=n>timestamp_combiner</span><span class=o>=</span><span class=n>timestamp_combiner</span><span class=p>,</span> - <span class=n>allowed_lateness</span><span class=o>=</span><span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>2</span><span class=o>*</span><span class=mi>24</span><span class=o>*</span><span class=mi>60</span><span class=o>*</span><span class=mi>60</span><span class=p>))</span> <span class=c1># 2 days</span></code></pre></div></div></div><p>When you set <code>.withAllowedLateness</code> on a <code>PCollection [...] + <span class=n>allowed_lateness</span><span class=o>=</span><span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>2</span><span class=o>*</span><span class=mi>24</span><span class=o>*</span><span class=mi>60</span><span class=o>*</span><span class=mi>60</span><span class=p>))</span> <span class=c1># 2 days</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet" [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> <span class=nx>items</span><span class=p>,</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>AllowedLateness</span><span class=p>(</span><span class=mi>2</span><span class=o>*</span><span class=mi>24</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Hour</span><span class=p>),</span> <span class=c1>// 2 days +</span><span class=c1></span><span class=p>)</span></code></pre></div></div></div><p>When you set <code>.withAllowedLateness</code> on a <code>PCollection</code>, that allowed lateness propagates forward to any subsequent <code>PCollection</code> derived from the first <code>PCollection</code> you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying @@ -2685,7 +2697,19 @@ with a <code>DoFn</code> to attach the timestamps to each element in your <code> <span class=c1># TimestampedValue.</span> <span class=k>yield</span> <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>TimestampedValue</span><span class=p>(</span><span class=n>element</span><span class=p>,</span> <span class=n>unix_timestamp</span><span class=p>)</span> -<span class=n>timestamped_items</span> <span class=o>=</span> <span class=n>items</span> <span class=o>|</span> <span class=s1>'timestamp'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>AddTimestampDoFn</span><span class=p>())</span></code></pre></div></div></div><h2 id=triggers>9. Triggers</h2><p>When collecting and grouping data into windows, Beam uses <strong>triggers</strong> to +<span class=n>timestamped_items</span> <span class=o>=</span> <span class=n>items</span> <span class=o>|</span> <span class=s1>'timestamp'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>AddTimestampDoFn</span><span class=p>())</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip [...] +</span><span class=c1></span><span class=kd>func</span> <span class=nf>AddTimestampDoFn</span><span class=p>(</span><span class=nx>element</span> <span class=nx>LogEntry</span><span class=p>,</span> <span class=nx>emit</span> <span class=kd>func</span><span class=p>(</span><span class=nx>beam</span><span class=p>.</span><span class=nx>EventTime</span><span class=p>,</span> <span class=nx>LogEntry</span><span class=p>))</span> <span class=p>{</span> + <span class=nx>et</span> <span class=o>:=</span> <span class=nf>extractEventTime</span><span class=p>(</span><span class=nx>element</span><span class=p>)</span> + <span class=c1>// Defining an emitter with beam.EventTime as the first parameter +</span><span class=c1></span> <span class=c1>// allows the DoFn to set the event time for the emitted element. +</span><span class=c1></span> <span class=nf>emit</span><span class=p>(</span><span class=nx>mtime</span><span class=p>.</span><span class=nf>FromTime</span><span class=p>(</span><span class=nx>et</span><span class=p>),</span> <span class=nx>element</span><span class=p>)</span> +<span class=p>}</span> + + + +<span class=c1>// Use the DoFn with ParDo as normal. +</span><span class=c1></span> +<span class=nx>stampedLogs</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>ParDo</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>AddTimestampDoFn</span><span class=p>,</span> <span class=nx>unstampedLogs</span><span class=p>)</span></code></pre></div></div></div><h2 id=triggers>9. Triggers</h2><span class=language-go><blockquote><p><strong>Note:</strong> The Trigger API in the Beam SDK for Go is curren [...] determine when to emit the aggregated results of each window (referred to as a <em>pane</em>). If you use Beam’s default windowing configuration and <a href=#default-trigger>default trigger</a>, Beam outputs the aggregated result when it @@ -2734,8 +2758,10 @@ firings:</p><div class="language-java snippet"><div class="notebook-skip code-sn <span class=o>.</span><span class=na>plusDuration</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>1</span><span class=o>))</span> <span class=c1>// Fire on any late data so the bill can be corrected. </span><span class=c1></span> <span class=o>.</span><span class=na>withLateFirings</span><span class=o>(</span><span class=n>AfterPane</span><span class=o>.</span><span class=na>elementCountAtLeast</span><span class=o>(</span><span class=n>1</span><span class=o>))</span></code></pre></div></div></div><div class="language-py snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/im [...] - <span class=n>early</span><span class=o>=</span><span class=n>AfterProcessingTime</span><span class=p>(</span><span class=n>delay</span><span class=o>=</span><span class=mi>1</span> <span class=o>*</span> <span class=mi>60</span><span class=p>),</span> <span class=n>late</span><span class=o>=</span><span class=n>AfterCount</span><span class=p>(</span><span class=mi>1</span><span class=p>))</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip [...] -<span class=nx>trigger</span> <span class=o>:=</span> <span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterEndOfWindow</span><span class=p>().</span><span class=nf>EarlyFiring</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterProcessingTime</span><span class=p>(</span><span class=mi>60000</span><span class=p>)).</span><span class=nf>LateFiring</span><span class=p>(</span><span class=nx>window</span><span class [...] + <span class=n>early</span><span class=o>=</span><span class=n>AfterProcessingTime</span><span class=p>(</span><span class=n>delay</span><span class=o>=</span><span class=mi>1</span> <span class=o>*</span> <span class=mi>60</span><span class=p>),</span> <span class=n>late</span><span class=o>=</span><span class=n>AfterCount</span><span class=p>(</span><span class=mi>1</span><span class=p>))</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip [...] + <span class=nf>EarlyFiring</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterProcessingTime</span><span class=p>().</span> + <span class=nf>PlusDelay</span><span class=p>(</span><span class=mi>60</span> <span class=o>*</span> <span class=nx>time</span><span class=p>.</span><span class=nx>Second</span><span class=p>)).</span> + <span class=nf>LateFiring</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerRepeat</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterCount</span><span class=p>(</span><span class=mi>1</span><span class=p>)))</span></code></pre></div></div></div><h4 id=default-trigger>9.1.1. Default trigger</h4><p>The default trigger for a <code>PCollection</code> is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives.</p><p>However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is @@ -2774,19 +2800,23 @@ when you use the <code>WindowInto</code> transform. This code sample sets a time trigger for a <code>PCollection</code>, which emits results one minute after the first element in that window has been processed. The <code>accumulation_mode</code> parameter sets the window’s <strong>accumulation mode</strong>.</p><p class=language-go>You set the trigger(s) for a <code>PCollection</code> by passing in the <code>beam.Trigger</code> parameter -when you use the <code>beam.WindowInto</code> transform. This code sample sets an Always -trigger for a <code>PCollection</code>, which emits results every time an element in that window has been processed. The <code>beam.AccumulationMode</code> parameter -sets the window’s <strong>accumulation mode</strong>.</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>pc</span> <sp [...] +when you use the <code>beam.WindowInto</code> transform. This code sample sets a time-based +trigger for a <code>PCollection</code>, which emits results one minute after the first +element in that window has been processed. +The <code>beam.AccumulationMode</code> parameter sets the window’s <strong>accumulation mode</strong>.</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><spa [...] <span class=n>pc</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>1</span><span class=o>,</span> <span class=n>TimeUnit</span><span class=o>.</span><span class=na>MINUTES</span><span class=o>))</span> <span class=o>.</span><span class=na>triggering</span><span class=o>(</span><span class=n>AfterProcessingTime</span><span class=o>.</span><span class=na>pastFirstElementInPane</span><span class=o>()</span> <span class=o>.</span><span class=na>plusDelayOf</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>1</span><span class=o>)))</span> <span class=o>.</span><span class=na>discardingFiredPanes</span><span class=o>());</span></code></pre></div></div></div><div class="language-py snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-py data-lang=py> <span class=n>pcollection</span> <span class=o>|</span> [...] <span class=n>FixedWindows</span><span class=p>(</span><span class=mi>1</span> <span class=o>*</span> <span class=mi>60</span><span class=p>),</span> <span class=n>trigger</span><span class=o>=</span><span class=n>AfterProcessingTime</span><span class=p>(</span><span class=mi>1</span> <span class=o>*</span> <span class=mi>60</span><span class=p>),</span> - <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>DISCARDING</span><span class=p>)</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go> -<span class=nx>windowSize</span> <span class=o>:=</span> <span class=mi>10</span> <span class=o>*</span> <span class=nx>time</span><span class=p>.</span><span class=nx>Second</span> -<span class=nx>trigger</span> <span class=o>:=</span> <span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAlways</span><span class=p>()</span> -<span class=nx>beam</span><span class=p>.</span><span class=nf>WindowInto</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=nx>windowSize</span><span class=p>),</span> <span class=nx>pCollection</span><span class=p>,</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>Trigger</span><span class=p>(</span><span class=nx>trigger</ [...] + <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>DISCARDING</span><span class=p>)</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go>< [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> <span class=nx>pcollection</span><span class=p>,</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>Trigger</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterProcessingTime</span><span class=p>().</span> + <span class=nf>PlusDelay</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>)),</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>AllowedLateness</span><span class=p>(</span><span class=mi>30</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>PanesDiscard</span><span class=p>(),</span> +<span class=p>)</span></code></pre></div></div></div><h4 id=window-accumulation-modes>9.4.1. Window accumulation modes</h4><p>When you specify a trigger, you must also set the the window’s <strong>accumulation mode</strong>. When a trigger fires, it emits the current contents of the window as a pane. Since a trigger can fire multiple times, the accumulation mode determines whether the system <em>accumulates</em> the window panes as the trigger fires, or @@ -2818,8 +2848,10 @@ on each firing:</p><pre><code> First trigger firing: [5, 8, 3] passes the end of the window, you can apply an <em>allowed lateness</em> when you set your windowing configuration. This gives your trigger the opportunity to react to the late data. If allowed lateness is set, the default trigger will emit new -results immediately whenever late data arrives.</p><p>You set the allowed lateness by using <code>.withAllowedLateness()</code> when you set your -windowing function:</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>pc</span> <span class=o>=</span> <span class=o>...;</span> +results immediately whenever late data arrives.</p><p>You set the allowed lateness by using <span class=language-java><code>.withAllowedLateness()</code></span> +<span class=language-py><code>allowed_lateness</code></span> +<span class=language-go><code>beam.AllowedLateness()</code></span> +when you set your windowing function:</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>pc</span> <span class=o>=</span> <s [...] <span class=n>pc</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>1</span><span class=o>,</span> <span class=n>TimeUnit</span><span class=o>.</span><span class=na>MINUTES</span><span class=o>))</span> <span class=o>.</span><span class=na>triggering</span><span class=o>(</span><span class=n>AfterProcessingTime</span><span class=o>.</span><span class=na>pastFirstElementInPane</span><span class=o>()</span> <span class=o>.</span><span class=na>plusDelayOf</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>1</span><span class=o>)))</span> @@ -2828,10 +2860,18 @@ windowing function:</p><div class="language-java snippet"><div class="notebook-s <span class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span><span class=p>),</span> <span class=n>trigger</span><span class=o>=</span><span class=n>AfterProcessingTime</span><span class=p>(</span><span class=mi>60</span><span class=p>),</span> <span class=n>allowed_lateness</span><span class=o>=</span><span class=mi>1800</span><span class=p>)</span> <span class=c1># 30 minutes</span> - <span class=o>|</span> <span class=o>...</span></code></pre></div></div></div><p>This allowed lateness propagates to all <code>PCollection</code>s derived as a result of + <span class=o>|</span> <span class=o>...</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go><span class=nx>allowedToBeLateItems</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span clas [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> <span class=nx>pcollection</span><span class=p>,</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>Trigger</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterProcessingTime</span><span class=p>().</span> + <span class=nf>PlusDelay</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>)),</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>AllowedLateness</span><span class=p>(</span><span class=mi>30</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> +<span class=p>)</span></code></pre></div></div></div><p>This allowed lateness propagates to all <code>PCollection</code>s derived as a result of applying transforms to the original <code>PCollection</code>. If you want to change the allowed lateness later in your pipeline, you can apply -<code>Window.configure().withAllowedLateness()</code> again, explicitly.</p><h3 id=composite-triggers>9.5. Composite triggers</h3><p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can +<span class=language-java><code>Window.configure().withAllowedLateness()</code></span> +<span class=language-py><code>allowed_lateness</code></span> +<span class=language-go><code>beam.AllowedLateness()</code></span> +again, explicitly.</p><h3 id=composite-triggers>9.5. Composite triggers</h3><p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can specify a trigger to emit results repeatedly, at most once, or under other custom conditions.</p><h4 id=composite-trigger-types>9.5.1. Composite trigger types</h4><p>Beam includes the following composite triggers:</p><ul><li>You can add additional early firings or late firings to <code>AfterWatermark.pastEndOfWindow</code> via <code>.withEarlyFirings</code> and @@ -2864,7 +2904,13 @@ trigger stops executing</li></ul></p><div class="language-java snippet"><div cla <span class=n>FixedWindows</span><span class=p>(</span><span class=mi>1</span> <span class=o>*</span> <span class=mi>60</span><span class=p>),</span> <span class=n>trigger</span><span class=o>=</span><span class=n>AfterWatermark</span><span class=p>(</span><span class=n>late</span><span class=o>=</span><span class=n>AfterProcessingTime</span><span class=p>(</span><span class=mi>10</span> <span class=o>*</span> <span class=mi>60</span><span class=p>)),</span> <span class=n>allowed_lateness</span><span class=o>=</span><span class=mi>10</span><span class=p>,</span> - <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>DISCARDING</span><span class=p>)</span></code></pre></div></div></div><h4 id=other-composite-triggers>9.5.3. Other composite triggers</h4><p>You can also build other sorts of composite triggers. The following example code + <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>DISCARDING</span><span class=p>)</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go>< [...] + <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=mi>1</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> <span class=nx>pcollection</span><span class=p>,</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>Trigger</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterEndOfWindow</span><span class=p>().</span> + <span class=nf>LateFiring</span><span class=p>(</span><span class=nx>window</span><span class=p>.</span><span class=nf>TriggerAfterProcessingTime</span><span class=p>().</span> + <span class=nf>PlusDelay</span><span class=p>(</span><span class=mi>10</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>))),</span> + <span class=nx>beam</span><span class=p>.</span><span class=nf>AllowedLateness</span><span class=p>(</span><span class=mi>2</span><span class=o>*</span><span class=mi>24</span><span class=o>*</span><span class=nx>time</span><span class=p>.</span><span class=nx>Hour</span><span class=p>),</span> +<span class=p>)</span></code></pre></div></div></div><h4 id=other-composite-triggers>9.5.3. Other composite triggers</h4><p>You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements, or after a minute.</p><div class="language-java snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-java data-lang=java> <span class=n>Repeatedly</span><span class=o>.</span><span class=na>forever</span><span class=o>(</span><span class=n>AfterFirst</span><span class=o>.</span><span class [...] <span class=n>AfterPane</span><span class=o>.</span><span class=na>elementCountAtLeast</span><span class=o>(</span><span class=n>100</span><span class=o>),</span> @@ -4131,7 +4177,7 @@ expansionAddr := "localhost:8097" outT := beam.UnnamedOutput(typex.New(reflectx.String)) res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT) </code></pre></div></div></li><li><p>After the job has been submitted to the Beam runner, shutdown the expansion service by -terminating the expansion service process.</p></li></ol><h3 id=x-lang-transform-runner-support>13.3. Runner Support</h3><p>Currently, portable runners such as Flink, Spark, and the Direct runner can be used with multi-language pipelines.</p><p>Google Cloud Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p><div class=feedback><p class=update>Last updated on 2021/10/01</p><h3>Have you found everything you were looking for?</h3><p class=descr [...] +terminating the expansion service process.</p></li></ol><h3 id=x-lang-transform-runner-support>13.3. Runner Support</h3><p>Currently, portable runners such as Flink, Spark, and the Direct runner can be used with multi-language pipelines.</p><p>Google Cloud Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p><div class=feedback><p class=update>Last updated on 2021/10/04</p><h3>Have you found everything you were looking for?</h3><p class=descr [...] <a href=http://www.apache.org>The Apache Software Foundation</a> | <a href=/privacy_policy>Privacy Policy</a> | <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div></div></div></footer></body></html> \ No newline at end of file diff --git a/website/generated-content/get-started/index.xml b/website/generated-content/get-started/index.xml index df139c5..bd3de12 100644 --- a/website/generated-content/get-started/index.xml +++ b/website/generated-content/get-started/index.xml @@ -152,7 +152,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.& </span><span class="s2"> &#34;&#34;&#34;</span> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">field</span><span class="p">):</span> <span class="c1"># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> -<span class="c1"># super(ExtractAndSumScore, self).__init__()</span> +<span class="c1"># super().__init__()</span> <span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">field</span> <span class="o">=</span> <span class="n">field</span> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> @@ -392,7 +392,7 @@ logical windows based on when those scores occurred in event time.</em></p <div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="k">class</span> <span class="nc">HourlyTeamScore</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_min</span><span class="p">,</span> <span class="n">stop_min</span><span class="p">,</span> <span class="n">window_duration</span><span class="p">):</span> <span class="c1"># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> -<span class="c1"># super(HourlyTeamScore, self).__init__()</span> +<span class="c1"># super().__init__()</span> <span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_timestamp</span> <span class="o">=</span> <span class="n">str2timestamp</span><span class="p">(</span><span class="n">start_min</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">stop_timestamp</span> <span class="o">=</span> <span class="n">str2timestamp</span><span class="p">(</span><span class="n">stop_min</span><span class="p">)</span> @@ -578,7 +578,7 @@ ten minutes after data is received.</em></p> </span><span class="s2"> &#34;&#34;&#34;</span> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">allowed_lateness</span><span class="p">):</span> <span class="c1"># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> -<span class="c1"># super(CalculateUserScores, self).__init__()</span> +<span class="c1"># super().__init__()</span> <span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">allowed_lateness_seconds</span> <span class="o">=</span> <span class="n">allowed_lateness</span> <span class="o">*</span> <span class="mi">60</span> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> @@ -662,7 +662,7 @@ late results.</em></p> </span><span class="s2"> &#34;&#34;&#34;</span> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">team_window_duration</span><span class="p">,</span> <span class="n">allowed_lateness</span><span class="p">):</span> <span class="c1"># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> -<span class="c1"># super(CalculateTeamScores, self).__init__()</span> +<span class="c1"># super().__init__()</span> <span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">team_window_duration</span> <span class="o">=</span> <span class="n">team_window_duration</span> <span class="o">*</span> <span class="mi">60</span> <span class="bp">self</span><span class="o">.</span><span class="n">allowed_lateness_seconds</span> <span class="o">=</span> <span class="n">allowed_lateness</span> <span class="o">*</span> <span class="mi">60</span> diff --git a/website/generated-content/get-started/mobile-gaming-example/index.html b/website/generated-content/get-started/mobile-gaming-example/index.html index b8f04d2..d1cfe1a 100644 --- a/website/generated-content/get-started/mobile-gaming-example/index.html +++ b/website/generated-content/get-started/mobile-gaming-example/index.html @@ -49,7 +49,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.< </span><span class=s2> """</span> <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>field</span><span class=p>):</span> <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> - <span class=c1># super(ExtractAndSumScore, self).__init__()</span> + <span class=c1># super().__init__()</span> <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span> <span class=bp>self</span><span class=o>.</span><span class=n>field</span> <span class=o>=</span> <span class=n>field</span> @@ -179,7 +179,7 @@ logical windows based on when those scores occurred in event time.</em></p><p>No <span class=o>}</span></code></pre></div></div></div><div class="language-py snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>HourlyTeamScore</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransfo [...] <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>start_min</span><span class=p>,</span> <span class=n>stop_min</span><span class=p>,</span> <span class=n>window_duration</span><span class=p>):</span> <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> - <span class=c1># super(HourlyTeamScore, self).__init__()</span> + <span class=c1># super().__init__()</span> <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span> <span class=bp>self</span><span class=o>.</span><span class=n>start_timestamp</span> <span class=o>=</span> <span class=n>str2timestamp</span><span class=p>(</span><span class=n>start_min</span><span class=p>)</span> <span class=bp>self</span><span class=o>.</span><span class=n>stop_timestamp</span> <span class=o>=</span> <span class=n>str2timestamp</span><span class=p>(</span><span class=n>stop_min</span><span class=p>)</span> @@ -328,7 +328,7 @@ ten minutes after data is received.</em></p><p>As processing time advances and m </span><span class=s2> """</span> <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>allowed_lateness</span><span class=p>):</span> <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> - <span class=c1># super(CalculateUserScores, self).__init__()</span> + <span class=c1># super().__init__()</span> <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span> <span class=bp>self</span><span class=o>.</span><span class=n>allowed_lateness_seconds</span> <span class=o>=</span> <span class=n>allowed_lateness</span> <span class=o>*</span> <span class=mi>60</span> @@ -387,7 +387,7 @@ late results.</em></p><p>The dotted line in the diagram is the “ideal&rdqu </span><span class=s2> """</span> <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>team_window_duration</span><span class=p>,</span> <span class=n>allowed_lateness</span><span class=p>):</span> <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span> - <span class=c1># super(CalculateTeamScores, self).__init__()</span> + <span class=c1># super().__init__()</span> <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span> <span class=bp>self</span><span class=o>.</span><span class=n>team_window_duration</span> <span class=o>=</span> <span class=n>team_window_duration</span> <span class=o>*</span> <span class=mi>60</span> <span class=bp>self</span><span class=o>.</span><span class=n>allowed_lateness_seconds</span> <span class=o>=</span> <span class=n>allowed_lateness</span> <span class=o>*</span> <span class=mi>60</span> diff --git a/website/generated-content/sitemap.xml b/website/generated-content/sitemap.xml index 980082e..4816d58 100644 --- a/website/generated-content/sitemap.xml +++ b/website/generated-content/sitemap.xml @@ -1 +1 @@ -<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.32.0/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-09-17T14:05:48-07:00</lastmod></url><url><loc>/blog/b [...] \ No newline at end of file +<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.32.0/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-09-16T12:21:14-07:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-09-17T14:05:48-07:00</lastmod></url><url><loc>/blog/b [...] \ No newline at end of file