Modified: samza/site/startup/preview/index.html
URL: 
http://svn.apache.org/viewvc/samza/site/startup/preview/index.html?rev=1832281&r1=1832280&r2=1832281&view=diff
==============================================================================
--- samza/site/startup/preview/index.html (original)
+++ samza/site/startup/preview/index.html Fri May 25 22:28:38 2018
@@ -185,27 +185,27 @@
 
 <p>There are four layers in the architecture. The following sections describe 
each of the layers.</p>
 
-<h4 id="i.-high-level-api">I. High Level API</h4>
+<h4 id="i-high-level-api">I. High Level API</h4>
 
 <p>The high level API provides the libraries to define your application logic. 
The <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>
 is the central abstraction which your application must implement. You start by 
declaring your inputs as instances of <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html">MessageStream</a>.
 Then you can apply operators on each MessageStream like map, filter, window, 
and join to define the whole end-to-end data processing in a single program.</p>
 
 <p>For a deeper dive into the high level API, see <a 
href="#high-level-api">high level API section</a> below.</p>
 
-<h4 id="ii.-applicationrunner">II. ApplicationRunner</h4>
+<h4 id="ii-applicationrunner">II. ApplicationRunner</h4>
 
 <p>Samza uses an <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a>
 to run a stream application. The ApplicationRunner generates the configs (such 
as input/output streams), creates intermediate streams, and starts the 
execution. There are two types of ApplicationRunner:</p>
 
 <p><strong>RemoteApplicationRunner</strong> - submits the application to a 
remote cluster. This runner is invoked via the <em>run-app.sh</em> script. To 
use RemoteApplicationRunner, set the following configurations</p>
 
-<div class="highlight"><pre><code class="jproperties"><span class="c"># The 
StreamApplication class to run</span>
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="c"># The StreamApplication 
class to run</span>
 <span class="na">app.class</span><span class="o">=</span><span 
class="s">com.company.job.YourStreamApplication</span>
-<span class="na">job.factory.class</span><span class="o">=</span><span 
class="s">org.apache.samza.job.yarn.YarnJobFactory</span></code></pre></div>
+<span class="na">job.factory.class</span><span class="o">=</span><span 
class="s">org.apache.samza.job.yarn.YarnJobFactory</span></code></pre></figure>
 
 <p>Then use <em>run-app.sh</em> to run the application in the remote cluster. 
The script will invoke the RemoteApplicationRunner, which will launch one or 
more jobs using the factory specified with <em>job.factory.class</em>. Follow 
the <a href="/learn/tutorials/latest/hello-samza-high-level-yarn.html">yarn 
deployment tutorial</a> to try it out.</p>
 
 <p><strong>LocalApplicationRunner</strong> - runs the application in the JVM 
process of the runner. For example, to launch your application on multiple 
machines using ZooKeeper for coordination, you can run multiple instances of 
LocalApplicationRunner on various machines. After the applications load they 
will start cordinatinating their actions through ZooKeeper. Here is an example 
to run the StreamApplication in your program using the 
LocalApplicationRunner:</p>
 
