Modified: samza/site/learn/documentation/latest/api/high-level-api.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/api/high-level-api.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/api/high-level-api.html (original)
+++ samza/site/learn/documentation/latest/api/high-level-api.html Wed Jan 18 
19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/api/high-level-api">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/api/high-level-api">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/api/high-level-api">1.6.0</a></li>
 
               
@@ -640,334 +654,338 @@
 -->
 
 <h3 id="table-of-contents">Table Of Contents</h3>
-
 <ul>
-<li><a href="#introduction">Introduction</a></li>
-<li><a href="#code-examples">Code Examples</a></li>
-<li><a href="#key-concepts">Key Concepts</a>
-
-<ul>
-<li><a href="#streamapplication">StreamApplication</a></li>
-<li><a href="#messagestream">MessageStream</a></li>
-<li><a href="#table">Table</a></li>
-</ul></li>
-<li><a href="#operators">Operators</a>
-
-<ul>
-<li><a href="#map">Map</a></li>
-<li><a href="#flatmap">FlatMap</a></li>
-<li><a href="#asyncflatmap">AsyncFlatMap</a></li>
-<li><a href="#filter">Filter</a></li>
-<li><a href="#partitionby">PartitionBy</a></li>
-<li><a href="#merge">Merge</a></li>
-<li><a href="#broadcast">Broadcast</a></li>
-<li><a href="#sendto-stream">SendTo (Stream)</a></li>
-<li><a href="#sendto-table">SendTo (Table)</a></li>
-<li><a href="#sink">Sink</a></li>
-<li><a href="#join-stream-stream">Join (Stream-Stream)</a></li>
-<li><a href="#join-stream-table">Join (Stream-Table)</a></li>
-<li><a href="#window">Window</a>
-
-<ul>
-<li><a href="#windowing-concepts">Windowing Concepts</a></li>
-<li><a href="#window-types">Window Types</a></li>
-</ul></li>
-</ul></li>
-<li><a href="#operator-ids">Operator IDs</a></li>
-<li><a href="#data-serialization">Data Serialization</a></li>
-<li><a href="#application-serialization">Application Serialization</a></li>
+  <li><a href="#introduction">Introduction</a></li>
+  <li><a href="#code-examples">Code Examples</a></li>
+  <li><a href="#key-concepts">Key Concepts</a>
+    <ul>
+      <li><a href="#streamapplication">StreamApplication</a></li>
+      <li><a href="#messagestream">MessageStream</a></li>
+      <li><a href="#table">Table</a></li>
+    </ul>
+  </li>
+  <li><a href="#operators">Operators</a>
+    <ul>
+      <li><a href="#map">Map</a></li>
+      <li><a href="#flatmap">FlatMap</a></li>
+      <li><a href="#asyncflatmap">AsyncFlatMap</a></li>
+      <li><a href="#filter">Filter</a></li>
+      <li><a href="#partitionby">PartitionBy</a></li>
+      <li><a href="#merge">Merge</a></li>
+      <li><a href="#broadcast">Broadcast</a></li>
+      <li><a href="#sendto-stream">SendTo (Stream)</a></li>
+      <li><a href="#sendto-table">SendTo (Table)</a></li>
+      <li><a href="#sink">Sink</a></li>
+      <li><a href="#join-stream-stream">Join (Stream-Stream)</a></li>
+      <li><a href="#join-stream-table">Join (Stream-Table)</a></li>
+      <li><a href="#window">Window</a>
+        <ul>
+          <li><a href="#windowing-concepts">Windowing Concepts</a></li>
+          <li><a href="#window-types">Window Types</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#operator-ids">Operator IDs</a></li>
+  <li><a href="#data-serialization">Data Serialization</a></li>
+  <li><a href="#application-serialization">Application Serialization</a></li>
 </ul>
 
 <h3 id="introduction">Introduction</h3>
 
-<p>Samza&rsquo;s flexible High Level Streams API lets you describe your 
complex stream processing pipeline in the form of a Directional Acyclic Graph 
(DAG) of operations on <a 
href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>. It 
provides a rich set of built-in operators that simplify common stream 
processing operations such as filtering, projection, repartitioning, 
stream-stream and stream-table joins, and windowing. </p>
+<p>Samza’s flexible High Level Streams API lets you describe your complex 
stream processing pipeline in the form of a Directional Acyclic Graph (DAG) of 
operations on <a 
href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>. It 
provides a rich set of built-in operators that simplify common stream 
processing operations such as filtering, projection, repartitioning, 
stream-stream and stream-table joins, and windowing.</p>
 
 <h3 id="code-examples">Code Examples</h3>
 
 <p><a 
href="https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/cookbook";>The
 Samza Cookbook</a> contains various recipes using the Samza High Level Streams 
API. These include:</p>
 
 <ul>
-<li><p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java";>Filter
 example</a> demonstrates how to perform stateless operations on a stream. 
</p></li>
-<li><p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java";>Join
 example</a> demonstrates how you can join a Kafka stream of page-views with a 
stream of ad-clicks</p></li>
-<li><p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java";>Stream-Table
 Join example</a> demonstrates how to use the Samza Table API. It joins a Kafka 
stream with a remote dataset accessed through a REST service.</p></li>
-<li><p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java";>SessionWindow</a>
 and <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java";>TumblingWindow</a>
 examples illustrate Samza&rsquo;s rich windowing and triggering 
capabilities.</p></li>
+  <li>
+    <p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java";>Filter
 example</a> demonstrates how to perform stateless operations on a stream.</p>
+  </li>
+  <li>
+    <p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/JoinExample.java";>Join
 example</a> demonstrates how you can join a Kafka stream of page-views with a 
stream of ad-clicks</p>
+  </li>
+  <li>
+    <p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java";>Stream-Table
 Join example</a> demonstrates how to use the Samza Table API. It joins a Kafka 
stream with a remote dataset accessed through a REST service.</p>
+  </li>
+  <li>
+    <p>The <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/SessionWindowExample.java";>SessionWindow</a>
 and <a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/TumblingWindowExample.java";>TumblingWindow</a>
 examples illustrate Samza’s rich windowing and triggering capabilities.</p>
