This is an automated email from the ASF dual-hosted git repository. mergebot-role pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit 5bd80e6ef06e2f0c9920b9477481307c27499361 Author: Mergebot <merge...@apache.org> AuthorDate: Mon Aug 28 21:38:09 2017 +0000 Prepare repository for deployment. --- content/blog/index.html | 37 ++++ content/feed.xml | 565 +++++++++++++++++++++++++++++++++++++++++++----- content/index.html | 10 +- 3 files changed, 556 insertions(+), 56 deletions(-) diff --git a/content/blog/index.html b/content/blog/index.html index ab0127d..8d9fc46 100644 --- a/content/blog/index.html +++ b/content/blog/index.html @@ -146,6 +146,43 @@ <p>This is the blog for the Apache Beam project. This blog contains news and updates for the project.</p> +<h3 id="a-classpost-link-hrefblog20170828timely-processinghtmltimely-and-stateful-processing-with-apache-beama"><a class="post-link" href="/blog/2017/08/28/timely-processing.html">Timely (and Stateful) Processing with Apache Beam</a></h3> +<p><i>Aug 28, 2017 • Kenneth Knowles [<a href="https://twitter.com/KennKnowles">@KennKnowles</a>] +</i></p> + +<p>In a <a href="/blog/2017/02/13/stateful-processing.html">prior blog +post</a>, I +introduced the basics of stateful processing in Apache Beam, focusing on the +addition of state to per-element processing. So-called <em>timely</em> processing +complements stateful processing in Beam by letting you set timers to request a +(stateful) callback at some point in the future.</p> + +<p>What can you do with timers in Beam? Here are some examples:</p> + +<ul> + <li>You can output data buffered in state after some amount of processing time.</li> + <li>You can take special action when the watermark estimates that you have +received all data up to a specified point in event time.</li> + <li>You can author workflows with timeouts that alter state and emit output in +response to the absence of additional input for some period of time.</li> +</ul> + +<p>These are just a few possibilities. 
State and timers together form a powerful +programming paradigm for fine-grained control to express a huge variety of +workflows. Stateful and timely processing in Beam is portable across data +processing engines and integrated with Beam’s unified model of event time +windowing in both streaming and batch processing.</p> + +<!-- Render a "read more" button if the post is longer than the excerpt --> + +<p> +<a class="btn btn-default btn-sm" href="/blog/2017/08/28/timely-processing.html" role="button"> +Read more <span class="glyphicon glyphicon-menu-right" aria-hidden="true"></span> +</a> +</p> + +<hr /> + <h3 id="a-classpost-link-hrefblog20170816splittable-do-fnhtmlpowerful-and-modular-io-connectors-with-splittable-dofn-in-apache-beama"><a class="post-link" href="/blog/2017/08/16/splittable-do-fn.html">Powerful and modular IO connectors with Splittable DoFn in Apache Beam</a></h3> <p><i>Aug 16, 2017 • Eugene Kirpichov </i></p> diff --git a/content/feed.xml b/content/feed.xml index 5e3ee2a..eb789b8 100644 --- a/content/feed.xml +++ b/content/feed.xml @@ -9,6 +9,520 @@ <generator>Jekyll v3.2.0</generator> <item> + <title>Timely (and Stateful) Processing with Apache Beam</title> + <description><p>In a <a href="/blog/2017/02/13/stateful-processing.html">prior blog +post</a>, I +introduced the basics of stateful processing in Apache Beam, focusing on the +addition of state to per-element processing. So-called <em>timely</em> processing +complements stateful processing in Beam by letting you set timers to request a +(stateful) callback at some point in the future.</p> + +<p>What can you do with timers in Beam? 
Here are some examples:</p> + +<ul> + <li>You can output data buffered in state after some amount of processing time.</li> + <li>You can take special action when the watermark estimates that you have +received all data up to a specified point in event time.</li> + <li>You can author workflows with timeouts that alter state and emit output in +response to the absence of additional input for some period of time.</li> +</ul> + +<p>These are just a few possibilities. State and timers together form a powerful +programming paradigm for fine-grained control to express a huge variety of +workflows. Stateful and timely processing in Beam is portable across data +processing engines and integrated with Beam’s unified model of event time +windowing in both streaming and batch processing.</p> + +<!--more--> + +<h2 id="what-is-stateful-and-timely-processing">What is stateful and timely processing?</h2> + +<p>In my prior post, I developed an understanding of stateful processing largely +by contrast with associative, commutative combiners. In this post, I’ll +emphasize a perspective that I had mentioned only briefly: that elementwise +processing with access to per-key-and-window state and timers represents a +fundamental pattern for “embarrassingly parallel” computation, distinct from +the others in Beam.</p> + +<p>In fact, stateful and timely computation is the low-level computational pattern +that underlies the others. Precisely because it is lower level, it allows you +to really micromanage your computations to unlock new use cases and new +efficiencies. This incurs the complexity of manually managing your state and +timers - it isn’t magic! 
Let’s first look again at the two primary +computational patterns in Beam.</p> + +<h3 id="element-wise-processing-pardo-map-etc">Element-wise processing (ParDo, Map, etc.)</h3> + +<p>The most elementary embarrassingly parallel pattern is just using a bunch of +computers to apply the same function to every input element of a massive +collection. In Beam, per-element processing like this is expressed as a basic +<code class="highlighter-rouge">ParDo</code> - analogous to “Map” from MapReduce - which is like an enhanced “map”, +“flatMap”, etc., from functional programming.</p> + +<p>The following diagram illustrates per-element processing. Input elements are +squares, output elements are triangles. The colors of the elements represent +their key, which will matter later. Each input element maps to the +corresponding output element(s) completely independently. Processing may be +distributed across computers in any way, yielding essentially limitless +parallelism.</p> + +<p><img class="center-block" src="/images/blog/timely-processing/ParDo.png" alt="ParDo offers limitless parallelism" width="600" /></p> + +<p>This pattern is obvious, exists in all data-parallel paradigms, and has +a simple stateless implementation. Every input element can be processed +independently or in arbitrary bundles. Balancing the work between computers is +actually the hard part, and can be addressed by splitting, progress estimation, +work-stealing, etc.</p> + +<h3 id="per-key-and-window-aggregation-combine-reduce-groupbykey-etc">Per-key (and window) aggregation (Combine, Reduce, GroupByKey, etc.)</h3> + +<p>The other embarrassingly parallel design pattern at the heart of Beam is per-key +(and window) aggregation. Elements sharing a key are colocated and then +combined using some associative and commutative operator.
In Beam this is +expressed as a <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">Combine.perKey</code>, and corresponds to the shuffle +and “Reduce” from MapReduce. It is sometimes helpful to think of per-key +<code class="highlighter-rouge">Combine</code> as the fundamental operation, and raw <code class="highlighter-rouge">GroupByKey</code> as a combiner that +just concatenates input elements. The communication pattern for the input +elements is the same, modulo some optimizations possible for <code class="highlighter-rouge">Combine</code>.</p> + +<p>In the illustration here, recall that the color of each element represents the +key. So all of the red squares are routed to the same location where they are +aggregated and the red triangle is the output. Likewise for the yellow and +green squares, etc. In a real application, you may have millions of keys, so +the parallelism is still massive.</p> + +<p><img class="center-block" src="/images/blog/timely-processing/CombinePerKey.png" alt="Gathering elements per key then combining them" width="600" /></p> + +<p>The underlying data processing engine will, at some level of abstraction, use +state to perform this aggregation across all the elements arriving for a key. +In particular, in a streaming execution, the aggregation process may need to +wait for more data to arrive or for the watermark to estimate that all input +for an event time window is complete. This requires some way to store the +intermediate aggregation between input elements as well as a way to receive a +callback when it is time to emit the result. As a result, the <em>execution</em> of +per-key aggregation by a stream processing engine fundamentally involves state +and timers.</p> + +<p>However, <em>your</em> code is just a declarative expression of the aggregation +operator. The runner can choose a variety of ways to execute your operator.
+I went over this in detail in <a href="/blog/2017/02/13/stateful-processing.html">my prior post focused on state alone</a>. Since you do not +observe elements in any defined order, nor manipulate mutable state or timers +directly, I call this neither stateful nor timely processing.</p> + +<h3 id="per-key-and-window-stateful-timely-processing">Per-key-and-window stateful, timely processing</h3> + +<p>Both <code class="highlighter-rouge">ParDo</code> and <code class="highlighter-rouge">Combine.perKey</code> are standard patterns for parallelism that go +back decades. When implementing these in a massive-scale distributed data +processing engine, we can highlight a few characteristics that are particularly +important.</p> + +<p>Let us consider these characteristics of <code class="highlighter-rouge">ParDo</code>:</p> + +<ul> + <li>You write single-threaded code to process one element.</li> + <li>Elements are processed in arbitrary order with no dependencies +or interaction between processing of elements.</li> +</ul> + +<p>And these characteristics for <code class="highlighter-rouge">Combine.perKey</code>:</p> + +<ul> + <li>Elements for a common key and window are gathered together.</li> + <li>A user-defined operator is applied to those elements.</li> +</ul> + +<p>Combining some of the characteristics of unrestricted parallel mapping and +per-key-and-window combination, we can discern a megaprimitive from which we +build stateful and timely processing:</p> + +<ul> + <li>Elements for a common key and window are gathered together.</li> + <li>Elements are processed in arbitrary order.</li> + <li>You write single-threaded code to process one element or timer, possibly +accessing state or setting timers.</li> +</ul> + +<p>In the illustration below, the red squares are gathered and fed one by one to +the stateful, timely, <code class="highlighter-rouge">DoFn</code>. 
As each element is processed, the <code class="highlighter-rouge">DoFn</code> has +access to state (the color-partitioned cylinder on the right) and can set +timers to receive callbacks (the colorful clocks on the left).</p> + +<p><img class="center-block" src="/images/blog/timely-processing/StateAndTimers.png" alt="Gathering elements per key then timely, stateful processing" width="600" /></p> + +<p>So that is the abstract notion of per-key-and-window stateful, timely +processing in Apache Beam. Now let’s see what it looks like to write code that +accesses state, sets timers, and receives callbacks.</p> + +<h2 id="example-batched-rpc">Example: Batched RPC</h2> + +<p>To demonstrate stateful and timely processing, let’s work through a concrete +example, with code.</p> + +<p>Suppose you are writing a system to analyze events. You have a ton of data +coming in and you need to enrich each event by RPC to an external system. You +can’t just issue an RPC per event. Not only would this be terrible for +performance, but it would also likely blow your quota with the external system. +So you’d like to gather a number of events, make one RPC for them all, and then +output all the enriched events.</p> + +<h3 id="state">State</h3> + +<p>Let’s set up the state we need to track batches of elements. As each element +comes in, we will write the element to a buffer while tracking the number of +elements we have buffered. Here are the state cells in code:</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">,</span> <span class="n">EnrichedEvent</span><span class="o">&gt;()</span> <span class=&qu [...] 
+ + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;&gt;</span> <span class="n">bufferedEvents</span> [...] + + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">countState</span> [...] + + <span class="err">…</span> <span class="n">TBD</span> <span class="err">…</span> +<span class="o">}</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span> +<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.</span> +</code></pre> +</div> + +<p>Walking through the code, we have:</p> + +<ul> + <li>The state cell <code class="highlighter-rouge">"buffer"</code> is an unordered bag of buffered events.</li> + <li>The state cell <code class="highlighter-rouge">"count"</code> tracks how many events have been buffered.</li> +</ul> + +<p>Next, as a recap of reading and writing state, let’s write our <code class="highlighter-rouge">@ProcessElement</code> +method. We will choose a limit on the size of the buffer, <code class="highlighter-rouge">MAX_BUFFER_SIZE</code>. 
If +our buffer exceeds this size, we will perform a single RPC to enrich all the +events, and output.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">,</span> <span class="n">EnrichedEvent</span><span class="o">&gt;()</span> <span class=&qu [...] + + <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kt">int</span> <span class="n">MAX_BUFFER_SIZE</span> <span class="o">=</span> <span class="mi">500</span><span class="o">;</span> + + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;&gt;</span> <span class="n">bufferedEvents</span> [...] + + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">countState</span> [...] + + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span> + <span class="n">ProcessContext</span> <span class="n">context</span><span class="o">,</span> + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> <span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;</span> <span class="n">bufferState</span><span c [...]
+ <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">countState</span><span [...] + + <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="n">firstNonNull</span><span class="o">(</span><span class="n">countState</span><span class="o">.</span><span class="na">read</span><span class="o">(),</span> <span class="mi">0< [...] + <span class="n">count</span> <span class="o">=</span> <span class="n">count</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> + <span class="n">countState</span><span class="o">.</span><span class="na">write</span><span class="o">(</span><span class="n">count</span><span class="o">);</span> + <span class="n">bufferState</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="na">element</span><span class="o">());</span> + + <span class="k">if</span> <span class="o">(</span><span class="n">count</span> <span class="o">&gt;</span> <span class="n">MAX_BUFFER_SIZE</span><span class="o">)</span> <span class="o">{</span> + <span class="k">for</span> <span class="o">(</span><span class="n">EnrichedEvent</span> <span class="n">enrichedEvent</span> <span class="o">:</span> <span class="n">enrichEvents</span><span class="o">(</span><span class="n">bufferState</span><span class="o">.</span><span class=" [...] 
+ <span class="n">context</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">enrichedEvent</span><span class="o">);</span> + <span class="o">}</span> + <span class="n">bufferState</span><span class="o">.</span><span class="na">clear</span><span class="o">();</span> + <span class="n">countState</span><span class="o">.</span><span class="na">clear</span><span class="o">();</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="err">…</span> <span class="n">TBD</span> <span class="err">…</span> +<span class="o">}</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span> +<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.</span> +</code></pre> +</div> + +<p>Here is an illustration to accompany the code:</p> + +<p><img class="center-block" src="/images/blog/timely-processing/BatchedRpcState.png" alt="Batching elements in state, then performing RPCs" width="600" /></p> + +<ul> + <li>The blue box is the <code class="highlighter-rouge">DoFn</code>.</li> + <li>The yellow box within it is the <code class="highlighter-rouge">@ProcessElement</code> method.</li> + <li>Each input event is a red square - this diagram just shows the activity for +a single key, represented by the color red. Your <code class="highlighter-rouge">DoFn</code> will run the same +workflow in parallel for all keys, which might be user IDs.</li> + <li>Each input event is written to the buffer as a red triangle, representing +the fact that you might actually buffer more than just the raw input, even +though this code doesn’t.</li> + <li>The external service is drawn as a cloud.
When there are enough buffered +events, the <code class="highlighter-rouge">@ProcessElement</code> method reads the events from state and issues +a single RPC.</li> + <li>Each output enriched event is drawn as a red circle. To consumers of this +output, it looks just like an element-wise operation.</li> +</ul> + +<p>So far, we have used state but not timers. You may have noticed that +there is a problem - there will usually be data left in the buffer. If no more +input arrives, that data will never be processed. In Beam, every window has +some point in event time when any further input for the window is considered +too late and is discarded. At this point, we say that the window has “expired”. +Since no further input can arrive to access the state for that window, the +state is also discarded. For our example, we need to ensure that all leftover +events are output when the window expires.</p> + +<h3 id="event-time-timers">Event Time Timers</h3> + +<p>An event time timer requests a callback when the watermark for an input +<code class="highlighter-rouge">PCollection</code> reaches some threshold. In other words, you can use an event time +timer to take action at a specific moment in event time - a particular point of +completeness for a <code class="highlighter-rouge">PCollection</code> - such as when a window expires.</p> + +<p>For our example, let us add an event time timer so that when the window expires, +any events remaining in the buffer are processed.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">,</span> <span class="n">EnrichedEvent</span><span class="o">&gt;()</span> <span class=&qu [...]
+ <span class="err">…</span> + + <span class="nd">@TimerId</span><span class="o">(</span><span class="s">"expiry"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">TimerSpec</span> <span class="n">expirySpec</span> <span class="o">=</span> <span class="n">TimerSpecs</span><span class="o">.</span><span class="na">timer</span><span class="o">(</span><span class="n" [...] + + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span> + <span class="n">ProcessContext</span> <span class="n">context</span><span class="o">,</span> + <span class="n">BoundedWindow</span> <span class="n">window</span><span class="o">,</span> + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> <span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;</span> <span class="n">bufferState</span><span c [...] + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">countState</span><span [...] + <span class="nd">@TimerId</span><span class="o">(</span><span class="s">"expiry"</span><span class="o">)</span> <span class="n">Timer</span> <span class="n">expiryTimer</span><span class="o">)</span> <span class="o">{</span> + + <span class="n">expiryTimer</span><span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="n">window</span><span class="o">.</span><span class="na">maxTimestamp</span><span class="o">().</span><span class="na">plus</span><span class="o">(</ [...] 
+ + <span class="err">…</span> <span class="n">same</span> <span class="n">logic</span> <span class="n">as</span> <span class="n">above</span> <span class="err">…</span> + <span class="o">}</span> + + <span class="nd">@OnTimer</span><span class="o">(</span><span class="s">"expiry"</span><span class="o">)</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onExpiry</span><span class="o">(</span> + <span class="n">OnTimerContext</span> <span class="n">context</span><span class="o">,</span> + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> <span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;</span> <span class="n">bufferState</span><span c [...] + <span class="k">if</span> <span class="o">(!</span><span class="n">bufferState</span><span class="o">.</span><span class="na">isEmpty</span><span class="o">().</span><span class="na">read</span><span class="o">())</span> <span class="o">{</span> + <span class="k">for</span> <span class="o">(</span><span class="n">EnrichedEvent</span> <span class="n">enrichedEvent</span> <span class="o">:</span> <span class="n">enrichEvents</span><span class="o">(</span><span class="n">bufferState</span><span class="o">.</span><span class=" [...] 
+ <span class="n">context</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">enrichedEvent</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span> +<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.</span> +</code></pre> +</div> + +<p>Let’s unpack the pieces of this snippet:</p> + +<ul> + <li> + <p>We declare an event time timer with <code class="highlighter-rouge">@TimerId("expiry")</code>. We will use the +identifier <code class="highlighter-rouge">"expiry"</code> to identify the timer for setting the callback time as +well as receiving the callback.</p> + </li> + <li> + <p>The variable <code class="highlighter-rouge">expirySpec</code>, annotated with <code class="highlighter-rouge">@TimerId</code>, is set to the value +<code class="highlighter-rouge">TimerSpecs.timer(TimeDomain.EVENT_TIME)</code>, indicating that we want a +callback according to the event time watermark of the input elements.</p> + </li> + <li> + <p>In the <code class="highlighter-rouge">@ProcessElement</code> method we annotate a parameter <code class="highlighter-rouge">@TimerId("expiry") +Timer</code>. The Beam runner automatically provides this <code class="highlighter-rouge">Timer</code> parameter by which +we can set (and reset) the timer. It is inexpensive to reset a timer +repeatedly, so we simply set it on every element.</p> + </li> + <li> + <p>We define the <code class="highlighter-rouge">onExpiry</code> method, annotated with <code class="highlighter-rouge">@OnTimer("expiry")</code>, that +performs a final event enrichment RPC and outputs the result.
The Beam runner +delivers the callback to this method by matching its identifier.</p> + </li> +</ul> + +<p>Illustrating this logic, we have the diagram below:</p> + +<p><img class="center-block" src="/images/blog/timely-processing/BatchedRpcExpiry.png" alt="Batched RPCs with window expiration" width="600" /></p> + +<p>Both the <code class="highlighter-rouge">@ProcessElement</code> and <code class="highlighter-rouge">@OnTimer("expiry")</code> methods perform the same +access to buffered state, perform the same batched RPC, and output enriched +elements.</p> + +<p>Now, if we are executing this in a streaming real-time manner, we might still +have unbounded latency for particular buffered data. If the watermark is advancing +very slowly, or event time windows are chosen to be quite large, then a lot of +time might pass before output is emitted, whether because enough elements have +been buffered or because the window has expired. We can also use timers to limit the amount of wall-clock +time, aka processing time, before we process buffered elements. We can choose +some reasonable amount of time so that even though we are issuing RPCs that are +not as large as they might be, it is still few enough RPCs to avoid blowing our +quota with the external service.</p> + +<h3 id="processing-time-timers">Processing Time Timers</h3> + +<p>A timer in processing time (time as it passes while your pipeline is executing) +is intuitively simple: you want to wait a certain amount of time and then +receive a callback.</p> + +<p>To put the finishing touches on our example, we will set a processing time +timer as soon as any data is buffered. We track whether or not the timer has +been set so we don’t continually reset it. When an element arrives, if the +timer has not been set, then we set it for the current moment plus +<code class="highlighter-rouge">MAX_BUFFER_DURATION</code>.
After the allotted processing time has passed, a +callback will fire and enrich and emit any buffered elements.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">,</span> <span class="n">EnrichedEvent</span><span class="o">&gt;()</span> <span class=&qu [...] + <span class="err">…</span> + + <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Duration</span> <span class="n">MAX_BUFFER_DURATION</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">standardSeconds</span><s [...] + + <span class="nd">@TimerId</span><span class="o">(</span><span class="s">"stale"</span><span class="o">)</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">TimerSpec</span> <span class="n">staleSpec</span> <span class="o">=</span> <span class="n">TimerSpecs</span><span class="o">.</span><span class="na">timer</span><span class="o">(</span><span class="n"& [...] + + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span> + <span class="n">ProcessContext</span> <span class="n">context</span><span class="o">,</span> + <span class="n">BoundedWindow</span> <span class="n">window</span><span class="o">,</span> + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">countState</span><span [...] 
+ <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> <span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;</span> <span class="n">bufferState</span><span c [...] + <span class="nd">@TimerId</span><span class="o">(</span><span class="s">"stale"</span><span class="o">)</span> <span class="n">Timer</span> <span class="n">staleTimer</span><span class="o">,</span> + <span class="nd">@TimerId</span><span class="o">(</span><span class="s">"expiry"</span><span class="o">)</span> <span class="n">Timer</span> <span class="n">expiryTimer</span><span class="o">)</span> <span class="o">{</span> + + <span class="kt">boolean</span> <span class="n">staleTimerSet</span> <span class="o">=</span> <span class="n">firstNonNull</span><span class="o">(</span><span class="n">staleSetState</span><span class="o">.</span><span class="na">read</span><span class="o">(),</span> <span class="k [...] + <span class="k">if</span> <span class="o">(</span><span class="n">firstNonNull</span><span class="o">(</span><span class="n">countState</span><span class="o">.</span><span class="na">read</span><span class="o">(),</span> <span class="mi">0</span><span class="o">)</span&g [...] 
+ <span class="n">staleTimer</span><span class="o">.</span><span class="na">offset</span><span class="o">(</span><span class="n">MAX_BUFFER_DURATION</span><span class="o">).</span><span class="na">setRelative</span><span class="o">();</span> + <span class="o">}</span> + + <span class="err">…</span> <span class="n">same</span> <span class="n">processing</span> <span class="n">logic</span> <span class="n">as</span> <span class="n">above</span> <span class="err">…</span> + <span class="o">}</span> + + <span class="nd">@OnTimer</span><span class="o">(</span><span class="s">"stale"</span><span class="o">)</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStale</span><span class="o">(</span> + <span class="n">OnTimerContext</span> <span class="n">context</span><span class="o">,</span> + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"buffer"</span><span class="o">)</span> <span class="n">BagState</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">&gt;</span> <span class="n">bufferState</span><span c [...] + <span class="nd">@StateId</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">countState</span><span [...] + <span class="k">if</span> <span class="o">(!</span><span class="n">bufferState</span><span class="o">.</span><span class="na">isEmpty</span><span class="o">().</span><span class="na">read</span><span class="o">())</span> <span class="o">{</span> + <span class="k">for</span> <span class="o">(</span><span class="n">EnrichedEvent</span> <span class="n">enrichedEvent</span> <span class="o">:</span> <span class="n">enrichEvents</span><span class="o">(</span><span class="n">bufferState</span><span class="o">.</span><span class=" [...]
+ <span class="n">context</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">enrichedEvent</span><span class="o">);</span> + <span class="o">}</span> + <span class="n">bufferState</span><span class="o">.</span><span class="na">clear</span><span class="o">();</span> + <span class="n">countState</span><span class="o">.</span><span class="na">clear</span><span class="o">();</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="err">…</span> <span class="n">same</span> <span class="n">expiry</span> <span class="n">as</span> <span class="n">above</span> <span class="err">…</span> +<span class="o">}</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span> +<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.</span> +</code></pre> +</div> + +<p>Here is an illustration of the final code:</p> + +<p><img class="center-block" src="/images/blog/timely-processing/BatchedRpcStale.png" alt="Batching elements in state, then performing RPCs" width="600" /></p> + +<p>Recapping the entirety of the logic:</p> + +<ul> + <li>As events arrive at <code class="highlighter-rouge">@ProcessElement</code> they are buffered in state.</li> + <li>If the size of the buffer exceeds a maximum, the events are enriched and output.</li> + <li>If the buffer fills too slowly and the events get stale before the maximum is reached, +a timer causes a callback which enriches the buffered events and outputs.</li> + <li>Finally, as any window is expiring, any events buffered in that window are +processed and output prior to the state for that window being discarded.</li> +</ul> + +<p>In the end, we have a full example that uses state and timers to explicitly +manage the low-level details of a performance-sensitive transform in Beam. 
As
+we added more and more features, our <code class="highlighter-rouge">DoFn</code> actually became pretty large. That
+is a normal characteristic of stateful, timely processing. You are really
+digging in and managing a lot of details that are handled automatically when
+you express your logic using Beam’s higher-level APIs. What you gain from this
+extra effort is an ability to tackle use cases and achieve efficiencies that
+may not have been possible otherwise.</p>
+
+<h2 id="state-and-timers-in-beams-unified-model">State and Timers in Beam’s Unified Model</h2>
+
+<p>Beam’s unified model for event time across streaming and batch processing has
+novel implications for state and timers. Usually, you don’t need to do anything
+for your stateful and timely <code class="highlighter-rouge">DoFn</code> to work well in the Beam model. But it will
+help to be aware of the considerations below, especially if you have used
+similar features before outside of Beam.</p>
+
+<h3 id="event-time-windowing-just-works">Event Time Windowing “Just Works”</h3>
+
+<p>One of the raisons d’être for Beam is correct processing of out-of-order event
+data, which is almost all event data. Beam’s solution to out-of-order data is
+event time windowing, where windows in event time yield correct results no
+matter what windowing a user chooses or what order the events come in.</p>
+
+<p>If you write a stateful, timely transform, it should work no matter how the
+surrounding pipeline chooses to window event time.
If the pipeline chooses
+fixed windows of one hour (sometimes called tumbling windows) or windows of 30
+minutes sliding by 10 minutes, the stateful, timely transform should
+transparently work correctly.</p>
+
+<p><img class="center-block" src="/images/blog/timely-processing/WindowingChoices.png" alt="Two windowing strategies for the same stateful and timely transform" width="600" /></p>
+
+<p>This works in Beam automatically, because state and timers are partitioned per
+key and window. Within each key and window, the stateful, timely processing is
+essentially independent. As an added benefit, the passing of event time (aka
+advancement of the watermark) allows automatic release of unreachable state
+when a window expires, so you often don’t have to worry about evicting old
+state.</p>
+
+<h3 id="unified-real-time-and-historical-processing">Unified real-time and historical processing</h3>
+
+<p>A second tenet of Beam’s semantic model is that processing must be unified
+between batch and streaming. One important use case for this unification
+is the ability to apply the same logic to a stream of events in real time and
+to archived storage of the same events.</p>
+
+<p>A common characteristic of archived data is that it may arrive radically out of
+order. The sharding of archived files often results in a totally different
+ordering for processing than events coming in near-real-time. The data will
+also all be available and hence delivered instantaneously from the point of
+view of your pipeline.
Whether running experiments on past data or reprocessing +past results to fix a data processing bug, it is critically important that your +processing logic be applicable to archived events just as easily as incoming +near-real-time data.</p> + +<p><img class="center-block" src="/images/blog/timely-processing/UnifiedModel.png" alt="Unified stateful processing over streams and file archives" width="600" /></p> + +<p>It is (deliberately) possible to write a stateful and timely DoFn that delivers +results that depend on ordering or delivery timing, so in this sense there is +additional burden on you, the <code class="highlighter-rouge">DoFn</code> author, to ensure that this nondeterminism +falls within documented allowances.</p> + +<h2 id="go-use-it">Go use it!</h2> + +<p>I’ll end this post in the same way I ended the last. I hope you will go try out +Beam with stateful, timely processing. If it opens up new possibilities for +you, then great! If not, we want to hear about it. Since this is a new feature, +please check the <a href="/documentation/runners/capability-matrix/">capability matrix</a> to see the level of support for +your preferred Beam backend(s).</p> + +<p>And please do join the Beam community at +<a href="/get-started/support">u...@beam.apache.org</a> and follow +<a href="https://twitter.com/ApacheBeam">@ApacheBeam</a> on Twitter.</p> +</description> + <pubDate>Mon, 28 Aug 2017 01:00:01 -0700</pubDate> + <link>https://beam.apache.org/blog/2017/08/28/timely-processing.html</link> + <guid isPermaLink="true">https://beam.apache.org/blog/2017/08/28/timely-processing.html</guid> + + + <category>blog</category> + + </item> + + <item> <title>Powerful and modular IO connectors with Splittable DoFn in Apache Beam</title> <description><p>One of the most important parts of the Apache Beam ecosystem is its quickly growing set of connectors that allow Beam pipelines to read and write data to @@ -1847,56 +2361,5 @@ Java SDK. 
If you have questions or comments, we’d love to hear them on the </item> - <item> - <title>Apache Beam: Six Months in Incubation</title> - <description><p>It’s been just over six months since Apache Beam was formally accepted into incubation with the <a href="http://www.apache.org">Apache Software Foundation</a>. As a community, we’ve been hard at work getting Beam off the ground.</p> - -<!--more--> - -<p>Looking just at raw numbers for those first six months, that’s:</p> - -<ul> - <li>48,238 lines of preexisting code donated by Cloudera, dataArtisans, and Google.</li> - <li>761 pull requests from 45 contributors.</li> - <li>498 Jira issues opened and 245 resolved.</li> - <li>1 incubating release (and another 1 in progress).</li> - <li>4,200 hours of automated tests.</li> - <li>161 subscribers / 606 messages on user@.</li> - <li>217 subscribers / 1205 messages on dev@.</li> - <li>277 stars and 174 forks on GitHub.</li> -</ul> - -<p>And behind those numbers, there’s been a ton of technical progress, including:</p> - -<ul> - <li>Refactoring of the entire codebase, examples, and tests to be truly runner-independent.</li> - <li>New functionality in the Apache Flink runner for timestamps/windows in batch and bounded sources and side inputs in streaming mode.</li> - <li>Work in progress to upgrade the Apache Spark runner to use Spark 2.0.</li> - <li>Several new runners from the wider Apache community – Apache Gearpump has its own feature branch, Apache Apex has a PR, and conversations are starting on Apache Storm and others.</li> - <li>New SDKs/DSLs for exposing the Beam model – the Python SDK from Google is in on a feature branch, and there are plans to add the Scio DSL from Spotify.</li> - <li>Support for additional data sources and sinks – Apache Kafka and JMS are in, there are PRs for Amazon Kinesis, Apache Cassandra, and MongoDB, and more connectors are being planned.</li> -</ul> - -<p>But perhaps most importantly, we’re committed to building an involved, welcoming 
community. So far, we’ve:</p> - -<ul> - <li>Started building a vibrant developer community, with detailed design discussions on features like DoFn reuse semantics, serialization technology, and an API for accessing state.</li> - <li>Started building a user community with an active mailing list and improvements to the website and documentation.</li> - <li>Had multiple talks on Beam at venues including ApacheCon, Hadoop Summit, Kafka Summit, JBCN Barcelona, and Strata.</li> - <li>Presented at multiple existing meetups and are starting to organize some of our own.</li> -</ul> - -<p>While it’s nice to reflect back on all we’ve done, we’re working full <em>stream</em> ahead towards a stable release and graduation from incubator. And we’d love your help – join the <a href="/get-started/support/">mailing lists</a>, check out the <a href="/contribute/contribution-guide/">contribution guide</a>, and grab a <a href="https://issues.apache.org/jira/browse/BEAM-520?jql=project%20%3D%20BEAM%20AND%20resol [...] 
- 
-</description>
- <pubDate>Wed, 03 Aug 2016 00:00:01 -0700</pubDate>
- <link>https://beam.apache.org/blog/2016/08/03/six-months.html</link>
- <guid isPermaLink="true">https://beam.apache.org/blog/2016/08/03/six-months.html</guid>
-
-
- <category>blog</category>
-
- </item>
-
 </channel>
 </rss>
diff --git a/content/index.html b/content/index.html
index cbc903f..b5b11a6 100644
--- a/content/index.html
+++ b/content/index.html
@@ -168,6 +168,11 @@
 </div>
 <div class="hero__blog__cards">
+ <a class="hero__blog__cards__card" href="/blog/2017/08/28/timely-processing.html">
+ <div class="hero__blog__cards__card__title">Timely (and Stateful) Processing with Apache Beam</div>
+ <div class="hero__blog__cards__card__date">Aug 28, 2017</div>
+ </a>
+
 <a class="hero__blog__cards__card" href="/blog/2017/08/16/splittable-do-fn.html">
 <div class="hero__blog__cards__card__title">Powerful and modular IO connectors with Splittable DoFn in Apache Beam</div>
 <div class="hero__blog__cards__card__date">Aug 16, 2017</div>
@@ -178,11 +183,6 @@
 <div class="hero__blog__cards__card__date">May 17, 2017</div>
 </a>
- <a class="hero__blog__cards__card" href="/blog/2017/03/16/python-sdk-release.html">
- <div class="hero__blog__cards__card__title">Python SDK released in Apache Beam 0.6.0</div>
- <div class="hero__blog__cards__card__date">Mar 16, 2017</div>
- </a>
-
 </div>
 </div>
 </div>
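The timely-processing post added in this commit recaps four batching rules: buffer events in state, flush when the buffer reaches a maximum size, flush when a processing-time "stale" timer fires, and flush whatever remains when the window expires. A minimal plain-Java sketch of those rules follows, under stated assumptions: it models a single key and window, has no Beam dependency, replaces `BagState` and the timers with ordinary fields, and replaces the runner's timer delivery with explicit method calls. `BatchingSketch`, `MAX_BUFFER_SIZE`, `MAX_BUFFER_DURATION_MS`, `advanceProcessingTime`, `onWindowExpiry` and `enrich` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/**
 * Plain-Java simulation (no Beam dependency) of the batching DoFn recapped in the post,
 * for a single key and window. All names here are hypothetical, not Beam APIs.
 */
public class BatchingSketch {
  // Hypothetical limits standing in for the post's maximum buffer size and MAX_BUFFER_DURATION.
  static final int MAX_BUFFER_SIZE = 3;
  static final long MAX_BUFFER_DURATION_MS = 1_000;

  // Ordinary fields standing in for the "buffer" BagState and the "stale" timer.
  private final List<String> buffer = new ArrayList<>();
  private Long staleDeadlineMs = null; // null: no stale timer is pending
  private final List<String> output = new ArrayList<>();

  /** Like @ProcessElement: buffer the event, flush if the buffer is full. */
  public void processElement(String event, long nowMs) {
    // Arm the processing-time "stale" timer when the first event enters an empty buffer.
    if (buffer.isEmpty()) {
      staleDeadlineMs = nowMs + MAX_BUFFER_DURATION_MS;
    }
    buffer.add(event);
    if (buffer.size() >= MAX_BUFFER_SIZE) {
      flush();
    }
  }

  /** Simulates the runner firing the "stale" timer once processing time reaches its deadline. */
  public void advanceProcessingTime(long nowMs) {
    if (staleDeadlineMs != null && nowMs >= staleDeadlineMs) {
      flush();
    }
  }

  /** Simulates the "expiry" callback: flush leftovers before the window's state is discarded. */
  public void onWindowExpiry() {
    flush();
  }

  /** Read-only view of everything emitted so far. */
  public List<String> output() {
    return Collections.unmodifiableList(output);
  }

  // Enrich and emit everything buffered, then reset the buffer and the pending timer.
  private void flush() {
    for (String event : buffer) {
      output.add(enrich(event));
    }
    buffer.clear();
    staleDeadlineMs = null;
  }

  // Hypothetical stand-in for the batched enrichment RPC.
  private static String enrich(String event) {
    return event.toUpperCase(Locale.ROOT);
  }

  public static void main(String[] args) {
    BatchingSketch fn = new BatchingSketch();
    fn.processElement("a", 0);
    fn.processElement("b", 100);
    fn.processElement("c", 200);      // buffer full: a, b, c are flushed
    fn.processElement("d", 300);      // arms the stale timer for t = 1300
    fn.advanceProcessingTime(1_299);  // not stale yet
    fn.advanceProcessingTime(1_300);  // stale: d is flushed
    fn.processElement("e", 2_000);
    fn.onWindowExpiry();              // e is flushed at window expiry
    System.out.println(fn.output());  // prints [A, B, C, D, E]
  }
}
```

In a real Beam pipeline the runner partitions this state per key and window and delivers the timer callbacks itself, so the explicit `advanceProcessingTime` and `onWindowExpiry` calls exist only to make the sketch runnable on its own.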