-<div class="highlight"><pre><code class="java"><span class="kd">public</span> 
<span class="kd">static</span> <span class="kt">void</span> <span 
class="nf">main</span><span class="o">(</span><span 
class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">public</span> <span 
class="kd">static</span> <span class="kt">void</span> <span 
class="nf">main</span><span class="o">(</span><span 
class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
 <span class="n">CommandLine</span> <span class="n">cmdLine</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">CommandLine</span><span class="o">();</span>
 <span class="n">Config</span> <span class="n">config</span> <span 
class="o">=</span> <span class="n">cmdLine</span><span class="o">.</span><span 
class="na">loadConfig</span><span class="o">(</span><span 
class="n">cmdLine</span><span class="o">.</span><span 
class="na">parser</span><span class="o">().</span><span 
class="na">parse</span><span class="o">(</span><span class="n">args</span><span 
class="o">));</span>
 <span class="n">LocalApplicationRunner</span> <span 
class="n">localRunner</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">LocalApplicationRunner</span><span 
class="o">(</span><span class="n">config</span><span class="o">);</span>
@@ -215,7 +215,7 @@
 <span class="c1">// Wait for the application to finish</span>
 <span class="n">localRunner</span><span class="o">.</span><span 
class="na">waitForFinish</span><span class="o">();</span>
 <span class="n">System</span><span class="o">.</span><span 
class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span 
class="s">&quot;Application completed with status &quot;</span> <span 
class="o">+</span> <span class="n">localRunner</span><span 
class="o">.</span><span class="na">status</span><span class="o">(</span><span 
class="n">app</span><span class="o">));</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
 <p>Follow the <a 
href="/learn/tutorials/latest/hello-samza-high-level-zk.html">ZooKeeper 
deployment tutorial</a> to try it out.</p>
 
@@ -232,7 +232,7 @@
 
 <p><img src="/img/latest/learn/documentation/introduction/execution-plan.png" 
alt="Execution plan" style="max-width: 100%; height: auto;" 
onclick="window.open(this.src)"/></p>
 
-<h4 id="iii.-execution-models">III. Execution Models</h4>
+<h4 id="iii-execution-models">III. Execution Models</h4>
 
 <p>Samza supports two types of execution models: cluster based execution and 
embedded execution.</p>
 
@@ -247,7 +247,7 @@
 
 <p>For more details on running Samza in embedded mode, take a look at the <a 
href="#flexible-deployment-model">flexible deployment model</a> section 
below.</p>
 
-<h4 id="iv.-processor">IV. Processor</h4>
+<h4 id="iv-processor">IV. Processor</h4>
 
 <p>The lowest execution unit of a Samza application is the processor. It reads 
the configs generated from the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a>
 and processes the input stream partitions assigned by the JobCoordinator. It 
can access local state using a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html">KeyValueStore</a>
 implementation (e.g. RocksDB or in-memory) and remote state (e.g. REST 
service) using multithreading.</p>
 
@@ -274,11 +274,11 @@
 
 <p>When writing your stream processing application using the Samza high-level 
API, you should implement a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>
 and define your processing logic in the init method.</p>
 
-<div class="highlight"><pre><code class="java"><span class="kd">public</span> 
<span class="kt">void</span> <span class="nf">init</span><span 
class="o">(</span><span class="n">StreamGraph</span> <span 
class="n">graph</span><span class="o">,</span> <span class="n">Config</span> 
<span class="n">config</span><span class="o">)</span> <span class="o">{</span> 
<span class="err">…</span> <span class="o">}</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">public</span> <span 
class="kt">void</span> <span class="nf">init</span><span 
class="o">(</span><span class="n">StreamGraph</span> <span 
class="n">graph</span><span class="o">,</span> <span class="n">Config</span> 
<span class="n">config</span><span class="o">)</span> <span class="o">{</span> 
<span class="err">…</span> <span class="o">}</span></code></pre></figure>
 
 <p>For example, here is a StreamApplication that validates and decorates page 
views with viewer&rsquo;s profile information.</p>
 
-<div class="highlight"><pre><code class="java"><span class="kd">public</span> 
<span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span 
class="kd">implements</span> <span class="n">StreamApplication</span> <span 
class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span 
class="kd">implements</span> <span class="n">StreamApplication</span> <span 
class="o">{</span>
   <span class="nd">@Override</span>
 <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span 
class="n">StreamGraph</span> <span class="n">graph</span><span 
class="o">,</span> <span class="n">Config</span> <span 
class="n">config</span><span class="o">)</span> <span class="o">{</span>
     <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span 
class="n">graph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="err">“</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
@@ -287,7 +287,7 @@
                       <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="k">this</span><span class="o">::</span><span 
class="n">addProfileInformation</span><span class="o">)</span>
                       <span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span><span 
class="n">graph</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="err">“</span><span class="n">decorated</span><span 
class="o">-</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">DecoratedPageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)))</span>
  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
 <h3 id="messagestream">MessageStream</h3>
 
@@ -302,43 +302,44 @@
 
 <p>There are 3 simple steps to write your stream processing applications using 
the Samza high-level API.</p>
 
-<h3 id="step-1:-obtain-the-input-streams:">Step 1: Obtain the input 
streams:</h3>
+<h3 id="step-1-obtain-the-input-streams">Step 1: Obtain the input streams:</h3>
 
 <p>You can obtain the MessageStream for your input stream ID 
(“page-views”) using StreamGraph.getInputStream.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">graph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="err">“</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">graph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="err">“</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
+    </code></pre></figure>
 
 <p>The first parameter <code>page-views</code> is the logical stream ID. Each 
stream ID is associated with a <em>physical name</em> and a <em>system</em>. By 
default, Samza uses the stream ID as the physical stream name and accesses the 
stream on the default system which is specified with the property 
“job.default.system”. However, the <em>physical name</em> and 
<em>system</em> properties can be overridden in configuration. For example, the 
following configuration defines the stream ID &ldquo;page-views&rdquo; as an 
alias for the PageViewEvent topic in a local Kafka cluster.</p>
 
-<div class="highlight"><pre><code class="jproperties"><span 
class="na">streams.page-views.samza.system</span><span class="o">=</span><span 
class="s">kafka</span>
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">streams.page-views.samza.system</span><span class="o">=</span><span 
class="s">kafka</span>
 <span class="na">systems.kafka.samza.factory</span><span 
class="o">=</span><span 
class="s">org.apache.samza.system.kafka.KafkaSystemFactory</span>
 <span class="na">systems.kafka.consumer.zookeeper.connect</span><span 
class="o">=</span><span class="s">localhost:2181</span>
 <span class="na">systems.kafka.producer.bootstrap.servers</span><span 
class="o">=</span><span class="s">localhost:9092</span>
-<span class="na">streams.page-views.samza.physical.name</span><span 
class="o">=</span><span class="s">PageViewEvent</span></code></pre></div>
+<span class="na">streams.page-views.samza.physical.name</span><span 
class="o">=</span><span class="s">PageViewEvent</span></code></pre></figure>
 
 <p>The second parameter is a serde to de-serialize the incoming message.</p>
 
-<h3 id="step-2:-define-your-transformation-logic:">Step 2: Define your 
transformation logic:</h3>
+<h3 id="step-2-define-your-transformation-logic">Step 2: Define your 
transformation logic:</h3>
 
 <p>You are now ready to define your StreamApplication logic as a series of 
transformations on MessageStreams.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">DecoratedPageViews</span><span class="o">&gt;</span> <span 
class="n">decoratedPageViews</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">DecoratedPageViews</span><span 
class="o">&gt;</span> <span class="n">decoratedPageViews</span>
                                    <span class="o">=</span> <span 
class="n">pageViewInput</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span 
class="k">this</span><span class="o">::</span><span 
class="n">isValidPageView</span><span class="o">)</span>
-                                                  <span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="k">this</span><span class="o">::</span><span 
class="n">addProfileInformation</span><span 
class="o">);</span></code></pre></div>
+                                                  <span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="k">this</span><span class="o">::</span><span 
class="n">addProfileInformation</span><span 
class="o">);</span></code></pre></figure>
 