+  </li>
 </ul>
 
 <h3 id="key-concepts">Key Concepts</h3>
-
 <h4 id="streamapplication">StreamApplication</h4>
-
-<p>A <a 
href="javadocs/org/apache/samza/application/StreamApplication">StreamApplication</a>
 describes the inputs, outputs, state, configuration and the processing logic 
for an application written using Samza&rsquo;s High Level Streams API.</p>
+<p>A <a 
href="javadocs/org/apache/samza/application/StreamApplication">StreamApplication</a>
 describes the inputs, outputs, state, configuration and the processing logic 
for an application written using Samza’s High Level Streams API.</p>
 
 <p>A typical StreamApplication implementation consists of the following 
stages:</p>
 
 <ol>
-<li>Configuring the inputs, outputs and state (tables) using the appropriate 
<a 
href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s,
 <a 
href="javadocs/org/apache/samza/descriptors/InputDescriptor">InputDescriptor</a>s,
 <a 
href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s
 and <a 
href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
-<li>Obtaining the corresponding <a 
href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>s, <a 
href="javadocs/org/apache/samza/operators/OutputStream">OutputStream</a>s and 
<a href="javadocs/org/apache/samza/table/Table">Table</a>s from the provided <a 
href="javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor">StreamApplicationDescriptor</a></li>
-<li>Defining the processing logic using operators and functions on the streams 
and tables thus obtained.</li>
+  <li>Configuring the inputs, outputs and state (tables) using the appropriate 
<a 
href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>s,
 <a 
href="javadocs/org/apache/samza/descriptors/InputDescriptor">InputDescriptor</a>s,
 <a 
href="javadocs/org/apache/samza/system/descriptors/OutputDescriptor">OutputDescriptor</a>s
 and <a 
href="javadocs/org/apache/samza/table/descriptors/TableDescriptor">TableDescriptor</a>s.</li>
+  <li>Obtaining the corresponding <a 
href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>s, <a 
href="javadocs/org/apache/samza/operators/OutputStream">OutputStream</a>s and 
<a href="javadocs/org/apache/samza/table/Table">Table</a>s from the provided <a 
href="javadocs/org/apache/samza/application/descriptors/StreamApplicationDescriptor">StreamApplicationDescriptor</a></li>
+  <li>Defining the processing logic using operators and functions on the 
streams and tables thus obtained.</li>
 </ol>
 
 <p>The following example StreamApplication removes page views older than 1 
hour from the input stream:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>   
-    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewFilter</span> <span class="kd">implements</span> <span 
class="n">StreamApplication</span> <span class="o">{</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="n">StreamApplicationDescriptor</span> <span 
class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
+    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewFilter</span> <span class="kd">implements</span> <span 
class="nc">StreamApplication</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="nc">StreamApplicationDescriptor</span> <span 
class="n">appDescriptor</span><span class="o">)</span> <span class="o">{</span>
         <span class="c1">// Step 1: configure the inputs and outputs using 
descriptors</span>
-        <span class="n">KafkaSystemDescriptor</span> <span 
class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">)</span>
-            <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="n">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="s">&quot;...&quot;</span><span class="o">))</span>
-            <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="n">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="s">&quot;...&quot;</span><span class="o">,</span> <span 
class="s">&quot;...&quot;</span><span class="o">));</span>
-        <span class="n">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageViewEvent</span><span 
class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
-        <span class="n">KafkaOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageViewEvent</span><span 
class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
-            <span class="n">ksd</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;recentPageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageViewEvent</span><span 
class="o">.</span><span class="na">class</span><span class="o">)));</span>
+        <span class="nc">KafkaSystemDescriptor</span> <span 
class="n">ksd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">)</span>
+            <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="nc">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="s">"..."</span><span 
class="o">))</span>
+            <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="nc">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="s">"..."</span><span 
class="o">,</span> <span class="s">"..."</span><span class="o">));</span>
+        <span class="nc">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageViewEvent</span><span 
class="o">&gt;</span> <span class="n">kid</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"pageViewEvent"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">KafkaOutputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageViewEvent</span><span 
class="o">&gt;&gt;</span> <span class="n">kod</span> <span class="o">=</span> 
+            <span class="n">ksd</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"recentPageViewEvent"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageViewEvent</span><span 
class="o">.</span><span class="na">class</span><span class="o">)));</span>
   
         <span class="c1">// Step 2: obtain the message strems and output 
streams </span>
-        <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">pageViewEvents</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">kid</span><span class="o">);</span>
-        <span class="n">OutputStream</span><span class="o">&lt;</span><span 
class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">recentPageViewEvents</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">kod</span><span class="o">);</span>
+        <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">pageViewEvents</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">kid</span><span class="o">);</span>
+        <span class="nc">OutputStream</span><span class="o">&lt;</span><span 
class="nc">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">recentPageViewEvents</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">kod</span><span class="o">);</span>
   
         <span class="c1">// Step 3: define the processing logic</span>
         <span class="n">pageViewEvents</span>
             <span class="o">.</span><span class="na">filter</span><span 
class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span 
class="n">m</span><span class="o">.</span><span 
class="na">getCreationTime</span><span class="o">()</span> <span 
class="o">&gt;</span> 
-                <span class="n">System</span><span class="o">.</span><span 
class="na">currentTimeMillis</span><span class="o">()</span> <span 
class="o">-</span> <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofHours</span><span class="o">(</span><span class="mi">1</span><span 
class="o">).</span><span class="na">toMillis</span><span class="o">())</span>
+                <span class="nc">System</span><span class="o">.</span><span 
class="na">currentTimeMillis</span><span class="o">()</span> <span 
class="o">-</span> <span class="nc">Duration</span><span 
class="o">.</span><span class="na">ofHours</span><span class="o">(</span><span 
class="mi">1</span><span class="o">).</span><span 
class="na">toMillis</span><span class="o">())</span>
             <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">recentPageViewEvents</span><span 
class="o">);</span>
       <span class="o">}</span>
     <span class="o">}</span>
   </code></pre></figure>
 
 <h4 id="messagestream">MessageStream</h4>