-<h3 id="step-3:-write-the-output-to-an-output-stream:">Step 3: Write the 
output to an output stream:</h3>
+<h3 id="step-3-write-the-output-to-an-output-stream">Step 3: Write the output 
to an output stream:</h3>
 
 <p>Finally, you can create an OutputStream using StreamGraph.getOutputStream 
and send the transformed messages through it.</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Send 
messages with userId as the key to “decorated-page-views”.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Send messages with userId as 
the key to “decorated-page-views”.</span>
 <span class="n">decoratedPageViews</span><span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span>
                           <span class="n">graph</span><span 
class="o">.</span><span class="na">getOutputStream</span><span 
class="o">(</span><span class="err">“</span><span 
class="n">decorated</span><span class="o">-</span><span 
class="n">page</span><span class="o">-</span><span class="n">views</span><span 
class="err">”</span><span class="o">,</span>
-                                                <span class="k">new</span> 
<span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span 
class="n">DecoratedPageView</span><span class="o">.</span><span 
class="na">class</span><span class="o">)));</span></code></pre></div>
+                                                <span class="k">new</span> 
<span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span 
class="n">DecoratedPageView</span><span class="o">.</span><span 
class="na">class</span><span class="o">)));</span></code></pre></figure>
 
 <p>The first parameter <code>decorated-page-views</code> is a logical stream 
ID. The properties for this stream ID can be overridden just like the stream 
IDs for input streams. For example:</p>
 
-<div class="highlight"><pre><code class="jproperties"><span 
class="na">streams.decorated-page-views.samza.system</span><span 
class="o">=</span><span class="s">kafka</span>
-<span class="na">streams.decorated-page-views.samza.physical.name</span><span 
class="o">=</span><span 
class="s">DecoratedPageViewEvent</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">streams.decorated-page-views.samza.system</span><span 
class="o">=</span><span class="s">kafka</span>
+<span class="na">streams.decorated-page-views.samza.physical.name</span><span 
class="o">=</span><span 
class="s">DecoratedPageViewEvent</span></code></pre></figure>
 
 <p>The second parameter is a serde to de-serialize the outgoing message.</p>
 
@@ -350,34 +351,34 @@
 
 <p>Applies the provided 1:1 <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/MapFunction.html">MapFunction</a>
 to each element in the MessageStream and returns the transformed 
MessageStream. The MapFunction takes in a single message and returns a single 
message (potentially of a different type).</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">numbers</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numbers</span> <span class="o">=</span> <span 
class="o">...</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">tripled</span><span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span 
class="mi">3</span><span class="o">)</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stringified</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">String</span><span 
class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span 
class="n">m</span><span class="o">))</span></code></pre></div>
+<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stringified</span> <span class="o">=</span> <span 
class="n">numbers</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">m</span> <span 
class="o">-&gt;</span> <span class="n">String</span><span 
class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span 
class="n">m</span><span class="o">))</span></code></pre></figure>
 
 <h3 id="flatmap">Flatmap</h3>
 
 <p>Applies the provided 1:n <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FlatMapFunction.html">FlatMapFunction</a>
 to each element in the MessageStream and returns the transformed 
MessageStream. The FlatMapFunction takes in a single message and returns zero 
or more messages.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">sentence</span> <span class="o">=</span> <span 
class="o">...</span>
 <span class="c1">// Parse the sentence into its individual words splitting by 
space</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">sentence</span><span 
class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span 
class="n">sentence</span> <span class="o">-&gt;</span>
-                                                          <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">sentence</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="err">“</span> 
<span class="err">”</span><span class="o">))</span></code></pre></div>
+                                                          <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">sentence</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="err">“</span> 
<span class="err">”</span><span class="o">))</span></code></pre></figure>
 
 <h3 id="filter">Filter</h3>
 
 <p>Applies the provided <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FilterFunction.html">FilterFunction</a>
 to the MessageStream and returns the filtered MessageStream. The 
FilterFunction is a predicate that specifies whether a message should be 
retained in the filtered stream. Messages for which the FilterFunction returns 
false are filtered out.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
 <span class="c1">// Extract only the long words</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">longWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
 <span class="c1">// Extract only the short words</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">shortWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&lt;</span> <span class="mi">3</span><span 
class="o">);</span></code></pre></div>
+<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">shortWords</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">word</span> 
<span class="o">-&gt;</span> <span class="n">word</span><span 
class="o">.</span><span class="na">size</span><span class="o">()</span> <span 
class="o">&lt;</span> <span class="mi">3</span><span 
class="o">);</span></code></pre></figure>
 
 <h3 id="partitionby">PartitionBy</h3>
 
 <p>Re-partitions this MessageStream using the key returned by the provided 
keyExtractor and returns the transformed MessageStream. Messages are sent 
through an intermediate stream during repartitioning.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViews</span> <span class="o">=</span> <span 
class="o">...</span>
 <span class="c1">// Repartition pageView by userId.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">partitionedPageViews</span> <span class="o">=</span>
                                         <span class="n">pageViews</span><span 
class="o">.</span><span class="na">partitionBy</span><span 
class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> 
<span class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key 
extractor</span>
@@ -385,41 +386,41 @@
                                         <span class="n">KVSerde</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span 
class="n">PageView</span><span class="o">.</span><span 
class="na">class</span><span class="o">)),</span> <span class="c1">// 
serdes</span>
                                         <span 
class="s">&quot;partitioned-page-views&quot;</span><span class="o">)</span> 
<span class="c1">// operator ID</span>
 