-
 <p>A <a 
href="javadocs/org/apache/samza/operators/MessageStream">MessageStream</a>, as 
the name implies, represents a stream of messages. A StreamApplication is 
described as a Directed Acyclic Graph (DAG) of transformations on 
MessageStreams. You can get a MessageStream in two ways:</p>
 
 <ol>
-<li>Calling StreamApplicationDescriptor#getInputStream() with an <a 
href="javadocs/org/apache/samza/system/descriptors/InputDescriptor">InputDescriptor</a>
 obtained from a <a 
href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>.</li>
-<li>By transforming an existing MessageStream using operators like map, 
filter, window, join etc.</li>
+  <li>Calling StreamApplicationDescriptor#getInputStream() with an <a 
href="javadocs/org/apache/samza/system/descriptors/InputDescriptor">InputDescriptor</a>
 obtained from a <a 
href="javadocs/org/apache/samza/system/descriptors/SystemDescriptor">SystemDescriptor</a>.</li>
+  <li>By transforming an existing MessageStream using operators like map, 
filter, window, join etc.</li>
 </ol>
 
 <h4 id="table">Table</h4>
-
-<p>A <a href="javadocs/org/apache/samza/table/Table">Table</a> is an 
abstraction for data sources that support random access by key. It is an 
evolution of the older <a 
href="javadocs/org/apache/samza/storage/kv/KeyValueStore">KeyValueStore</a> 
API. It offers support for both local and remote data sources and composition 
through hybrid tables. For remote data sources, a [RemoteTable] provides 
optimized access with caching, rate-limiting, and retry support. Depending on 
the implementation, a Table can be a <a 
href="javadocs/org/apache/samza/table/ReadableTable">ReadableTable</a> or a <a 
href="javadocs/org/apache/samza/table/ReadWriteTable">ReadWriteTable</a>.</p>
+<p>A <a href="javadocs/org/apache/samza/table/Table">Table</a> is an 
abstraction for data sources that support random access by key. It is an 
evolution of the older <a 
href="javadocs/org/apache/samza/storage/kv/KeyValueStore">KeyValueStore</a> 
API. It offers support for both local and remote data sources and composition 
through hybrid tables. For remote data sources, a [RemoteTable] provides 
optimized access with caching, rate-limiting, and retry support.</p>
 
 <p>In the High Level Streams API, you can obtain and use a Table as 
follows:</p>
 
 <ol>
-<li>Use the appropriate TableDescriptor to specify the table properties.</li>
-<li>Register the TableDescriptor with the StreamApplicationDescriptor. This 
returns a Table reference, which can be used for populate the table using the 
<a href="#sendto-table">Send To Table</a> operator, or for joining a stream 
with the table using the <a href="#join-stream-table">Stream-Table Join</a> 
operator.</li>
-<li>Alternatively, you can obtain a Table reference within an operator&rsquo;s 
<a 
href="javadocs/org/apache/samza/operators/functions/InitableFunction">InitableFunction</a>
 using the provided <a 
href="javadocs/org/apache/samza/context/TaskContext">TaskContext</a>.</li>
+  <li>Use the appropriate TableDescriptor to specify the table properties.</li>
+  <li>Register the TableDescriptor with the StreamApplicationDescriptor. This 
returns a Table reference, which can be used for populate the table using the 
<a href="#sendto-table">Send To Table</a> operator, or for joining a stream 
with the table using the <a href="#join-stream-table">Stream-Table Join</a> 
operator.</li>
+  <li>Alternatively, you can obtain a Table reference within an operator’s 
<a 
href="javadocs/org/apache/samza/operators/functions/InitableFunction">InitableFunction</a>
 using the provided <a 
href="javadocs/org/apache/samza/context/TaskContext">TaskContext</a>.</li>
 </ol>
 
 <h3 id="operators">Operators</h3>
-
-<p>The High Level Streams API provides common operations like map, flatmap, 
filter, merge, broadcast, joins, and windows on MessageStreams. Most of these 
operators accept their corresponding Functions as an argument. </p>
+<p>The High Level Streams API provides common operations like map, flatmap, 
filter, merge, broadcast, joins, and windows on MessageStreams. Most of these 
operators accept their corresponding Functions as an argument.</p>
 
 <h4 id="map">Map</h4>
-
 <p>Applies the provided 1:1 <a 
href="javadocs/org/apache/samza/operators/functions/MapFunction">MapFunction</a>
 to each element in the MessageStream and returns the transformed 
MessageStream. The MapFunction takes in a single message and returns a single 
message (potentially of a different type).</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numbers</span> <span class="o">=</span> <span 
class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">tripled</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span 
class="mi">3</span><span class="o">);</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stringified</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">String</span><span 
class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span 
class="n">m</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">numbers</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">tripled</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span 
class="mi">3</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">stringified</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="nc">String</span><span 
class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span 
class="n">m</span><span class="o">));</span></code></pre></figure>
 
 <h4 id="flatmap">FlatMap</h4>
-
 <p>Applies the provided 1:n <a 
href="javadocs/org/apache/samza/operators/functions/FlatMapFunction">FlatMapFunction</a>
 to each element in the MessageStream and returns the transformed 
MessageStream. The FlatMapFunction takes in a single message and returns zero 
or more messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Parse the sentence into its individual words splitting 
on space</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">sentence</span><span 
class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span 
class="n">sentence</span> <span class="o">-&gt;</span>
-        <span class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">sentence</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="err">“</span> 
<span class="err">”</span><span class="o">))</span></code></pre></figure>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">words</span> <span class="o">=</span> <span 
class="n">sentence</span><span class="o">.</span><span 
class="na">flatMap</span><span class="o">(</span><span 
class="n">sentence</span> <span class="o">-&gt;</span>
+        <span class="nc">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">sentence</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="err">“</span> 
<span class="err">”</span><span class="o">))</span></code></pre></figure>
 
 <h4 id="asyncflatmap">AsyncFlatMap</h4>
-
 <p>Applies the provided 1:n <a 
href="javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction">AsyncFlatMapFunction</a>
 to each element in the MessageStream and returns the transformed 