-<span class="n">The</span> <span class="n">operator</span> <span 
class="n">ID</span> <span class="n">should</span> <span class="n">be</span> 
<span class="n">unique</span> <span class="k">for</span> <span 
class="n">an</span> <span class="n">operator</span> <span 
class="n">within</span> <span class="n">the</span> <span 
class="n">application</span> <span class="n">and</span> <span 
class="n">is</span> <span class="n">used</span> <span class="n">to</span> <span 
class="n">identify</span> <span class="n">the</span> <span 
class="n">streams</span> <span class="n">and</span> <span 
class="n">stores</span> <span class="n">created</span> <span 
class="n">by</span> <span class="n">the</span> <span 
class="n">operator</span><span class="o">.</span></code></pre></div>
+<span class="n">The</span> <span class="n">operator</span> <span 
class="n">ID</span> <span class="n">should</span> <span class="n">be</span> 
<span class="n">unique</span> <span class="k">for</span> <span 
class="n">an</span> <span class="n">operator</span> <span 
class="n">within</span> <span class="n">the</span> <span 
class="n">application</span> <span class="n">and</span> <span 
class="n">is</span> <span class="n">used</span> <span class="n">to</span> <span 
class="n">identify</span> <span class="n">the</span> <span 
class="n">streams</span> <span class="n">and</span> <span 
class="n">stores</span> <span class="n">created</span> <span 
class="n">by</span> <span class="n">the</span> <span 
class="n">operator</span><span class="o">.</span></code></pre></figure>
 
 <h3 id="merge">Merge</h3>
 
 <p>Merges the MessageStream with all the provided MessageStreams and returns 
the merged stream.</p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ServiceCall</span><span class="o">&gt;</span> <span 
class="n">serviceCall1</span> <span class="o">=</span> <span 
class="o">...</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">ServiceCall</span><span 
class="o">&gt;</span> <span class="n">serviceCall1</span> <span 
class="o">=</span> <span class="o">...</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ServiceCall</span><span class="o">&gt;</span> <span 
class="n">serviceCall2</span> <span class="o">=</span> <span 
class="o">...</span>
 <span class="c1">// Merge individual “ServiceCall” streams and create a 
new merged MessageStream</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ServiceCall</span><span class="o">&gt;</span> <span 
class="n">serviceCallMerged</span> <span class="o">=</span> <span 
class="n">serviceCall1</span><span class="o">.</span><span 
class="na">merge</span><span class="o">(</span><span 
class="n">serviceCall2</span><span class="o">)</span></code></pre></div>
+<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ServiceCall</span><span class="o">&gt;</span> <span 
class="n">serviceCallMerged</span> <span class="o">=</span> <span 
class="n">serviceCall1</span><span class="o">.</span><span 
class="na">merge</span><span class="o">(</span><span 
class="n">serviceCall2</span><span class="o">)</span></code></pre></figure>
 
 <p>The merge transform preserves the order of each MessageStream, so if 
message <code>m1</code> appears before <code>m2</code> in any provided stream, 
then, <code>m1</code> also appears before <code>m2</code> in the merged 
stream.</p>
 
 <p>As an alternative to the <code>merge</code> instance method, you also can 
use the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">MessageStream#mergeAll</a>
 static method to merge MessageStreams without operating on an initial 
stream.</p>
 
-<h3 id="sendto-(stream)">SendTo (stream)</h3>
+<h3 id="sendto-stream">SendTo (stream)</h3>
 
 <p>Sends all messages from this MessageStream to the provided OutputStream. 
You can specify the key and the value to be used for the outgoing message.</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Output a 
new message with userId as the key and region as the value to the 
“user-region” stream.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Output a new message with 
userId as the key and region as the value to the “user-region” 
stream.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">keyedPageViews</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="n">KV</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getRegion</span><span class="o">()));</span>
 <span class="n">OutputStream</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">userRegions</span>
                            <span class="o">=</span> <span 
class="n">graph</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="err">“</span><span class="n">user</span><span class="o">-</span><span 
class="n">region</span><span class="err">”</span><span class="o">,</span>
                                                    <span 
class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">()));</span>
-<span class="n">keyedPageViews</span><span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span><span 
class="n">userRegions</span><span class="o">);</span></code></pre></div>
+<span class="n">keyedPageViews</span><span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span><span 
class="n">userRegions</span><span class="o">);</span></code></pre></figure>
 
-<h3 id="sendto-(table)">SendTo (table)</h3>
+<h3 id="sendto-table">SendTo (table)</h3>
 
 <p>Sends all messages from this MessageStream to the provided table, the 
expected message type is KV.</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Write a new 
message with memberId as the key and profile as the value to a table.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>  <span class="c1">// Write a new message with 
memberId as the key and profile as the value to a table.</span>
   <span class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">&quot;Profile&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">Profile</span><span 
class="o">&gt;())</span>
       <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span 
class="n">KV</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="n">m</span><span class="o">.</span><span 
class="na">getMemberId</span><span class="o">(),</span> <span 
class="n">m</span><span class="o">))</span>
-      <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">table</span><span 
class="o">);</span></code></pre></div>
+      <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">table</span><span 
class="o">);</span></code></pre></figure>
 
 <h3 id="sink">Sink</h3>
 
@@ -427,25 +428,25 @@
 
 <p>This offers more control than sendTo since the SinkFunction has access to 
the <code>MessageCollector</code> and the <code>TaskCoordinator</code>. For 
instance, you can choose to manually commit offsets, or shut-down the job using 
the TaskCoordinator APIs. This operator can also be used to send messages to 
non-Samza systems (e.g. remote databases, REST services, etc.)</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Repartition 
pageView by userId.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Repartition pageView by 
userId.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">pageViews</span><span class="o">.</span><span 
class="na">sink</span><span class="o">(</span> <span class="o">(</span><span 
class="n">msg</span><span class="o">,</span> <span 
class="n">collector</span><span class="o">,</span> <span 
class="n">coordinator</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="o">{</span>
 <span class="c1">// Construct a new outgoing message, and send it to a kafka 
topic named TransformedPageViewEvent.</span>
  <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="n">SystemStream</span><span 
class="o">(</span><span class="err">“</span><span class="n">kafka</span><span 
class="err">”</span><span class="o">,</span>
                          <span class="err">“</span><span 
class="n">TransformedPageViewEvent</span><span class="err">”</span><span 
class="o">),</span> <span class="n">msg</span><span class="o">));</span>
-<span class="o">}</span> <span class="o">)</span></code></pre></div>
+<span class="o">}</span> <span class="o">)</span></code></pre></figure>
 
-<h3 id="join-(stream-stream)">Join (stream-stream)</h3>
+<h3 id="join-stream-stream">Join (stream-stream)</h3>
 
 <p>The stream-stream Join operator joins messages from two MessageStreams 
using the provided pairwise <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/JoinFunction.html">JoinFunction</a>.
 Messages are joined when the keys extracted from messages from the first 
stream match keys extracted from messages in the second stream. Messages in 
each stream are retained for the provided ttl duration and join results are 
emitted as matches are found.</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Joins a 
stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 
20 minutes.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Joins a stream of OrderRecord 
with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.</span>
 <span class="c1">// Results are produced to a new stream of 
FulfilledOrderRecord.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">OrderRecord</span><span class="o">&gt;</span> <span 
class="n">orders</span> <span class="o">=</span> <span class="err">…</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">ShipmentRecord</span><span class="o">&gt;</span> <span 
class="n">shipments</span> <span class="o">=</span> <span class="err">…</span>
 
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="n">shippedOrders</span> <span class="o">=</span> <span 
class="n">orders</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">shipments</span><span class="o">,</span> <span class="k">new</span> 
<span class="n">OrderShipmentJoiner</span><span class="o">(),</span>
-    <span class="k">new</span> <span class="nf">StringSerde</span><span 
class="o">(),</span> <span class="c1">// serde for the join key</span>
+    <span class="k">new</span> <span class="n">StringSerde</span><span 
class="o">(),</span> <span class="c1">// serde for the join key</span>
     <span class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">OrderRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">ShipmentRecord</span><span 
class="o">.</span><span class="na">class</span><span class="o">),</span> <span 
class="c1">// serde for both streams</span>
     <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMinutes</span><span class="o">(</span><span 
class="mi">20</span><span class="o">),</span> <span class="c1">// join 
TTL</span>
     <span class="s">&quot;shipped-order-stream&quot;</span><span 
class="o">)</span> <span class="c1">// operator ID</span>
@@ -454,7 +455,7 @@
  <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> 
<span class="kd">implements</span> <span class="n">JoinFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">OrderRecord</span><span class="o">,</span> <span 
class="n">ShipmentRecord</span><span class="o">,</span> <span 
class="n">FulfilledOrderRecord</span><span class="o">&gt;</span> <span 
class="o">{</span>
    <span class="nd">@Override</span>
    <span class="kd">public</span> <span class="n">FulfilledOrderRecord</span> 
<span class="nf">apply</span><span class="o">(</span><span 
class="n">OrderRecord</span> <span class="n">message</span><span 
class="o">,</span> <span class="n">ShipmentRecord</span> <span 
class="n">otherMessage</span><span class="o">)</span> <span class="o">{</span>
-     <span class="k">return</span> <span class="k">new</span> <span 
class="nf">FulfilledOrderRecord</span><span class="o">(</span><span 
class="n">message</span><span class="o">.</span><span 
class="na">orderId</span><span class="o">,</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">orderTimestamp</span><span class="o">,</span> <span 
class="n">otherMessage</span><span class="o">.</span><span 
class="na">shipTimestamp</span><span class="o">);</span>
+     <span class="k">return</span> <span class="k">new</span> <span 
class="n">FulfilledOrderRecord</span><span class="o">(</span><span 
class="n">message</span><span class="o">.</span><span 
class="na">orderId</span><span class="o">,</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">orderTimestamp</span><span class="o">,</span> <span 
class="n">otherMessage</span><span class="o">.</span><span 
class="na">shipTimestamp</span><span class="o">);</span>
    <span class="o">}</span>
 
    <span class="nd">@Override</span>
@@ -466,36 +467,36 @@
    <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">getSecondKey</span><span class="o">(</span><span 
class="n">ShipmentRecord</span> <span class="n">message</span><span 
class="o">)</span> <span class="o">{</span>
      <span class="k">return</span> <span class="n">message</span><span 
class="o">.</span><span class="na">orderId</span><span class="o">;</span>
    <span class="o">}</span>
- <span class="o">}</span></code></pre></div>
+ <span class="o">}</span></code></pre></figure>
 
-<h3 id="join-(stream-table)">Join (stream-table)</h3>
+<h3 id="join-stream-table">Join (stream-table)</h3>
 
 <p>The stream-table Join operator joins messages from a MessageStream using 
the provided <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html">StreamTableJoinFunction</a>.
 Messages from the input stream are joined with record in table using key 
extracted from input messages. The join function is invoked with both the 
message and the record. If a record is not found in the table, a null value is 
provided; the join function can choose to return null (inner join) or an output 
message (left outer join). For join to function properly, it is important to 
ensure the input stream and table are partitioned using the same key as this 
impacts the physical placement of data. </p>
 