MessageStream. The AsyncFlatMapFunction takes in a single message and returns a 
future of zero or more messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">RestClient</span> <span 
class="n">restClient</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">RestClient</span> <span class="n">restClient</span> <span 
class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">words</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Transform each incoming word into its meaning using a 
dictionary look up service</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">meanings</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">asyncFlatMap</span><span class="o">(</span><span 
class="n">word</span> <span class="o">-&gt;</span> <span class="o">{</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">meanings</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">asyncFlatMap</span><span class="o">(</span><span 
class="n">word</span> <span class="o">-&gt;</span> <span class="o">{</span>
        <span class="c1">// Builds a look up request to the dictionary 
service</span>
-       <span class="n">Request</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">dictionaryRequest</span> <span class="o">=</span> <span 
class="n">buildDictionaryRequest</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
-       <span class="n">CompletableFuture</span><span 
class="o">&lt;</span><span class="n">DictionaryResponse</span><span 
class="o">&gt;</span> <span class="n">dictionaryResponseFuture</span> <span 
class="o">=</span> <span class="n">restClient</span><span 
class="o">.</span><span class="na">sendRequest</span><span 
class="o">(</span><span class="n">dictionaryRequest</span><span 
class="o">);</span>
+       <span class="nc">Request</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">dictionaryRequest</span> <span class="o">=</span> <span 
class="n">buildDictionaryRequest</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
+       <span class="nc">CompletableFuture</span><span 
class="o">&lt;</span><span class="nc">DictionaryResponse</span><span 
class="o">&gt;</span> <span class="n">dictionaryResponseFuture</span> <span 
class="o">=</span> <span class="n">restClient</span><span 
class="o">.</span><span class="na">sendRequest</span><span 
class="o">(</span><span class="n">dictionaryRequest</span><span 
class="o">);</span>
        <span class="k">return</span> <span 
class="n">dictionaryResponseFuture</span>
-            <span class="o">.</span><span class="na">thenApply</span><span 
class="o">(</span><span class="n">response</span> <span class="o">-&gt;</span> 
<span class="k">new</span> <span class="n">Pair</span><span 
class="o">&lt;&gt;(</span><span class="n">word</span><span class="o">,</span> 
<span class="n">response</span><span class="o">.</span><span 
class="na">getMeaning</span><span class="o">()));</span>
+            <span class="o">.</span><span class="na">thenApply</span><span 
class="o">(</span><span class="n">response</span> <span class="o">-&gt;</span> 
<span class="k">new</span> <span class="nc">Pair</span><span 
class="o">&lt;&gt;(</span><span class="n">word</span><span class="o">,</span> 
<span class="n">response</span><span class="o">.</span><span 
class="na">getMeaning</span><span class="o">()));</span>
     <span class="o">});</span></code></pre></figure>
 
 <p>For more details on asynchronous processing, see <a 
href="../../../tutorials/latest/samza-async-user-guide">Samza Async API and 
Multithreading User Guide</a></p>
 
 <h4 id="filter">Filter</h4>
-
 <p>Applies the provided <a 
href="javadocs/org/apache/samza/operators/functions/FilterFunction">FilterFunction</a>
 to the MessageStream and returns the filtered MessageStream. The 
FilterFunction is a predicate that specifies whether a message should be 
retained in the filtered stream. Messages for which the FilterFunction returns 
false are filtered out.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">words</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Extract only the long words</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">longWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">longWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
     <span class="c1">// Extract only the short words</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">shortWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&lt;</span> <span class="mi">3</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">shortWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&lt;</span> <span class="mi">3</span><span class="o">);</span>
     </code></pre></figure>
 
 <h4 id="partitionby">PartitionBy</h4>
-
 <p>Re-partitions this MessageStream using the key returned by the provided 
keyExtractor and returns the transformed MessageStream. Messages are sent 
through an intermediate stream during repartitioning.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="c1">// Repartition PageViews by userId.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">partitionedPageViews</span> <span class="o">=</span> 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">partitionedPageViews</span> <span class="o">=</span> 
         <span class="n">pageViews</span><span class="o">.</span><span 
class="na">partitionBy</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key 
extractor</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">,</span> <span class="c1">// value 
extractor</span>
-            <span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)),</span> <span 
class="c1">// serdes</span>
-            <span class="s">&quot;partitioned-page-views&quot;</span><span 
class="o">);</span> <span class="c1">// operator ID    </span>
+            <span class="nc">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)),</span> <span 
class="c1">// serdes</span>
+            <span class="s">"partitioned-page-views"</span><span 
class="o">);</span> <span class="c1">// operator ID    </span>
         </code></pre></figure>
 
 <p>The operator ID should be unique for each operator within the application 
and is used to identify the streams and stores created by the operator.</p>
 
 <h4 id="merge">Merge</h4>
-
 <p>Merges the MessageStream with all the provided MessageStreams and returns 
the merged stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">LogEvent</span><span class="o">&gt;</span> <span 
class="n">log1</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">LogEvent</span><span class="o">&gt;</span> <span 
class="n">log2</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">LogEvent</span><span class="o">&gt;</span> <span 
class="n">log1</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">LogEvent</span><span class="o">&gt;</span> <span 
class="n">log2</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="c1">// Merge individual “LogEvent” streams and create a 
new merged MessageStream</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">LogEvent</span><span class="o">&gt;</span> <span 
class="n">mergedLogs</span> <span class="o">=</span> <span 
class="n">log1</span><span class="o">.</span><span class="na">merge</span><span 
class="o">(</span><span class="n">log2</span><span class="o">);</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">LogEvent</span><span class="o">&gt;</span> <span 
class="n">mergedLogs</span> <span class="o">=</span> <span 
class="n">log1</span><span class="o">.</span><span class="na">merge</span><span 
class="o">(</span><span class="n">log2</span><span class="o">);</span>
     
     <span class="c1">// Alternatively, use mergeAll to merge multiple 