-<div class="highlight"><pre><code class="java"><span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">&quot;PageView&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">PageView</span><span 
class="o">&gt;())</span>
-      <span class="o">.</span><span class="na">partitionBy</span><span 
class="o">(</span><span class="nl">PageView:</span><span 
class="o">:</span><span class="n">getMemberId</span><span class="o">,</span> 
<span class="n">v</span> <span class="o">-&gt;</span> <span 
class="n">v</span><span class="o">,</span> <span 
class="s">&quot;p1&quot;</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>  <span class="n">streamGraph</span><span 
class="o">.</span><span class="na">getInputStream</span><span 
class="o">(</span><span class="s">&quot;PageView&quot;</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">NoOpSerde</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;())</span>
+      <span class="o">.</span><span class="na">partitionBy</span><span 
class="o">(</span><span class="n">PageView</span><span class="o">::</span><span 
class="n">getMemberId</span><span class="o">,</span> <span class="n">v</span> 
<span class="o">-&gt;</span> <span class="n">v</span><span class="o">,</span> 
<span class="s">&quot;p1&quot;</span><span class="o">)</span>
       <span class="o">.</span><span class="na">join</span><span 
class="o">(</span><span class="n">table</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">PageViewToProfileJoinFunction</span><span 
class="o">())</span>
-      <span class="o">...</span></code></pre></div>
+      <span class="o">...</span></code></pre></figure>
 
-<div class="highlight"><pre><code class="java"><span class="lineno"> 1</span> 
<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewToProfileJoinFunction</span> <span 
class="kd">implements</span> <span class="n">StreamTableJoinFunction</span>
-<span class="lineno"> 2</span>     <span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">PageView</span><span class="o">&gt;,</span> <span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;,</span> <span 
class="n">EnrichedPageView</span><span class="o">&gt;</span> <span 
class="o">{</span>
-<span class="lineno"> 3</span>   <span class="nd">@Override</span>
-<span class="lineno"> 4</span>   <span class="kd">public</span> <span 
class="n">EnrichedPageView</span> <span class="nf">apply</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">m</span><span class="o">,</span> <span class="n">KV</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">r</span><span class="o">)</span> <span class="o">{</span>
-<span class="lineno"> 5</span>     <span class="k">return</span> <span 
class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span 
class="o">?</span> 
-<span class="lineno"> 6</span>         <span class="k">new</span> <span 
class="nf">EnrichedPageView</span><span class="o">(...)</span>
-<span class="lineno"> 7</span>       <span class="o">:</span> <span 
class="kc">null</span><span class="o">;</span>
-<span class="lineno"> 8</span>   <span class="o">}</span>
-<span class="lineno"> 9</span> 
-<span class="lineno">10</span>   <span class="nd">@Override</span>
-<span class="lineno">11</span>   <span class="kd">public</span> <span 
class="n">Integer</span> <span class="nf">getMessageKey</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">message</span><span class="o">)</span> <span class="o">{</span>
-<span class="lineno">12</span>     <span class="k">return</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">();</span>
-<span class="lineno">13</span>   <span class="o">}</span>
-<span class="lineno">14</span> 
-<span class="lineno">15</span>   <span class="nd">@Override</span>
-<span class="lineno">16</span>   <span class="kd">public</span> <span 
class="n">Integer</span> <span class="nf">getRecordKey</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">)</span> <span class="o">{</span>
-<span class="lineno">17</span>     <span class="k">return</span> <span 
class="n">record</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">();</span>
-<span class="lineno">18</span>   <span class="o">}</span>
-<span class="lineno">19</span> <span class="o">}</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="lineno"> 1 </span><span 
class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewToProfileJoinFunction</span> <span 
class="kd">implements</span> <span class="n">StreamTableJoinFunction</span>
+<span class="lineno"> 2 </span>    <span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">PageView</span><span class="o">&gt;,</span> <span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;,</span> <span 
class="n">EnrichedPageView</span><span class="o">&gt;</span> <span 
class="o">{</span>
+<span class="lineno"> 3 </span>  <span class="nd">@Override</span>
+<span class="lineno"> 4 </span>  <span class="kd">public</span> <span 
class="n">EnrichedPageView</span> <span class="nf">apply</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">m</span><span class="o">,</span> <span class="n">KV</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">r</span><span class="o">)</span> <span class="o">{</span>
+<span class="lineno"> 5 </span>    <span class="k">return</span> <span 
class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span 
class="o">?</span> 
+<span class="lineno"> 6 </span>        <span class="k">new</span> <span 
class="n">EnrichedPageView</span><span class="o">(...)</span>
+<span class="lineno"> 7 </span>      <span class="o">:</span> <span 
class="kc">null</span><span class="o">;</span>
+<span class="lineno"> 8 </span>  <span class="o">}</span>
+<span class="lineno"> 9 </span>
+<span class="lineno">10 </span>  <span class="nd">@Override</span>
+<span class="lineno">11 </span>  <span class="kd">public</span> <span 
class="n">Integer</span> <span class="nf">getMessageKey</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">message</span><span class="o">)</span> <span class="o">{</span>
+<span class="lineno">12 </span>    <span class="k">return</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">();</span>
+<span class="lineno">13 </span>  <span class="o">}</span>
+<span class="lineno">14 </span>
+<span class="lineno">15 </span>  <span class="nd">@Override</span>
+<span class="lineno">16 </span>  <span class="kd">public</span> <span 
class="n">Integer</span> <span class="nf">getRecordKey</span><span 
class="o">(</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">)</span> <span class="o">{</span>
+<span class="lineno">17 </span>    <span class="k">return</span> <span 
class="n">record</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">();</span>
+<span class="lineno">18 </span>  <span class="o">}</span>
+<span class="lineno">19 </span><span class="o">}</span></code></pre></figure>
 
 <h3 id="window">Window</h3>
 