streams</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">LogEvent</span><span class="o">&gt;</span> <span 
class="n">mergedLogs</span> <span class="o">=</span> <span 
class="n">MessageStream</span><span class="o">.</span><span 
class="na">mergeAll</span><span class="o">(</span><span 
class="n">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">log1</span><span 
class="o">,</span> <span class="n">log2</span><span class="o">,</span> <span 
class="o">...));</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">LogEvent</span><span class="o">&gt;</span> <span 
class="n">mergedLogs</span> <span class="o">=</span> <span 
class="nc">MessageStream</span><span class="o">.</span><span 
class="na">mergeAll</span><span class="o">(</span><span 
class="nc">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">log1</span><span 
class="o">,</span> <span class="n">log2</span><span class="o">,</span> <span 
class="o">...));</span>
     </code></pre></figure>
 
 <p>The merge transform preserves the order of messages within each 
MessageStream. If message <code>m1</code> appears before <code>m2</code> in any 
provided stream, then, <code>m1</code> will also appears before <code>m2</code> 
in the merged stream.</p>
 
 <h4 id="broadcast">Broadcast</h4>
-
 <p>Broadcasts the contents of the MessageStream to every <em>instance</em> of 
downstream operators via an intermediate stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">VersionChange</span><span 
class="o">&gt;</span> <span class="n">versionChanges</span> <span 
class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">VersionChange</span><span class="o">&gt;</span> <span 
class="n">versionChanges</span> <span class="o">=</span> <span 
class="o">...</span>
     
     <span class="c1">// Broadcast version change event to all downstream 
operator instances.</span>
     <span class="n">versionChanges</span>
         <span class="o">.</span><span class="na">broadcast</span><span 
class="o">(</span>
-            <span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">VersionChange</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="c1">// serde</span>
-            <span class="s">&quot;version-change-broadcast&quot;</span><span 
class="o">);</span> <span class="c1">// operator ID</span>
+            <span class="k">new</span> <span 
class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span 
class="nc">VersionChange</span><span class="o">.</span><span 
class="na">class</span><span class="o">),</span> <span class="c1">// 
serde</span>
+            <span class="s">"version-change-broadcast"</span><span 
class="o">);</span> <span class="c1">// operator ID</span>
         <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">vce</span> <span class="o">-&gt;</span> <span 
class="cm">/* act on version change event in each instance */</span> <span 
class="o">);</span>
          </code></pre></figure>
 
 <h4 id="sendto-stream">SendTo (Stream)</h4>
-
 <p>Sends all messages in this MessageStream to the provided OutputStream. You 
can specify the key and the value to be used for the outgoing messages.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
     <span class="c1">// Obtain the OutputStream using an 
OutputDescriptor</span>
-    <span class="n">KafkaOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">kod</span> <span class="o">=</span> 
-        <span class="n">ksd</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="err">“</span><span class="n">user</span><span class="o">-</span><span 
class="n">country</span><span class="err">”</span><span class="o">,</span> 
<span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">());</span>
-    <span class="n">OutputStream</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">userCountries</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">od</span><span class="o">)</span>
+    <span class="nc">KafkaOutputDescriptor</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">kod</span> <span class="o">=</span> 
+        <span class="n">ksd</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="err">“</span><span class="n">user</span><span class="o">-</span><span 
class="n">country</span><span class="err">”</span><span class="o">,</span> 
<span class="nc">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">StringSerde</span><span 
class="o">());</span>
+    <span class="nc">OutputStream</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">userCountries</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">od</span><span class="o">)</span>
     
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     <span class="c1">// Send a new message with userId as the key and their 
country as the value to the “user-country” stream.</span>
     <span class="n">pageViews</span>
-      <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> 
<span class="n">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getCountry</span><span class="o">()));</span>
+      <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> 
<span class="no">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getCountry</span><span class="o">()));</span>
       <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">userCountries</span><span 
class="o">);</span></code></pre></figure>
 
 <h4 id="sendto-table">SendTo (Table)</h4>
-
 <p>Sends all messages in this MessageStream to the provided Table. The 
expected message type is <a 
href="javadocs/org/apache/samza/operators/KV">KV</a>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">Profile</span><span class="o">&gt;</span> 
<span class="n">profilesStream</span> <span class="o">=</span> <span 
class="o">...</span>
-    <span class="n">Table</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span class="n">Long</span><span 
class="o">,</span> <span class="n">Profile</span><span 
class="o">&gt;&gt;</span> <span class="n">profilesTable</span> <span 
class="o">=</span> 
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">Profile</span><span class="o">&gt;</span> <span 
class="n">profilesStream</span> <span class="o">=</span> <span 
class="o">...</span>
+    <span class="nc">Table</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Long</span><span class="o">,</span> <span 
class="nc">Profile</span><span class="o">&gt;&gt;</span> <span 
class="n">profilesTable</span> <span class="o">=</span> 
     
     <span class="n">profilesStream</span>
-        <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">profile</span> <span class="o">-&gt;</span> 
<span class="n">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">profile</span><span 
class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> 
<span class="n">profile</span><span class="o">))</span>
+        <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">profile</span> <span class="o">-&gt;</span> 
<span class="no">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">profile</span><span 
class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> 
<span class="n">profile</span><span class="o">))</span>
         <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">profilesTable</span><span class="o">);</span>
         </code></pre></figure>
 
-<h4 id="sink">Sink</h4>
+<p>Sends all update messages in this MessageStream to the provided Table. The 
expected message type is <a 
href="javadocs/org/apache/samza/operators/KV">KV</a>.
+V should be on type UpdateMessage which defines an update message and an 
optional default to be inserted in the absence of an existing record.
+User also needs to pass an UpdateOptions parameter as well in the sendTo call 
of MessageStream. It defines the behavior of the sendTo-table operator 
+in terms of whether it should insert a default in the absence of an existing 
record or not.</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">UserInfoEvent</span><span class="o">&gt;</span> <span 
class="n">userInfoEventStream</span> <span class="o">=</span> <span 
class="o">...</span>
+    <span class="nc">Table</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Long</span><span class="o">,</span> <span 
class="nc">UserInfo</span><span class="o">&gt;&gt;</span> <span 
class="n">userInfoTable</span> <span class="o">=</span> <span 
class="o">...</span>
+
+    <span class="n">userInfoEventStream</span>
+        <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">event</span> <span class="o">-&gt;</span> 
<span class="o">{</span>
+          <span class="nc">UserInfo</span> <span class="n">userInfo</span> 
<span class="o">=</span> <span class="n">event</span><span 
class="o">.</span><span class="na">getUserInfo</span><span class="o">();</span>
+          <span class="nc">String</span> <span class="n">update</span> <span 
class="o">=</span> <span class="o">...;</span> 
+          <span class="k">return</span> <span class="no">KV</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="n">userInfo</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="nc">UpdateMessage</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="n">update</span><span 
class="o">,</span> <span class="n">userInfo</span><span class="o">));</span>
+        <span class="o">})</span>
+        <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">userInfoTable</span><span class="o">,</span> 
<span class="nc">UpdateOptions</span><span class="o">.</span><span 
class="na">UPDATE_WITH_DEFAULTS</span><span 
class="o">);</span></code></pre></figure>
 