@@ -513,7 +514,7 @@
 
 <p>An <em>accumulating window</em> retains window results from previous 
emissions. Each emission will contain all messages that arrived since the 
beginning of the window.</p>
 
-<h4 id="window-types:">Window Types:</h4>
+<h4 id="window-types">Window Types:</h4>
 
 <p>The Samza high-level API currently supports tumbling and session 
windows.</p>
 
@@ -521,13 +522,13 @@
 
 <p>Examples:</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Group the 
pageView stream into 3 second tumbling windows keyed by the userId.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Group the pageView stream 
into 3 second tumbling windows keyed by the userId.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Collection</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;&gt;&gt;</span> <span 
class="o">=</span>
                      <span class="n">pageViews</span><span 
class="o">.</span><span class="na">window</span><span class="o">(</span>
                          <span class="n">Windows</span><span 
class="o">.</span><span class="na">keyedTumblingWindow</span><span 
class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> 
<span class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key 
extractor</span>
                            <span class="n">Duration</span><span 
class="o">.</span><span class="na">ofSeconds</span><span 
class="o">(</span><span class="mi">30</span><span class="o">),</span> <span 
class="c1">// window duration</span>
-                           <span class="k">new</span> <span 
class="nf">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)));</span>
+                           <span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">)));</span>
 
 
 <span class="c1">// Compute the maximum value over tumbling windows of 3 
seconds.</span>
@@ -535,20 +536,20 @@
 <span class="n">Supplier</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span 
class="n">Integer</span><span class="o">.</span><span 
class="na">MIN_VALUE</span><span class="o">;</span>
 <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">aggregateFunction</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">msg</span><span class="o">,</span> <span 
class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">Math</span><span class="o">.</span><span 
class="na">max</span><span class="o">(</span><span class="n">msg</span><span 
class="o">,</span> <span class="n">oldValue</span><span class="o">);</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">Void</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span>
-         <span class="n">integers</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="n">initialValue</span><span class="o">,</span> <span 
class="n">aggregateFunction</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">IntegerSerde</span><span 
class="o">()));</span></code></pre></div>
+         <span class="n">integers</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="n">initialValue</span><span class="o">,</span> <span 
class="n">aggregateFunction</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">IntegerSerde</span><span 
class="o">()));</span></code></pre></figure>
 
 <p><strong>Session Window</strong>: A session window groups a MessageStream 
into sessions. A session captures a period of activity over a MessageStream and 
is defined by a gap. A session is closed and results are emitted if no new 
messages arrive for the window for the gap duration.</p>
 
 <p>Examples:</p>
 
-<div class="highlight"><pre><code class="java"><span class="c1">// Sessionize 
a stream of page views, and count the number of page-views in a session for 
every user.</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Sessionize a stream of page 
views, and count the number of page-views in a session for every user.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
 <span class="n">Supplier</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">initialValue</span> <span class="o">=</span> <span 
class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span>
 <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">countAggregator</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span 
class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">oldCount</span> <span class="o">+</span> <span 
class="mi">1</span><span class="o">;</span>
 <span class="n">Duration</span> <span class="n">sessionGap</span> <span 
class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMinutes</span><span class="o">(</span><span 
class="mi">3</span><span class="o">);</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">sessionCounts</span> <span class="o">=</span> <span 
class="n">pageViews</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Windows</span><span class="o">.</span><span 
class="na">keyedSessionWindow</span><span class="o">(</span>
     <span class="n">pageView</span> <span class="o">-&gt;</span> <span 
class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span 
class="n">sessionGap</span><span class="o">,</span> <span 
class="n">initialValue</span><span class="o">,</span> <span 
class="n">countAggregator</span><span class="o">,</span>
-        <span class="k">new</span> <span class="nf">StringSerde</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">IntegerSerde</span><span class="o">()));</span>
+        <span class="k">new</span> <span class="n">StringSerde</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">IntegerSerde</span><span class="o">()));</span>
 
 <span class="c1">// Compute the maximum value over tumbling windows of 3 
seconds.</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
@@ -557,7 +558,7 @@
 <span class="n">FoldLeftFunction</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">aggregateFunction</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">msg</span><span class="o">,</span> <span 
class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">Math</span><span class="o">.</span><span 
class="na">max</span><span class="o">(</span><span class="n">msg</span><span 
class="o">,</span> <span class="n">oldValue</span><span class="o">)</span>
 <span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WindowPane</span><span class="o">&lt;</span><span 
class="n">Void</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span>
      <span class="n">integers</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">3</span><span class="o">),</span> <span 
class="n">initialValue</span><span class="o">,</span> <span 
class="n">aggregateFunction</span><span class="o">,</span>
-         <span class="k">new</span> <span class="nf">IntegerSerde</span><span 
class="o">()))</span></code></pre></div>
+         <span class="k">new</span> <span class="n">IntegerSerde</span><span 
class="o">()))</span></code></pre></figure>
 
 <h3 id="table">Table</h3>
 
@@ -569,11 +570,11 @@
 <li>Join a stream with the table using the join() operator</li>
 </ol>
 