+<h4 id="sink">Sink</h4>
 <p>Allows sending messages from this MessageStream to an output system using 
the provided <a 
href="javadocs/org/apache/samza/operators/functions/SinkFunction.html">SinkFunction</a>.</p>
 
 <p>This offers more control than <a href="#sendto-stream">SendTo (Stream)</a> 
since the SinkFunction has access to the MessageCollector and the 
TaskCoordinator. For example, you can choose to manually commit offsets, or 
shut-down the job using the TaskCoordinator APIs. This operator can also be 
used to send messages to non-Samza systems (e.g. a remote databases, REST 
services, etc.)</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
     
     <span class="n">pageViews</span><span class="o">.</span><span 
class="na">sink</span><span class="o">((</span><span class="n">msg</span><span 
class="o">,</span> <span class="n">collector</span><span class="o">,</span> 
<span class="n">coordinator</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="o">{</span>
         <span class="c1">// Construct a new outgoing message, and send it to a 
kafka topic named TransformedPageViewEvent.</span>
-        <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="n">SystemStream</span><span 
class="o">(</span><span class="err">“</span><span class="n">kafka</span><span 
class="err">”</span><span class="o">,</span> <span 
class="err">“</span><span class="n">TransformedPageViewEvent</span><span 
class="err">”</span><span class="o">),</span> <span class="n">msg</span><span 
class="o">));</span>
+        <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="nc">SystemStream</span><span 
class="o">(</span><span class="err">“</span><span class="n">kafka</span><span 
class="err">”</span><span class="o">,</span> <span 
class="err">“</span><span class="nc">TransformedPageViewEvent</span><span 
class="err">”</span><span class="o">),</span> <span class="n">msg</span><span 
class="o">));</span>
     <span class="o">}</span> <span class="o">);</span>
         </code></pre></figure>
 
 <h4 id="join-stream-stream">Join (Stream-Stream)</h4>
-
 <p>The Stream-Stream Join operator joins messages from two MessageStreams 
using the provided pairwise <a 
href="javadocs/org/apache/samza/operators/functions/JoinFunction.html">JoinFunction</a>.
 Messages are joined when the key extracted from a message from the first 
stream matches the key extracted from a message in the second stream. Messages 
in each stream are retained for the provided ttl duration and join results are 
emitted as matches are found. Join only retains the latest message for each 
input stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
     <span class="c1">// Joins a stream of OrderRecord with a stream of 
ShipmentRecord by orderId with a TTL of 20 minutes.</span>
     <span class="c1">// Results are produced to a new stream of 
FulfilledOrderRecord.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">OrderRecord</span><span class="o">&gt;</span> <span 
class="n">orders</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ShipmentRecord</span><span class="o">&gt;</span> <span 
class="n">shipments</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">OrderRecord</span><span class="o">&gt;</span> <span 
class="n">orders</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">ShipmentRecord</span><span class="o">&gt;</span> <span 
class="n">shipments</span> <span class="o">=</span> <span class="err">…</span>
 
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="n">shippedOrders</span> <span class="o">=</span> <span 
class="n">orders</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="n">shippedOrders</span> <span class="o">=</span> <span 
class="n">orders</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span>
         <span class="n">shipments</span><span class="o">,</span> <span 
class="c1">// other stream</span>
-        <span class="k">new</span> <span 
class="n">OrderShipmentJoiner</span><span class="o">(),</span> <span 
class="c1">// join function</span>
-        <span class="k">new</span> <span class="n">StringSerde</span><span 
class="o">(),</span> <span class="c1">// serde for the join key</span>
-        <span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">OrderRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">ShipmentRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="c1">// serde for both streams</span>
-        <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMinutes</span><span class="o">(</span><span 
class="mi">20</span><span class="o">),</span> <span class="c1">// join 
TTL</span>
-        <span class="s">&quot;shipped-order-stream&quot;</span><span 
class="o">)</span> <span class="c1">// operator ID</span>
+        <span class="k">new</span> <span 
class="nf">OrderShipmentJoiner</span><span class="o">(),</span> <span 
class="c1">// join function</span>
+        <span class="k">new</span> <span class="nf">StringSerde</span><span 
class="o">(),</span> <span class="c1">// serde for the join key</span>
+        <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">OrderRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">ShipmentRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="c1">// serde for both streams</span>
+        <span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofMinutes</span><span class="o">(</span><span 
class="mi">20</span><span class="o">),</span> <span class="c1">// join 
TTL</span>
+        <span class="s">"shipped-order-stream"</span><span class="o">)</span> 
<span class="c1">// operator ID</span>
 
     <span class="c1">// Constructs a new FulfilledOrderRecord by extracting 