-<div class="highlight"><pre><code class="java"><span class="lineno">1</span> 
<span class="kd">final</span> <span class="n">StreamApplication</span> <span 
class="n">app</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">streamGraph</span><span class="o">,</span> <span 
class="n">cfg</span><span class="o">)</span> <span class="o">-&gt;</span> <span 
class="o">{</span>
-<span class="lineno">2</span>   <span class="n">Table</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;&gt;</span> <span 
class="n">table</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getTable</span><span class="o">(</span><span class="k">new</span> 
<span class="n">InMemoryTableDescriptor</span><span class="o">(</span><span 
class="s">&quot;t1&quot;</span><span class="o">)</span>
-<span class="lineno">3</span>       <span class="o">.</span><span 
class="na">withSerde</span><span class="o">(</span><span 
class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">IntegerSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">ProfileJsonSerde</span><span 
class="o">())));</span>
-<span class="lineno">4</span>   <span class="o">...</span>
-<span class="lineno">5</span> <span class="o">};</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="lineno">1 </span><span 
class="kd">final</span> <span class="n">StreamApplication</span> <span 
class="n">app</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">streamGraph</span><span class="o">,</span> <span 
class="n">cfg</span><span class="o">)</span> <span class="o">-&gt;</span> <span 
class="o">{</span>
+<span class="lineno">2 </span>  <span class="n">Table</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Profile</span><span class="o">&gt;&gt;</span> <span 
class="n">table</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getTable</span><span class="o">(</span><span class="k">new</span> 
<span class="n">InMemoryTableDescriptor</span><span class="o">(</span><span 
class="s">&quot;t1&quot;</span><span class="o">)</span>
+<span class="lineno">3 </span>      <span class="o">.</span><span 
class="na">withSerde</span><span class="o">(</span><span 
class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">IntegerSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">ProfileJsonSerde</span><span 
class="o">())));</span>
+<span class="lineno">4 </span>  <span class="o">...</span>
+<span class="lineno">5 </span><span class="o">};</span></code></pre></figure>
 
 <p>Example above creates a TableDescriptor object, which contains all 
information about a table. The currently supported table types are <a 
href="https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java";>InMemoryTableDescriptor</a>
 and <a 
href="https://github.com/apache/samza/blob/master/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/rocksdb/RocksDbTableDescriptor.java";>RocksDbTableDescriptor</a>.
 Notice the type of records in a table is KV, and <a 
href="https://samza.apache.org/learn/documentation/latest/container/serialization.html";>Serdes</a>
 for both key and value of records needs to be defined (line 4). Additional 
parameters can be added based on individual table types. </p>
 
@@ -671,20 +672,20 @@ There is one leader processor which gene
 
 <p>To use ZooKeeper-based coordination, the following configs are required:</p>
 
-<div class="highlight"><pre><code class="jproperties"><span 
class="na">job.coordinator.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.zk.ZkJobCoordinatorFactory</span>
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">job.coordinator.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.zk.ZkJobCoordinatorFactory</span>
 <span class="na">job.coordinator.zk.connect</span><span 
class="o">=</span><span class="s">yourzkconnection</span>
-<span class="na">task.name.grouper.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></div>
+<span class="na">task.name.grouper.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></figure>
 
 <p>To use external coordination, the following configs are needed:</p>
 
-<div class="highlight"><pre><code class="properties"><span 
class="na">job.coordinator.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.standalone.PassthroughJobCoordinatorFactory</span>
-<span class="na">task.name.grouper.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></div>
+<figure class="highlight"><pre><code class="language-properties" 
data-lang="properties"><span></span><span 
class="na">job.coordinator.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.standalone.PassthroughJobCoordinatorFactory</span>
+<span class="na">task.name.grouper.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></figure>
 
 <h4 id="api">API</h4>
 
 <p>As mentioned in the <a href="#architecture">architecture</a> section above, 
you use the LocalApplicationRunner to launch your processors from your 
application code, like this:</p>
 
-<div class="highlight"><pre><code class="java"><span class="kd">public</span> 
<span class="kd">class</span> <span 
class="nc">WikipediaZkLocalApplication</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">WikipediaZkLocalApplication</span> 
<span class="o">{</span>
 
  <span class="kd">public</span> <span class="kd">static</span> <span 
class="kt">void</span> <span class="nf">main</span><span 
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="o">{</span>
    <span class="n">CommandLine</span> <span class="n">cmdLine</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">CommandLine</span><span class="o">();</span>
@@ -697,7 +698,7 @@ There is one leader processor which gene
    <span class="n">runner</span><span class="o">.</span><span 
class="na">run</span><span class="o">(</span><span class="n">app</span><span 
class="o">);</span>
    <span class="n">runner</span><span class="o">.</span><span 
class="na">waitForFinish</span><span class="o">();</span>
  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
 <p>In the code above, <code>WikipediaApplication</code> is an application 
written with the <a href="#high-level-api">high level API</a>.</p>
 

Modified: samza/site/startup/releases/latest/release-notes.html
URL: 
http://svn.apache.org/viewvc/samza/site/startup/releases/latest/release-notes.html?rev=1832281&r1=1832280&r2=1832281&view=diff
==============================================================================
--- samza/site/startup/releases/latest/release-notes.html (original)
+++ samza/site/startup/releases/latest/release-notes.html Fri May 25 22:28:38 
2018
@@ -44,9 +44,7 @@
                 <!-- this icon only shows in versioned pages -->
                 
                   
-                    
-                  
-                  <a 
href="http://samza.apache.org/startup/releases/0.14/release-notes.html";><i 
id="switch-version-button"></i></a>
+                  <a 
href="http://samza.apache.org/startup/releases/0.14/release-notes";><i 
id="switch-version-button"></i></a>
                    <!-- links for the navigation bar -->
                 
 
@@ -225,7 +223,7 @@ Read more about it in the <a href="/lear
   
     <script>
       $( document ).ready(function() {
-        if ( $.fn.urlExists( "/startup/releases/0.14/release-notes.html" ) ) {
+        if ( $.fn.urlExists( "/startup/releases/0.14/release-notes" ) ) {
           $("#switch-version-button").addClass("fa fa-history masthead-icon");
         }
       });


Reply via email to