the order timestamp from the OrderRecord and the shipment timestamp from the 
ShipmentRecord.</span>
-    <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> 
<span class="kd">implements</span> <span class="n">JoinFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">OrderRecord</span><span class="o">,</span> <span 
class="n">ShipmentRecord</span><span class="o">,</span> <span 
class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="o">{</span>
+    <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> 
<span class="kd">implements</span> <span class="nc">JoinFunction</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">OrderRecord</span><span class="o">,</span> <span 
class="nc">ShipmentRecord</span><span class="o">,</span> <span 
class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span 
class="n">FulfilledOrderRecord</span> <span class="nf">apply</span><span 
class="o">(</span><span class="n">OrderRecord</span> <span 
class="n">message</span><span class="o">,</span> <span 
class="n">ShipmentRecord</span> <span class="n">otherMessage</span><span 
class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="k">new</span> <span 
class="n">FulfilledOrderRecord</span><span class="o">(</span><span 
class="n">message</span><span class="o">.</span><span 
class="na">orderId</span><span class="o">,</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">orderTimestamp</span><span class="o">,</span> <span 
class="n">otherMessage</span><span class="o">.</span><span 
class="na">shipTimestamp</span><span class="o">);</span>
+      <span class="kd">public</span> <span 
class="nc">FulfilledOrderRecord</span> <span class="nf">apply</span><span 
class="o">(</span><span class="nc">OrderRecord</span> <span 
class="n">message</span><span class="o">,</span> <span 
class="nc">ShipmentRecord</span> <span class="n">otherMessage</span><span 
class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="nf">FulfilledOrderRecord</span><span class="o">(</span><span 
class="n">message</span><span class="o">.</span><span 
class="na">orderId</span><span class="o">,</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">orderTimestamp</span><span class="o">,</span> <span 
class="n">otherMessage</span><span class="o">.</span><span 
class="na">shipTimestamp</span><span class="o">);</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">getFirstKey</span><span class="o">(</span><span 
class="n">OrderRecord</span> <span class="n">message</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">String</span> <span 
class="nf">getFirstKey</span><span class="o">(</span><span 
class="nc">OrderRecord</span> <span class="n">message</span><span 
class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span 
class="o">.</span><span class="na">orderId</span><span class="o">;</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">getSecondKey</span><span class="o">(</span><span 
class="n">ShipmentRecord</span> <span class="n">message</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">String</span> <span 
class="nf">getSecondKey</span><span class="o">(</span><span 
class="nc">ShipmentRecord</span> <span class="n">message</span><span 
class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span 
class="o">.</span><span class="na">orderId</span><span class="o">;</span>
       <span class="o">}</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h4 id="join-stream-table">Join (Stream-Table)</h4>
-
 <p>The Stream-Table Join operator joins messages from a MessageStream with 
messages in a Table using the provided <a 
href="javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html">StreamTableJoinFunction</a>.
 Messages are joined when the key extracted from a message in the stream 
matches the key for a record in the table. The join function is invoked with 
both the message and the record. If a record is not found in the table, a null 
value is provided. The join function can choose to return null for an inner 
join, or an output message for a left outer join. For join correctness, it is 
important to ensure the input stream and table are partitioned using the same 
key (e.g., using the partitionBy operator) as this impacts the physical 
placement of data.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">pageViews</span>
-        <span class="o">.</span><span class="na">partitionBy</span><span 
class="o">(</span><span class="n">pv</span> <span class="o">-&gt;</span> <span 
class="n">pv</span><span class="o">.</span><span 
class="na">getMemberId</span><span class="o">,</span> <span class="n">pv</span> 
<span class="o">-&gt;</span> <span class="n">pv</span><span class="o">,</span> 
<span class="s">&quot;page-views-by-memberid&quot;</span><span 
class="o">)</span>
-        <span class="o">.</span><span class="na">join</span><span 
class="o">(</span><span class="n">profiles</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">PageViewToProfileTableJoiner</span><span 
class="o">())</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="n">pageViews</span>
+        <span class="o">.</span><span class="na">partitionBy</span><span 
class="o">(</span><span class="n">pv</span> <span class="o">-&gt;</span> <span 
class="n">pv</span><span class="o">.</span><span 
class="na">getMemberId</span><span class="o">,</span> <span class="n">pv</span> 
<span class="o">-&gt;</span> <span class="n">pv</span><span class="o">,</span> 
<span class="s">"page-views-by-memberid"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">join</span><span 
class="o">(</span><span class="n">profiles</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">PageViewToProfileTableJoiner</span><span 
class="o">())</span>
         <span class="o">...</span>
     
     <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewToProfileTableJoiner</span> <span 
class="kd">implements</span> 
-        <span class="n">StreamTableJoinFunction</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;,</span> <span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;,</span> <span 
class="n">EnrichedPageView</span><span class="o">&gt;</span> <span 
class="o">{</span>
+        <span class="nc">StreamTableJoinFunction</span><span 
class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> 
<span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;,</span> <span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">Profile</span><span class="o">&gt;,</span> <span 
class="nc">EnrichedPageView</span><span class="o">&gt;</span> <span 
class="o">{</span>
       
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">EnrichedPageView</span> 
<span class="nf">apply</span><span class="o">(</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">m</span><span class="o">,</span> <span class="n">KV</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">r</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="n">r</span> <span 
class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> <span 
class="k">new</span> <span class="n">EnrichedPageView</span><span 
class="o">(...)</span> <span class="o">:</span> <span 
class="kc">null</span><span class="o">;</span>
+      <span class="kd">public</span> <span class="nc">EnrichedPageView</span> 
<span class="nf">apply</span><span class="o">(</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">m</span><span class="o">,</span> <span class="no">KV</span><span 
class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> 
<span class="nc">Profile</span><span class="o">&gt;</span> <span 
class="n">r</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">r</span> <span 
class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> <span 
class="k">new</span> <span class="nc">EnrichedPageView</span><span 
class="o">(...)</span> <span class="o">:</span> <span 
class="kc">null</span><span class="o">;</span>
       <span class="o">}</span>
        
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">Integer</span> <span 
class="nf">getMessageKey</span><span class="o">(</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">message</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">Integer</span> <span 
class="nf">getMessageKey</span><span class="o">(</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">message</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">message</span><span 
class="o">.</span><span class="na">getKey</span><span class="o">();</span>
       <span class="o">}</span>
 
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">Integer</span> <span 
class="nf">getRecordKey</span><span class="o">(</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="nc">Integer</span> <span 
class="nf">getRecordKey</span><span class="o">(</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">Profile</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">)</span> <span class="o">{</span>
         <span class="k">return</span> <span class="n">record</span><span 
class="o">.</span><span class="na">getKey</span><span class="o">();</span>
       <span class="o">}</span>
     <span class="o">}</span>
     </code></pre></figure>
 
 <h3 id="window">Window</h3>
-
 <h4 id="windowing-concepts">Windowing Concepts</h4>
-
 <p><strong>Windows, Triggers, and WindowPanes</strong>: The window operator 
groups incoming messages in the MessageStream into finite windows. Each emitted 
result contains one or more messages in the window and is called a 
WindowPane.</p>
 
 <p>A window can have one or more associated triggers which determine when 
results from the window are emitted. Triggers can be either <a 
href="javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-">early
 triggers</a> that allow emitting results speculatively before all data for the 
window has arrived, or late triggers that allow handling late messages for the 
window.</p>
@@ -981,102 +999,95 @@
 <p>An accumulating window retains window results from previous emissions. Each 
emission will contain all messages that arrived since the beginning of the 
window.</p>
 
 <h4 id="window-types">Window Types</h4>
-
 <p>The Samza High Level Streams API currently supports tumbling and session 
windows.</p>
 
 <p><strong>Tumbling Window</strong>: A tumbling window defines a series of 
contiguous, fixed size time intervals in the stream.</p>
 
 <p>Examples:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 
     <span class="c1">// Group the pageView stream into 30 second tumbling 
windows keyed by the userId.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Collection</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;&gt;&gt;</span> <span 
class="o">=</span> <span class="n">pageViews</span><span 
class="o">.</span><span class="na">window</span><span class="o">(</span>
-        <span class="n">Windows</span><span class="o">.</span><span 
class="na">keyedTumblingWindow</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WindowPane</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Collection</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
+        <span class="nc">Windows</span><span class="o">.</span><span 
class="na">keyedTumblingWindow</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key 
extractor</span>
-            <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span class="c1">// window 
duration</span>
-            <span class="k">new</span> <span class="n">StringSerde</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span 
class="n">PageView</span><span class="o">.</span><span 
class="na">class</span><span class="o">)));</span>
+            <span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span class="c1">// window 
duration</span>
+            <span class="k">new</span> <span 
class="nf">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)));</span>
 
     <span class="c1">// Compute the maximum value over tumbling windows of 30 
seconds.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">Supplier</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span 
class="n">Integer</span><span class="o">.</span><span 
class="na">MIN_VALUE</span><span class="o">;</span>
-    <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">aggregateFunction</span> <span class="o">=</span> 
-        <span class="o">(</span><span class="n">msg</span><span 
class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> 
<span class="o">-&gt;</span> <span class="n">Math</span><span 
class="o">.</span><span class="na">max</span><span class="o">(</span><span 
class="n">msg</span><span class="o">,</span> <span 
class="n">oldValue</span><span class="o">);</span>
-    
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">Void</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">integers</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
-       <span class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span>
-            <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> 
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">Supplier</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span 
class="nc">Integer</span><span class="o">.</span><span 
class="na">MIN_VALUE</span><span class="o">;</span>
+    <span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">aggregateFunction</span> <span class="o">=</span> 
+        <span class="o">(</span><span class="n">msg</span><span 
class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> 
<span class="o">-&gt;</span> <span class="nc">Math</span><span 
class="o">.</span><span class="na">max</span><span class="o">(</span><span 
class="n">msg</span><span class="o">,</span> <span 
class="n">oldValue</span><span class="o">);</span>
+    
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WindowPane</span><span class="o">&lt;</span><span 
class="nc">Void</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">integers</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
+       <span class="nc">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span>
+            <span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> 
             <span class="n">initialValue</span><span class="o">,</span> 
             <span class="n">aggregateFunction</span><span class="o">,</span> 
-            <span class="k">new</span> <span 
class="n">IntegerSerde</span><span class="o">()));</span>
+            <span class="k">new</span> <span 
class="nf">IntegerSerde</span><span class="o">()));</span>
    </code></pre></figure>
 
 <p><strong>Session Window</strong>: A session window groups a MessageStream 
into sessions. A session captures a period of activity over a MessageStream and 
is defined by a gap. A session is closed and results are emitted if no new 
messages arrive for the window for the gap duration.</p>
 
 <p>Examples:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="c1">// Sessionize a stream of 
page views, and count the number of page-views in a session for every 
user.</span>
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
-    <span class="n">Supplier</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span>
-    <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">countAggregator</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span 
class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">oldCount</span> <span class="o">+</span> <span 
class="mi">1</span><span class="o">;</span>
-    <span class="n">Duration</span> <span class="n">sessionGap</span> <span 
class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMinutes</span><span class="o">(</span><span 
class="mi">3</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="c1">// Sessionize a stream of page views with a session gap of 10 
seconds</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WindowPane</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;,</span> <span 
class="nc">Collection</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
+      <span class="nc">Windows</span><span class="o">.</span><span 
class="na">keyedSessionWindow</span><span class="o">(</span><span 
class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">)));</span>
+    
+    <span class="c1">// Sessionize a stream of page views, and count the 
number of page-views in a session for every user.</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
+    <span class="nc">Supplier</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span>
+    <span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">countAggregator</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span 
class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">oldCount</span> <span class="o">+</span> <span 
class="mi">1</span><span class="o">;</span>
+    <span class="nc">Duration</span> <span class="n">sessionGap</span> <span 
class="o">=</span> <span class="nc">Duration</span><span 
class="o">.</span><span class="na">ofMinutes</span><span 
class="o">(</span><span class="mi">3</span><span class="o">);</span>
     
-    <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">sessionCounts</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
-        <span class="n">Windows</span><span class="o">.</span><span 
class="na">keyedSessionWindow</span><span class="o">(</span>
+    <span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WindowPane</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">sessionCounts</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span>
+        <span class="nc">Windows</span><span class="o">.</span><span 
class="na">keyedSessionWindow</span><span class="o">(</span>
             <span class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> 
             <span class="n">sessionGap</span><span class="o">,</span> 
             <span class="n">initialValue</span><span class="o">,</span> 
             <span class="n">countAggregator</span><span class="o">,</span>

[... 72 lines stripped ...]


Reply via email to