Modified: samza/site/learn/documentation/latest/api/test-framework.html
URL: http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/api/test-framework.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/api/test-framework.html (original)
+++ samza/site/learn/documentation/latest/api/test-framework.html Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a href="/learn/documentation/1.8.0/api/test-framework">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a href="/learn/documentation/1.7.0/api/test-framework">1.7.0</a></li>
+
+              
+
               <li class="hide"><a href="/learn/documentation/1.6.0/api/test-framework">1.6.0</a></li>
 
               
@@ -639,18 +653,17 @@
    limitations under the License.
 -->
 
-<h1 id="what-is-samzas-integration-test-framework">What is Samza&rsquo;s Integration Test Framework ?</h1>
+<h1 id="what-is-samzas-integration-test-framework-">What is Samza’s Integration Test Framework?</h1>
 
 <ul>
-<li>  Samza provides an Integration framework which allows you to test applications by quickly running them against a few messages and asserting on expected results. This alleviates the need to set up dependencies like Kafka, Yarn, Zookeeper to test your Samza applications</li>
-<li>  Integration Framework can test the new StreamDSL (StreamApplication) and Task APIs (TaskApplication) as well as supports testing for legacy low level (StreamTask and AsyncStreamTask) samza jobs</li>
+  <li>Samza provides an integration test framework that lets you test applications by quickly running them against a few messages and asserting on the expected results. This removes the need to set up dependencies such as Kafka, YARN, and ZooKeeper to test your Samza applications.</li>
+  <li>The framework can test applications written with the new StreamDSL (StreamApplication) and Task API (TaskApplication), and it also supports testing legacy low-level Samza jobs (StreamTask and AsyncStreamTask).</li>
 </ul>
 
 <h1 id="some-prerequisite-information">Some Prerequisite Information</h1>
-
 <ol>
-<li> Your Samza job will be executed in single container mode and framework will set all the required configs for you to run your job (more on configs later)</li>
-<li> Your Samza job will read from a special kind of bounded streams introduced in the next section, containing finite number of messages to make testing feasible.</li>
+  <li>Your Samza job is executed in single-container mode, and the framework sets all the configs required to run your job (more on configs later).</li>
+  <li>Your Samza job reads from a special kind of bounded stream, introduced in the next section, that contains a finite number of messages so that the test run can finish.</li>
 </ol>
 
 <h1 id="key-concepts">Key Concepts</h1>
@@ -658,37 +671,36 @@
 <h2 id="introduction-to-in-memory-system-and-streams">Introduction to In Memory System and Streams</h2>
 
 <ol>
-<li> With Samza 1.0 we now get the feature of using streams that are maintained in memory using an in memory system.</li>
-<li> These in memory streams are described by InMemoryInputDescriptor, InMemoryOutputDescriptor and the corresponding system is described by InMemorySystemDescriptors</li>
-<li> These streams are like Kafka streams but there lifecycle is maintained in memory which means they get initialized with your job, are available throughout its run and are destroyed after the test ends . </li>
+  <li>Samza 1.0 adds support for streams that are maintained in memory by an in-memory system.</li>
+  <li>These in-memory streams are described by InMemoryInputDescriptor and InMemoryOutputDescriptor, and the corresponding system is described by InMemorySystemDescriptor.</li>
+  <li>These streams behave like Kafka streams, but their lifecycle is managed in memory: they are initialized with your job, remain available throughout its run, and are destroyed after the test ends.</li>
 </ol>
 
 <h2 id="introduction-to-testrunner-api">Introduction to TestRunner API</h2>
-
 <ol>
-<li> Samza 1.0 introduces a new TestRunner api to set up a test for Samza job, add configs, configure input/output streams, run the job in testing mode</li>
-<li> TestRunner also provides utilities to consume contents of a stream once the test has ran successfully</li>
-<li> TestRunner does basic config setup for you by default, you have flexibility to change these default configs if required</li>
-<li> TestRunner supports stateless and stateful job testing. TestRunner works with InMemoryTables and RocksDB Tables </li>
+  <li>Samza 1.0 introduces a new TestRunner API to set up a test for a Samza job: add configs, configure input and output streams, and run the job in testing mode.</li>
+  <li>TestRunner also provides utilities to consume the contents of a stream once the test has run successfully.</li>
+  <li>TestRunner does basic config setup for you by default; you can change these default configs if required.</li>
+  <li>TestRunner supports testing both stateless and stateful jobs, and works with in-memory and RocksDB tables.</li>
 </ol>
 
 <h2 id="how-to-write-test">How to Write a Test</h2>
 
 <p>For example, here is a StreamApplication that validates page views and decorates them with the viewer’s profile information.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="kd">class</span> <span class="nc">BadPageViewFilterApplication</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="kd">class</span> <span class="nc">BadPageViewFilterApplication</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span> <span class="o">{</span>
         <span class="nd">@Override</span>
-        <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span>
+        <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span>
     <span class="o">}</span>
     
-    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="n">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span>
-        <span class="n">KafkaSystemDescriptor</span> <span class="n">kafka</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">&quot;test&quot;</span><span class="o">);</span>
-        <span class="n">InputDescriptor</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">page</span><span class="o">-</span><span class="n">views</span><span class="err">”</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
-        <span class="n">OutputDescriptor</span><span class="o">&lt;</span><span class="n">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span> <span class="err">“</span><span class="n">decorated</span><span class="o">-</span><span class="n">page</span><span class="o">-</span><span class="n">views</span><span class="err">”</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>    
-        <span class="n">MessageStream</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">appDesc</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">describe</span><span class="o">(</span><span class="nc">StreamApplicationDescriptor</span> <span class="n">appDesc</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">KafkaSystemDescriptor</span> <span class="n">kafka</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span>
+        <span class="nc">InputDescriptor</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">OutputDescriptor</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+        <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">appDesc</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">);</span>
         <span class="n">pageViews</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">isValidPageView</span><span class="o">)</span>
             <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">)</span>
             <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">appDesc</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">));</span>
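The pipeline above is filter(isValidPageView) followed by map(addProfileInformation). One way to think about the expected output a test compares against is the input list run through the same two steps. The following is a minimal plain-Java sketch of that idea, not Samza code: PageView, DecoratedPageView, the validity rule, the profile map and the "unknown" fallback are all hypothetical stand-ins, because the original example does not define them.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch (not Samza code) of the BadPageViewFilter pipeline:
// filter(isValidPageView) -> map(addProfileInformation).
// PageView, DecoratedPageView, the validity rule and the profile lookup are
// HYPOTHETICAL stand-ins; the original example does not define them.
class PageViewLogic {
  record PageView(String memberId, String pageKey) {}

  record DecoratedPageView(String memberId, String pageKey, String viewerProfile) {}

  // Hypothetical profile store, keyed by member id.
  private final Map<String, String> profiles;

  PageViewLogic(Map<String, String> profiles) {
    this.profiles = profiles;
  }

  // Assumed validity rule: both fields must be present and non-empty.
  boolean isValidPageView(PageView pageView) {
    return pageView.memberId() != null && !pageView.memberId().isEmpty()
        && pageView.pageKey() != null && !pageView.pageKey().isEmpty();
  }

  // Attaches the viewer's profile, falling back to "unknown" when none is stored.
  DecoratedPageView addProfileInformation(PageView pageView) {
    String profile = profiles.getOrDefault(pageView.memberId(), "unknown");
    return new DecoratedPageView(pageView.memberId(), pageView.pageKey(), profile);
  }

  // Same filter-then-map shape as the MessageStream pipeline, applied to a finite list.
  // One way to derive an expected output list for a single-partition test.
  List<DecoratedPageView> expectedOutput(List<PageView> input) {
    return input.stream()
        .filter(this::isValidPageView)
        .map(this::addProfileInformation)
        .collect(Collectors.toList());
  }
}
```

Keeping the per-message logic in plain methods like these also lets you unit-test it directly, while the TestRunner steps check the stream wiring and configs.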
@@ -699,141 +711,138 @@
 <p>There are four simple steps to write a test for your stream processing logic and assert on its output.</p>
 
 <h2 id="step-1-construct-an-inmemorysystem">Step 1: Construct an InMemorySystem</h2>
+<p>The example application uses a Kafka system called “test”, so we configure an equivalent in-memory system with the same name (the name must match the one used in the job), as shown below:</p>
 
-<p>In the example we are writing we use a Kafka system called &ldquo;test&rdquo;, so we will configure an equivalent in memory system (name should be the same as used in job) as shown below:   </p>
-
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
-    <span class="n">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">&quot;test&quot;</span><span class="o">);</span></code></pre></figure>
-
-<h2 id="step-2-initialize-your-input-and-output-streams">Step 2:  Initialize your input and output streams</h2>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
+    <span class="nc">InMemorySystemDescriptor</span> <span class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">InMemorySystemDescriptor</span><span class="o">(</span><span class="s">"test"</span><span class="o">);</span></code></pre></figure>
 
+<h2 id="step-2--initialize-your-input-and-output-streams">Step 2: Initialize your input and output streams</h2>
 <ol>
-<li> TestRunner API uses a special kind of input and output streams called in memory streams which are easy to define and write assertions on.</li>
-<li> Data in these streams are maintained in memory hence they always use a NoOpSerde&lt;&gt;</li>
-<li> You need to configure all the stream that your job reads/writes to. </li>
-<li> You can obtain handle of these streams from the system we initialized in previous step</li>
-<li> We have two choices when we configure a stream type </li>
+  <li>The TestRunner API uses a special kind of input and output stream, called an in-memory stream, which is easy to define and write assertions on.</li>
+  <li>Data in these streams is kept in memory, so they always use a NoOpSerde&lt;&gt;.</li>
+  <li>You need to configure every stream that your job reads from or writes to.</li>
+  <li>You can obtain handles to these streams from the system initialized in the previous step.</li>
+  <li>There are two kinds of stream to configure: input streams and output streams.</li>
 </ol>
 
 <p>Input streams are described by InMemoryInputDescriptor. These streams need to be initialized with messages (data), since your job reads from them.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>     
-     <span class="n">InMemoryInputDescriptor</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="err">“</span><span class="n">page</span><span class="o">-</span><span class="n">views</span><span class="err">”</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o">&lt;&gt;());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">     
+     <span class="nc">InMemoryInputDescriptor</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getInputDescriptor</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;&gt;());</span></code></pre></figure>
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span class="na">    INFO</span><span 
class="o">:</span> <span class="s">Use the org.apache.samza.operators.KV as the 
message type ex: InMemoryInputDescriptor&lt;KV&lt;String,PageView&gt;&gt; as 
the message type</span>
-    <span class="err">to</span> <span class="err">use</span> <span 
class="err">key</span> <span class="err">of</span> <span class="err">the</span> 
<span class="err">KV</span> <span class="err">(String</span> <span 
class="err">here)</span> <span class="err">as</span> <span 
class="err">key</span> <span class="err">and</span> <span 
class="err">value</span> <span class="err">as</span> <span 
class="err">message</span> <span class="err">(PageView</span> <span 
class="err">here)</span> <span class="err">for</span> <span 
class="err">the</span> <span class="err">IncomingMessageEnvelope</span> <span 
class="err">in</span> <span class="err">samza</span> <span 
class="err">job,</span> <span class="err">using</span> <span 
class="err">all</span> <span class="err">the</span> <span 
class="err">other</span> <span class="err">data</span> <span 
class="err">types</span> <span class="err">will</span> <span 
class="err">result</span> <span class="err">in</span> <span 
class="err">key</span> <span class="err">
 of</span> <span class="err">the</span> <span class="err">the</span> <span 
class="err">IncomingMessageEnvelope</span> <span class="err">set</span> <span 
class="err">to</span> <span class="err">null</span> </code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">    INFO: Use org.apache.samza.operators.KV as the message type, e.g. InMemoryInputDescriptor&lt;KV&lt;String, PageView&gt;&gt;,
+    to make the KV key (String here) the key and the KV value the message (PageView here) of the IncomingMessageEnvelope in the Samza job. With any other message type, the key of the IncomingMessageEnvelope is set to null.</code></pre></figure>
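As a rough illustration of the INFO note, the plain-Java model below shows which key ends up on the envelope. It is a hypothetical sketch, not Samza code: KV and Envelope are simplified stand-ins for org.apache.samza.operators.KV and IncomingMessageEnvelope, and toEnvelope only mirrors the rule stated in the note.

```java
// Plain-Java model (not Samza code) of the KV rule described in the INFO note.
// KV and Envelope are HYPOTHETICAL simplified stand-ins for
// org.apache.samza.operators.KV and IncomingMessageEnvelope.
class EnvelopeKeys {
  record KV<K, V>(K key, V value) {}

  record Envelope(Object key, Object message) {}

  // A KV message is split into envelope key and message;
  // any other message type is passed through with a null key.
  static Envelope toEnvelope(Object inputMessage) {
    if (inputMessage instanceof KV<?, ?> kv) {
      return new Envelope(kv.key(), kv.value());
    }
    return new Envelope(null, inputMessage);
  }
}
```

In other words, if your job logic reads the envelope key (for example, to partition or join by member id), the test input should be a list of KV messages rather than bare PageView objects.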
 
 <p>Output streams are described by InMemoryOutputDescriptor. These streams need to be initialized with a partition count, and they start out empty since your job writes to them.</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">InMemoryOutputDescriptor</span><span class="o">&lt;</span><span class="n">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">&quot;decorated-page-views&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o">&lt;&gt;())</span></code></pre></figure>
 
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">InMemoryOutputDescriptor</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">outputPageViews</span> <span class="o">=</span> <span class="n">inMemory</span><span class="o">.</span><span class="na">getOutputDescriptor</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;&gt;());</span></code></pre></figure>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">    Note</span><span class="o">:</span> <span class="s">Input streams are immutable - ie., once they have been created you can&#39;t modify their contents eg: by adding new messages&quot;All input streams are supposed to be bounded</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">    Note: Input streams are immutable, i.e., once they have been created you can't modify their contents (e.g., by adding new messages). All input streams are supposed to be bounded.</code></pre></figure>
 
 <h2 id="step-3-create-a-testrunner">Step 3: Create a TestRunner</h2>
 
 <ol>
-<li> Initialize a TestRunner of your Samza job</li>
-<li> Configure TestRunner with input streams and mock data to it </li>
-<li> Configure TestRunner with output streams with a partition count</li>
-<li> Add any configs if necessary</li>
-<li> Run the test runner</li>
+  <li>Initialize a TestRunner for your Samza job.</li>
+  <li>Configure the TestRunner with input streams and the mock data for them.</li>
+  <li>Configure the TestRunner with output streams, each with a partition count.</li>
+  <li>Add any configs if necessary.</li>
+  <li>Run the TestRunner.</li>
 </ol>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">List</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">generateData</span><span class="o">(...);</span>
-    <span class="n">TestRunner</span>
-       <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">BadPageViewFilterApplication</span><span class="o">())</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">List</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">generateData</span><span class="o">(...);</span>
+    <span class="nc">TestRunner</span>
+       <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">BadPageViewFilterApplication</span><span class="o">())</span>
        <span class="o">.</span><span class="na">addInputStream</span><span class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> <span class="n">pageViews</span><span class="o">)</span>
        <span class="o">.</span><span class="na">addOutputStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span>
-       <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1500</span><span class="o">));</span></code></pre></figure>
+       <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1500</span><span class="o">));</span></code></pre></figure>
 
-<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">    Info</span><span class="o">:</span> <span class="s">Use addConfig(Map&lt;String, String&gt; configs) or addConfig(String key, String value) to add/modify any config in the TestRunner</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">    Info: Use addConfig(Map&lt;String, String&gt; configs) or addConfig(String key, String value) to add or modify any config in the TestRunner.</code></pre></figure>
 
 <h2 id="step-4-assert-on-the-output-stream">Step 4: Assert on the output stream</h2>
 
 <p>You have the following choices for asserting on the results of your tests:</p>
 
 <ol>
-<li>You can use StreamAssert utils on your In Memory Streams to do consumption of all partitions</li>
+  <li>You can use the StreamAssert utilities on your in-memory streams to consume all partitions and assert on their contents.</li>
 </ol>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    
     <span class="c1">// Consume multi-partitioned stream, key of the map represents partitionId</span>
-    <span class="n">Map</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">&gt;</span> <span class="n">expOutput</span><span class="o">;</span>
-    <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="n">expectedOutput</span><span class="o">,</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span>
+    <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">List</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;&gt;</span> <span class="n">expectedOutput</span><span class="o">;</span>
+    <span class="nc">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="n">expectedOutput</span><span class="o">,</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span>
     <span class="c1">// Consume single-partitioned stream</span>
-    <span class="n">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(...),</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span></code></pre></figure>
+    <span class="nc">StreamAssert</span><span class="o">.</span><span class="na">containsInOrder</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(...),</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span></code></pre></figure>
 
 <ol start="2">
-<li>You have the flexibility to define your custom assertions using API TestRunner.consumeStream() to assert on any partitions of the stream</li>
+  <li>You can also define your own custom assertions, using the TestRunner.consumeStream() API to assert on any partition of the stream.</li>
 </ol>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
-        <span class="n">TestRunner</span><span class="o">.</span><span class="na">consumeStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">size</span><span class="o">(),</span><span class="mi">1</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nc">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
+        <span class="mi">1</span><span class="o">,</span> <span class="nc">TestRunner</span><span class="o">.</span><span class="na">consumeStream</span><span class="o">(</span><span class="n">outputPageViews</span><span class="o">,</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">1000</span><span class="o">)).</span><span class="na">get</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">size</span><span class="o">()</span>
        <span class="o">);</span></code></pre></figure>
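The custom-assertion route can be sketched in plain Java. The helpers below are hypothetical and not part of the Samza API. They assume, as the partitionId comment and the get(0).size() call in the snippets suggest, that TestRunner.consumeStream() returns a map from partition id to the list of messages read from that partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// HYPOTHETICAL helpers for custom assertions on a consumeStream()-style result:
// a map from partition id to the messages read from that partition, in order.
class PartitionAssertions {
  // Concatenates all partitions, visiting partition ids in ascending order.
  static <T> List<T> flatten(Map<Integer, List<T>> byPartition) {
    List<T> all = new ArrayList<>();
    new TreeMap<>(byPartition).values().forEach(all::addAll);
    return all;
  }

  // True when the stream holds exactly the expected messages, ignoring
  // order and partition assignment (duplicates must match in count).
  static <T> boolean containsInAnyOrder(Map<Integer, List<T>> byPartition, List<T> expected) {
    List<T> remaining = new ArrayList<>(flatten(byPartition));
    for (T message : expected) {
      if (!remaining.remove(message)) {
        return false;
      }
    }
    return remaining.isEmpty();
  }

  // Number of messages written to a single partition (0 if it received none).
  static <T> int sizeOfPartition(Map<Integer, List<T>> byPartition, int partitionId) {
    return byPartition.getOrDefault(partitionId, List.of()).size();
  }
}
```

For example, with a hypothetical variable result holding the consumeStream() output, assertEquals(1, PartitionAssertions.sizeOfPartition(result, 0)) performs the same check as the snippet above, but reports 0 instead of throwing when partition 0 received nothing.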
 
 <p>Putting it all together, here is the complete test:</p>
 
-<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span>    <span class="nd">@Test</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">testStreamDSLApi</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    <span class="nd">@Test</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">testStreamDSLApi</span><span class="o">()</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
      <span class="c1">// Generate Mock Data</span>
-     <span class="n">List</span><span class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">genrateMockInput</span><span class="o">(...);</span>
-     <span class="n">List</span><span class="o">&lt;</span><span class="n">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">expectedOutput</span> <span class="o">=</span> <span class="n">genrateMockOutput</span><span class="o">(...);</span>
+     <span class="nc">List</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">generateMockInput</span><span class="o">(...);</span>
+     <span class="nc">List</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">expectedOutput</span> <span class="o">=</span> <span class="n">generateMockOutput</span><span class="o">(...);</span>
     
      <span class="c1">// Configure System and Stream Descriptors</span>
-     <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span>
-     <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
-        <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="err">“</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
-     <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">DecoratedPageView</span><span 
class="o">&gt;</span> <span class="n">outputPageView</span> <span 
class="o">=</span> <span class="n">inMemory</span>
-        <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="err">“</span><span class="n">decorated</span><span 
class="o">-</span><span class="n">page</span><span class="o">-</span><span 
class="n">views</span><span class="err">”</span><span class="o">,</span> 
<span class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;())</span>
+     <span class="nc">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">"test"</span><span class="o">);</span>
+     <span class="nc">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageView</span><span 
class="o">&gt;</span> <span class="n">pageViewInput</span> <span 
class="o">=</span> <span class="n">inMemory</span>
+        <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"page-views"</span><span class="o">,</span> 
<span class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+     <span class="nc">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="nc">DecoratedPageView</span><span 
class="o">&gt;</span> <span class="n">outputPageView</span> <span 
class="o">=</span> <span class="n">inMemory</span>
+        <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"decorated-page-views"</span><span class="o">,</span> 
<span class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
      
      <span class="c1">// Configure the TestRunner </span>
-     <span class="n">TestRunner</span>
-         <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">BadPageViewFilterApplication</span><span class="o">())</span>
+     <span class="nc">TestRunner</span>
+         <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nc">BadPageViewFilterApplication</span><span class="o">())</span>
          <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> 
<span class="n">pageViews</span><span class="o">)</span>
          <span class="o">.</span><span class="na">addOutputStream</span><span 
class="o">(</span><span class="n">outputPageView</span><span class="o">,</span> 
<span class="mi">10</span><span class="o">)</span>
-         <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1500</span><span class="o">));</span>
+         <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1500</span><span class="o">));</span>
     
      <span class="c1">// Assert the results</span>
-     <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedOutput</span><span class="o">,</span> <span 
class="n">outputPageView</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
+     <span class="nc">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedOutput</span><span class="o">,</span> <span 
class="n">outputPageView</span><span class="o">,</span> <span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
     <span class="o">}</span></code></pre></figure>
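<p>The mock-data helpers in the test above are elided. One hypothetical way to write them is sketched below in plain Java, with no Samza dependency: generate deterministic input, then derive the expected output from it using the same rule the application is assumed to apply. <code>PageView</code>, <code>DecoratedPageView</code> and the "every third view is bad" rule are illustrative stand-ins, not types or logic from Samza or from the example application. Note the <code>equals()</code> override: in-order matchers such as Hamcrest's <code>IsIterableContainingInOrder</code> compare elements with <code>equals()</code>, so a message type asserted this way likely needs value equality.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical shapes for the mock-data helpers elided in the StreamDSL test.
// PageView, DecoratedPageView and the "every third view is bad" rule are
// illustrative stand-ins, not types or logic from Samza or the example app.
public class MockDataSketch {

  static final class PageView {
    final int id;
    final String pageKey;

    PageView(int id, String pageKey) {
      this.id = id;
      this.pageKey = pageKey;
    }
  }

  static final class DecoratedPageView {
    final String pageKey;
    final String label;

    DecoratedPageView(String pageKey, String label) {
      this.pageKey = pageKey;
      this.label = label;
    }

    // Value equality: in-order assertions compare elements with equals(),
    // so without this override two identical messages would not match.
    @Override
    public boolean equals(Object o) {
      if (!(o instanceof DecoratedPageView)) {
        return false;
      }
      DecoratedPageView other = (DecoratedPageView) o;
      return pageKey.equals(other.pageKey) && label.equals(other.label);
    }

    @Override
    public int hashCode() {
      return Objects.hash(pageKey, label);
    }
  }

  // Assumed filtering rule, standing in for whatever the application does.
  static boolean isBad(PageView pv) {
    return pv.id % 3 == 0;
  }

  // Deterministic input (ids 0..count-1) keeps test runs reproducible.
  static List<PageView> generateMockInput(int count) {
    List<PageView> views = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      views.add(new PageView(i, "page-" + i));
    }
    return views;
  }

  // Expected output derived from the input with the same (assumed) rule,
  // preserving input order so it can feed an in-order assertion.
  static List<DecoratedPageView> generateMockOutput(List<PageView> input) {
    return input.stream()
        .filter(pv -> !isBad(pv))
        .map(pv -> new DecoratedPageView(pv.pageKey, "good"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<DecoratedPageView> expected = generateMockOutput(generateMockInput(6));
    System.out.println(expected.size()); // 4 (ids 1, 2, 4, 5)
  }
}
```

<p>Deriving the expectations from the input, rather than hand-writing both lists, keeps the two from drifting apart when the mock data changes.</p>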
- 
 
 <h3 id="example-for-low-level-api">Example for Low Level API:</h3>
 
 <p>For an application built with the low-level Task API (TaskApplication):</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span 
class="kd">implements</span> <span class="n">TaskApplication</span> <span 
class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span 
class="nc">TaskApplication</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="n">TaskApplicationDescriptor</span> <span class="n">appDesc</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">describe</span><span class="o">(</span><span 
class="nc">TaskApplicationDescriptor</span> <span class="n">appDesc</span><span 
class="o">)</span> <span class="o">{</span>
         <span class="c1">// Configure the Kafka system and the message serde</span>
-        <span class="n">KafkaSystemDescriptor</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">kafkaSystem</span> <span class="o">=</span> 
-            <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="err">“</span><span class="n">kafka</span><span 
class="err">”</span><span class="o">)</span>
+        <span class="nc">KafkaSystemDescriptor</span> <span 
class="n">kafkaSystem</span> <span class="o">=</span> 
+            <span class="k">new</span> <span 
class="nc">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">)</span>
               <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="n">myZkServers</span><span class="o">)</span>
               <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="n">myBrokers</span><span class="o">);</span>
-        <span class="n">KVSerde</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">serde</span> <span class="o">=</span> 
-            <span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;</span><span class="n">PageViewEvent</span><span 
class="o">&gt;());</span>
+        <span class="nc">KVSerde</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageViewEvent</span><span class="o">&gt;</span> <span 
class="n">serde</span> <span class="o">=</span> 
+            <span class="nc">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;</span><span class="nc">PageViewEvent</span><span 
class="o">&gt;());</span>
         <span class="c1">// Add input, output streams and tables</span>
         <span class="n">appDesc</span><span class="o">.</span><span 
class="na">withInputStream</span><span class="o">(</span><span 
class="n">kafkaSystem</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"pageViewEvent"</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">))</span>
             <span class="o">.</span><span 
class="na">withOutputStream</span><span class="o">(</span><span 
class="n">kafkaSystem</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"goodPageViewEvent"</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">))</span>
-            <span class="o">.</span><span class="na">withTable</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">RocksDBTableDescriptor</span><span class="o">(</span>
-                <span class="err">“</span><span 
class="n">badPageUrlTable</span><span class="err">”</span><span 
class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">IntegerSerde</span><span 
class="o">())</span>
-            <span class="o">.</span><span 
class="na">withTaskFactory</span><span class="o">(</span><span 
class="k">new</span> <span class="n">BadPageViewTaskFactory</span><span 
class="o">());</span>
+            <span class="o">.</span><span class="na">withTable</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nc">RocksDbTableDescriptor</span><span class="o">(</span>
+                <span class="s">"badPageUrlTable"</span><span 
class="o">,</span> <span class="nc">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">IntegerSerde</span><span 
class="o">())))</span>
+            <span class="o">.</span><span 
class="na">withTaskFactory</span><span class="o">(</span><span 
class="k">new</span> <span class="nc">BadPageViewTaskFactory</span><span 
class="o">());</span>
       <span class="o">}</span>
     <span class="o">}</span>
     
-    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewTaskFactory</span> <span class="kd">implements</span> 
<span class="n">StreamTaskFactory</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewTaskFactory</span> <span class="kd">implements</span> 
<span class="nc">StreamTaskFactory</span> <span class="o">{</span>
       <span class="nd">@Override</span>
-      <span class="kd">public</span> <span class="n">StreamTask</span> <span 
class="nf">createInstance</span><span class="o">()</span> <span 
class="o">{</span>
+      <span class="kd">public</span> <span class="nc">StreamTask</span> <span 
class="nf">createInstance</span><span class="o">()</span> <span 
class="o">{</span>
         <span class="c1">// Create a new task instance</span>
-        <span class="k">return</span> <span class="k">new</span> <span 
class="n">BadPageViewFilterTask</span><span class="o">();</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="nc">BadPageViewFilterTask</span><span class="o">();</span>
       <span class="o">}</span>
     <span class="o">}</span>
     
-     <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewFilterTask</span> <span class="kd">implements</span> 
<span class="n">StreamTask</span> <span class="o">{</span>
+     <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">BadPageViewFilterTask</span> <span class="kd">implements</span> 
<span class="nc">StreamTask</span> <span class="o">{</span>
        <span class="nd">@Override</span>
-       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span>
-                           <span class="n">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span>
-                           <span class="n">TaskCoordinator</span> <span 
class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
+       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span>
+                           <span class="nc">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span>
+                           <span class="nc">TaskCoordinator</span> <span 
class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
          <span class="c1">// process message synchronously</span>
         <span class="o">}</span>
      <span class="o">}</span>   
@@ -841,77 +850,77 @@
      
      <span class="nd">@Test</span>
      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testBadPageViewFilterTaskApplication</span><span class="o">()</span> 
<span class="o">{</span>
-       <span class="n">List</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">badPageViews</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(..));</span>
-       <span class="n">List</span><span class="o">&lt;</span><span 
class="n">Profile</span><span class="o">&gt;</span> <span 
class="n">expectedGoodPageViews</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(..));</span>
+       <span class="nc">List</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">badPageViews</span> <span class="o">=</span> <span 
class="nc">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(...));</span>
+       <span class="nc">List</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">expectedGoodPageViews</span> <span class="o">=</span> <span 
class="nc">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">generatePageViews</span><span class="o">(...));</span>
      
-       <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">);</span>
+       <span class="nc">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">);</span>
      
-       <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
-          <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;pageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+       <span class="nc">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageView</span><span 
class="o">&gt;</span> <span class="n">pageViewInput</span> <span 
class="o">=</span> <span class="n">inMemory</span>
+          <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"pageViewEvent"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
      
-       <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewOutput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
-          <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;goodPageViewEvent&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+       <span class="nc">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageView</span><span 
class="o">&gt;</span> <span class="n">pageViewOutput</span> <span 
class="o">=</span> <span class="n">inMemory</span>
+          <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"goodPageViewEvent"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
      
-       <span class="n">TestRunner</span>
-          <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">BadPageViewFilter</span><span class="o">())</span>
+       <span class="nc">TestRunner</span>
+          <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nc">BadPageViewFilter</span><span class="o">())</span>
           <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">pageViewInput</span><span class="o">,</span> 
<span class="n">badPageViews</span><span class="o">)</span>
           <span class="o">.</span><span class="na">addOutputStream</span><span 
class="o">(</span><span class="n">pageViewOutput</span><span class="o">,</span> 
<span class="mi">1</span><span class="o">)</span>
-          <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">2</span><span class="o">));</span>
+          <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">2</span><span class="o">));</span>
      
-       <span class="n">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedGoodPageViews</span><span class="o">,</span> <span 
class="n">pageViewOutput</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
+       <span class="nc">StreamAssert</span><span class="o">.</span><span 
class="na">containsInOrder</span><span class="o">(</span><span 
class="n">expectedGoodPageViews</span><span class="o">,</span> <span 
class="n">pageViewOutput</span><span class="o">,</span> <span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span>
      <span class="o">}</span></code></pre></figure>
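<p>The task's <code>process</code> body is left as a placeholder above. A minimal sketch of the per-message decision it might make is shown below in plain Java: a <code>HashMap</code> stands in for the <code>badPageUrlTable</code> (String keys, Integer values, matching the declared serdes), and <code>PageView</code> with a <code>url</code> field is a hypothetical stand-in type. A real task would typically obtain the table from its task context and send through the <code>MessageCollector</code>; this sketch only models the filtering, so the expectations used in the test can be checked in isolation.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the per-message decision a task like BadPageViewFilterTask
// might make. A HashMap stands in for "badPageUrlTable" (String -> Integer);
// PageView is a hypothetical stand-in type, not a Samza class.
public class BadPageViewFilterSketch {

  static final class PageView {
    final String url;

    PageView(String url) {
      this.url = url;
    }
  }

  private final Map<String, Integer> badPageUrlTable;

  BadPageViewFilterSketch(Map<String, Integer> badPageUrlTable) {
    this.badPageUrlTable = badPageUrlTable;
  }

  // True when the page view should be forwarded to "goodPageViewEvent".
  boolean isGood(PageView pv) {
    return !badPageUrlTable.containsKey(pv.url);
  }

  // Processes messages in arrival order and collects those that would be sent.
  List<PageView> processAll(List<PageView> input) {
    List<PageView> sent = new ArrayList<>();
    for (PageView pv : input) {
      if (isGood(pv)) {
        sent.add(pv);
      }
    }
    return sent;
  }

  public static void main(String[] args) {
    Map<String, Integer> table = new HashMap<>();
    table.put("/spam", 1); // the Integer value's meaning is assumed (e.g. a report count)
    BadPageViewFilterSketch task = new BadPageViewFilterSketch(table);
    List<PageView> out = task.processAll(List.of(new PageView("/home"), new PageView("/spam")));
    System.out.println(out.size()); // 1
  }
}
```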
 
 <p>Follow a similar approach for the legacy low-level API: pass the task class (a class implementing StreamTask or AsyncStreamTask) to TestRunner.of(...).</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>      <span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">MultiplyByTenStreamTask</span> <span 
class="kd">implements</span> <span class="n">StreamTask</span> <span 
class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
   <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MultiplyByTenStreamTask</span> <span class="kd">implements</span> 
<span class="nc">StreamTask</span> <span class="o">{</span>
        <span class="nd">@Override</span>
-       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span> <span class="n">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span> <span 
class="n">TaskCoordinator</span> <span class="n">coordinator</span><span 
class="o">)</span>
-           <span class="kd">throws</span> <span class="n">Exception</span> 
<span class="o">{</span>
-         <span class="n">Integer</span> <span class="n">obj</span> <span 
class="o">=</span> <span class="o">(</span><span class="n">Integer</span><span 
class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span 
class="na">getMessage</span><span class="o">();</span>
-         <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="n">SystemStream</span><span 
class="o">(</span><span class="s">&quot;test&quot;</span><span 
class="o">,</span> <span class="s">&quot;output&quot;</span><span 
class="o">),</span>
+       <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">process</span><span class="o">(</span><span 
class="nc">IncomingMessageEnvelope</span> <span class="n">envelope</span><span 
class="o">,</span> <span class="nc">MessageCollector</span> <span 
class="n">collector</span><span class="o">,</span> <span 
class="nc">TaskCoordinator</span> <span class="n">coordinator</span><span 
class="o">)</span>
+           <span class="kd">throws</span> <span class="nc">Exception</span> 
<span class="o">{</span>
+         <span class="nc">Integer</span> <span class="n">obj</span> <span 
class="o">=</span> <span class="o">(</span><span class="nc">Integer</span><span 
class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span 
class="na">getMessage</span><span class="o">();</span>
+         <span class="n">collector</span><span class="o">.</span><span 
class="na">send</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span 
class="k">new</span> <span class="nc">SystemStream</span><span 
class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span 
class="s">"output"</span><span class="o">),</span>
              <span class="n">envelope</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">(),</span> <span 
class="n">envelope</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">(),</span> <span class="n">obj</span> 
<span class="o">*</span> <span class="mi">10</span><span class="o">));</span>
        <span class="o">}</span>
       <span class="o">}</span>
        
       <span class="nd">@Test</span>
-      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testLowLevelApi</span><span class="o">()</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
-        <span class="n">List</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">inputList</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">1</span><span 
class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span 
class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span 
class="o">,</span> <span class="mi">5</span><span class="o">);</span>
-        <span class="n">List</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">outputList</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="mi">20</span><span class="o">,</span> <span 
class="mi">30</span><span class="o">,</span> <span class="mi">40</span><span 
class="o">,</span> <span class="mi">50</span><span class="o">);</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">testLowLevelApi</span><span class="o">()</span> <span 
class="kd">throws</span> <span class="nc">Exception</span> <span 
class="o">{</span>
+        <span class="nc">List</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">inputList</span> <span class="o">=</span> <span 
class="nc">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">1</span><span 
class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span 
class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span 
class="o">,</span> <span class="mi">5</span><span class="o">);</span>
+        <span class="nc">List</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">outputList</span> <span class="o">=</span> <span 
class="nc">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="mi">20</span><span class="o">,</span> <span 
class="mi">30</span><span class="o">,</span> <span class="mi">40</span><span 
class="o">,</span> <span class="mi">50</span><span class="o">);</span>
        
-        <span class="n">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">);</span>
+        <span class="nc">InMemorySystemDescriptor</span> <span 
class="n">inMemory</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">InMemorySystemDescriptor</span><span class="o">(</span><span 
class="s">"test"</span><span class="o">);</span>
        
-        <span class="n">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
-           <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;input&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">Integer</span><span 
class="o">&gt;());</span>
+        <span class="nc">InMemoryInputDescriptor</span><span 
class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> 
<span class="n">numInput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+           <span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"input"</span><span class="o">,</span> <span class="k">new</span> 
<span class="nc">NoOpSerde</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;());</span>
        
-        <span class="n">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">numOutput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
-           <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;output&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;</span><span class="n">Integer</span><span 
class="o">&gt;());</span>
+        <span class="nc">InMemoryOutputDescriptor</span><span 
class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> 
<span class="n">numOutput</span> <span class="o">=</span> <span 
class="n">inMemory</span>
+           <span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"output"</span><span class="o">,</span> <span class="k">new</span> 
<span class="nc">NoOpSerde</span><span class="o">&lt;</span><span 
class="nc">Integer</span><span class="o">&gt;());</span>
        
-        <span class="n">TestRunner</span>
-           <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="n">MyStreamTestTask</span><span 
class="o">.</span><span class="na">class</span><span class="o">)</span>
+        <span class="nc">TestRunner</span>
+           <span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="nc">MultiplyByTenStreamTask</span><span 
class="o">.</span><span class="na">class</span><span class="o">)</span>
            <span class="o">.</span><span class="na">addInputStream</span><span 
class="o">(</span><span class="n">numInput</span><span class="o">,</span> <span 
class="n">inputList</span><span class="o">)</span>
            <span class="o">.</span><span 
class="na">addOutputStream</span><span class="o">(</span><span 
class="n">numOutput</span><span class="o">,</span> <span 
class="mi">1</span><span class="o">)</span>
-           <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">1</span><span class="o">));</span>
+           <span class="o">.</span><span class="na">run</span><span 
class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">1</span><span class="o">));</span>
        
-        <span class="n">Assert</span><span class="o">.</span><span 
class="na">assertThat</span><span class="o">(</span><span 
class="n">TestRunner</span><span class="o">.</span><span 
class="na">consumeStream</span><span class="o">(</span><span 
class="n">imod</span><span class="o">,</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">)).</span><span 
class="na">get</span><span class="o">(</span><span class="mi">0</span><span 
class="o">),</span>
-           <span class="n">IsIterableContainingInOrder</span><span 
class="o">.</span><span class="na">contains</span><span class="o">(</span><span 
class="n">outputList</span><span class="o">.</span><span 
class="na">toArray</span><span class="o">()));;</span>
+        <span class="nc">Assert</span><span class="o">.</span><span 
class="na">assertThat</span><span class="o">(</span><span 
class="nc">TestRunner</span><span class="o">.</span><span 
class="na">consumeStream</span><span class="o">(</span><span 
class="n">imod</span><span class="o">,</span> <span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofMillis</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">)).</span><span 
class="na">get</span><span class="o">(</span><span class="mi">0</span><span 
class="o">),</span>
+           <span class="nc">IsIterableContainingInOrder</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">outputList</span><span class="o">.</span><span class="na">toArray</span><span class="o">()));</span>
       <span class="o">}</span></code></pre></figure>
 
 <h2 id="stateful-testing">Stateful Testing</h2>
 
 <ol>
-<li>There is no additional config/changes required for TestRunner apis for 
testing samza jobs using StreamApplication or TaskApplication APIs</li>
-<li>Legacy task api only supports RocksDbTable and needs following configs to 
be added to TestRunner. 
-For example if your job is using a RocksDbTable named &ldquo;my-store&rdquo; 
with key and msg serde of String type</li>
+  <li>No additional config or changes are required for the TestRunner APIs when testing Samza jobs that use the StreamApplication or TaskApplication APIs.</li>
+  <li>The legacy task API only supports RocksDbTable and requires the following configs to be added to TestRunner.
+For example, if your job uses a RocksDbTable named “my-store” with key and msg serdes of String type:</li>
 </ol>
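<p>Because these four settings follow a fixed pattern per store, a test suite with several stores may prefer to generate them. The sketch below is a hypothetical helper (the class and method names are ours, not part of Samza or TestRunner); it only reproduces the config keys used for “my-store” in this section, and its result is meant to be handed to <code>TestRunner.addConfig(...)</code>.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Samza API: builds the store configs that a
// legacy-task-API job needs when its RocksDB store is exercised in TestRunner.
// Only the config keys and factory class names come from the Samza docs.
final class LegacyStoreTestConfig {
  private LegacyStoreTestConfig() {}

  static Map<String, String> rocksDbStore(String storeName, String serdeName, String serdeFactoryClass) {
    Map<String, String> config = new HashMap<>();
    // Storage engine backing the store.
    config.put("stores." + storeName + ".factory",
        "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
    // Register the serde factory under a short name...
    config.put("serializers.registry." + serdeName + ".class", serdeFactoryClass);
    // ...and use that serde for both keys and values of the store.
    config.put("stores." + storeName + ".key.serde", serdeName);
    config.put("stores." + storeName + ".msg.serde", serdeName);
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> config = rocksDbStore("my-store", "string",
        "org.apache.samza.serializers.StringSerdeFactory");
    config.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```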
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">config</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;&gt;();</span>
-    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.factory&quot;</span><span class="o">,</span> 
<span 
class="s">&quot;org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory&quot;</span><span
 class="o">);</span>
-    <span class="n">config</span><span class="o">.</span><span 
class="na">out</span><span class="o">(</span><span 
class="s">&quot;serializers.registry.string.class&quot;</span><span 
class="o">,</span> <span 
class="s">&quot;org.apache.samza.serializers.StringSerdeFactory&quot;</span><span
 class="o">);</span>
-    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.key.serde&quot;</span><span class="o">,</span> 
<span class="s">&quot;string&quot;</span><span class="o">);</span>
-    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;stores.my-store.msg.serde&quot;</span><span class="o">,</span> 
<span class="s">&quot;string&quot;</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">Map</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">config</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">HashMap</span><span class="o">&lt;&gt;();</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">"stores.my-store.factory"</span><span class="o">,</span> <span 
class="s">"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"</span><span
 class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"serializers.registry.string.class"</span><span class="o">,</span> <span class="s">"org.apache.samza.serializers.StringSerdeFactory"</span><span class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">"stores.my-store.key.serde"</span><span class="o">,</span> <span 
class="s">"string"</span><span class="o">);</span>
+    <span class="n">config</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">"stores.my-store.msg.serde"</span><span class="o">,</span> <span 
class="s">"string"</span><span class="o">);</span>
     
-    <span class="n">TestRunner</span>
+    <span class="nc">TestRunner</span>
         <span class="o">.</span><span class="na">of</span><span 
class="o">(...)</span>
         <span class="o">.</span><span class="na">addConfig</span><span 
class="o">(</span><span class="n">config</span><span class="o">)</span>
         <span class="o">...</span>

Modified: 
samza/site/learn/documentation/latest/architecture/architecture-overview.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/architecture/architecture-overview.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- 
samza/site/learn/documentation/latest/architecture/architecture-overview.html 
(original)
+++ 
samza/site/learn/documentation/latest/architecture/architecture-overview.html 
Wed Jan 18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/architecture/architecture-overview">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/architecture/architecture-overview">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/architecture/architecture-overview">1.6.0</a></li>
 
               
@@ -640,77 +654,74 @@
 -->
 
 <ul>
-<li><a href="#distributed-execution">Distributed execution</a>
-
-<ul>
-<li><a href="#task">Task</a></li>
-<li><a href="#container">Container</a></li>
-<li><a href="#coordinator">Coordinator</a></li>
-</ul></li>
-<li><a href="#threading-model">Threading model and ordering</a></li>
-<li><a href="#incremental-checkpoints">Incremental checkpointing</a></li>
-<li><a href="#state-management">State management</a></li>
-<li><a href="#fault-tolerance-of-state">Fault tolerance of state</a></li>
-<li><a href="#host-affinity">Host affinity</a></li>
+  <li><a href="#distributed-execution">Distributed execution</a>
+    <ul>
+      <li><a href="#task">Task</a></li>
+      <li><a href="#container">Container</a></li>
+      <li><a href="#coordinator">Coordinator</a></li>
+    </ul>
+  </li>
+  <li><a href="#threading-model-and-ordering">Threading model and ordering</a></li>
+  <li><a href="#incremental-checkpointing">Incremental checkpointing</a></li>
+  <li><a href="#state-management">State management</a></li>
+  <li><a href="#fault-tolerance-of-state">Fault tolerance of state</a></li>
+  <li><a href="#host-affinity">Host affinity</a></li>
 </ul>
 
 <h2 id="distributed-execution">Distributed execution</h2>
 
 <h3 id="task">Task</h3>
 
-<p><img src="/img/latest/learn/documentation/architecture/task-assignment.png" 
alt="diagram-large"></p>
+<p><img src="/img/latest/learn/documentation/architecture/task-assignment.png" 
alt="diagram-large" /></p>
 
-<p>Samza scales your application by logically breaking it down into multiple 
tasks. A task is the unit of parallelism for your application. Each task 
consumes data from one partition of your input streams. The assignment of 
partitions to tasks never changes: if a task is on a machine that fails, the 
task is restarted elsewhere, still consuming the same stream partitions. Since 
there is no ordering of messages across partitions, it allows tasks to execute 
entirely independent of each other without sharing any state. </p>
+<p>Samza scales your application by logically breaking it down into multiple tasks. A task is the unit of parallelism for your application. Each task consumes data from one partition of your input streams. The assignment of partitions to tasks never changes: if a task is on a machine that fails, the task is restarted elsewhere, still consuming the same stream partitions. Since there is no ordering of messages across partitions, tasks can execute entirely independently of each other without sharing any state.</p>
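<p>The fixed partition-to-task mapping can be sketched as a pure function of the partition id. This is a simplified illustration in the spirit of Samza's default grouping by partition, not Samza's grouper code; the <code>StreamPartition</code> type and the stream names are hypothetical, and it assumes the input streams are co-partitioned.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch, not Samza's grouper: partition N of every input stream is
// assigned to the task named "Partition N". The mapping depends only on the
// partition id, so it is deterministic and survives a task being restarted on
// another machine.
final class PartitionGrouping {
  // Hypothetical stand-in for a (stream, partition) pair.
  record StreamPartition(String stream, int partition) {}

  static Map<String, List<StreamPartition>> groupByPartition(List<StreamPartition> inputs) {
    Map<String, List<StreamPartition>> tasks = new TreeMap<>();
    for (StreamPartition sp : inputs) {
      tasks.computeIfAbsent("Partition " + sp.partition(), name -> new ArrayList<>()).add(sp);
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<StreamPartition> inputs = List.of(
        new StreamPartition("page-views", 0), new StreamPartition("page-views", 1),
        new StreamPartition("ad-clicks", 0), new StreamPartition("ad-clicks", 1));
    groupByPartition(inputs).forEach((task, partitions) -> System.out.println(task + " -> " + partitions));
  }
}
```

<p>Because nothing in the mapping refers to hosts or containers, moving a task elsewhere cannot change which partitions it reads.</p>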
 
 <h3 id="container">Container</h3>
+<p><img 
src="/img/latest/learn/documentation/architecture/distributed-execution.png" 
alt="diagram-large" /></p>
 
-<p><img 
src="/img/latest/learn/documentation/architecture/distributed-execution.png" 
alt="diagram-large"></p>
-
-<p>Just like a task is the logical unit of parallelism for your application, a 
container is the physical unit. You can think of each worker as a JVM process, 
which runs one or more tasks. An application typically has multiple containers 
distributed across hosts. </p>
+<p>Just as a task is the logical unit of parallelism for your application, a container is the physical unit. You can think of each container as a JVM process that runs one or more tasks. An application typically has multiple containers distributed across hosts.</p>
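<p>To make the task/container distinction concrete, here is a toy placement of logical tasks onto a fixed number of containers using round-robin. This is not Samza's actual assignment logic (which is pluggable); the container names are hypothetical. It only shows that one container typically hosts several tasks.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: spread logical tasks over a fixed number of containers
// (JVM processes) round-robin. Each container ends up running one or more tasks.
final class ContainerAssignment {
  static Map<String, List<String>> assign(List<String> tasks, int containerCount) {
    if (containerCount <= 0) throw new IllegalArgumentException("need at least one container");
    Map<String, List<String>> containers = new LinkedHashMap<>();
    for (int i = 0; i < containerCount; i++) {
      containers.put("container-" + i, new ArrayList<>());
    }
    for (int i = 0; i < tasks.size(); i++) {
      containers.get("container-" + (i % containerCount)).add(tasks.get(i));
    }
    return containers;
  }

  public static void main(String[] args) {
    List<String> tasks = List.of("Partition 0", "Partition 1", "Partition 2", "Partition 3", "Partition 4");
    System.out.println(assign(tasks, 2));
  }
}
```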
 
 <h3 id="coordinator">Coordinator</h3>
-
-<p>Each application also has a coordinator which manages the assignment of 
tasks across the individual containers. The coordinator monitors the liveness 
of individual containers and redistributes the tasks among the remaining ones 
during a failure. <br/><br/>
+<p>Each application also has a coordinator, which manages the assignment of tasks across the individual containers. The coordinator monitors the liveness of individual containers and, when one fails, redistributes its tasks among the remaining ones. <br /><br />
 The coordinator itself is pluggable, enabling Samza to support multiple deployment options. You can use Samza as a lightweight embedded library that easily integrates with a larger application. Alternatively, you can deploy and run it as a managed framework using a cluster manager like YARN. It is worth noting that Samza is the only system that offers first-class support for both these deployment options. Some systems, like Kafka Streams, only support the embedded library model, while others, like Flink and Spark Streaming, only offer the framework model for stream processing.</p>
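<p>The coordinator's failover step can be sketched as moving the failed container's tasks onto the survivors. The least-loaded-first policy below is our own assumption for illustration, not a description of how Samza's coordinators choose targets; note that only the placement of tasks changes, never their partitions.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of failover: tasks owned by a failed container are handed
// to the surviving containers, always picking the one with the fewest tasks.
final class Failover {
  static Map<String, List<String>> redistribute(Map<String, List<String>> assignment, String failed) {
    List<String> orphaned = assignment.getOrDefault(failed, List.of());
    Map<String, List<String>> result = new LinkedHashMap<>();
    assignment.forEach((container, tasks) -> {
      if (!container.equals(failed)) result.put(container, new ArrayList<>(tasks));
    });
    if (result.isEmpty()) throw new IllegalStateException("no live containers left");
    for (String task : orphaned) {
      String target = null;
      for (Map.Entry<String, List<String>> e : result.entrySet()) {
        if (target == null || e.getValue().size() < result.get(target).size()) target = e.getKey();
      }
      result.get(target).add(task); // task keeps its partitions, only its host changes
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> before = new LinkedHashMap<>();
    before.put("container-0", List.of("Partition 0", "Partition 3"));
    before.put("container-1", List.of("Partition 1"));
    before.put("container-2", List.of("Partition 2", "Partition 4"));
    System.out.println(redistribute(before, "container-2"));
  }
}
```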
 
 <h3 id="threading-model-and-ordering">Threading model and ordering</h3>
 
-<p>Samza offers a flexible threading model to run each task. When running your 
applications, you can control the number of workers needed to process your 
data. You can also configure the number of threads each worker uses to run its 
assigned tasks. Each thread can run one or more tasks. Tasks don’t share any 
state - hence, you don’t have to worry about coordination across these 
threads. </p>
+<p>Samza offers a flexible threading model to run each task. When running your applications, you can control the number of workers needed to process your data. You can also configure the number of threads each worker uses to run its assigned tasks. Each thread can run one or more tasks. Tasks don’t share any state, so you don’t have to worry about coordination across these threads.</p>
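<p>A rough model of this threading scheme: a worker owns a fixed number of threads, and each task is pinned to one of them, so a thread can run several tasks while each task's messages are handled sequentially. This is an illustrative sketch using <code>java.util.concurrent</code> executors, not Samza's run loop; "processing" here just records the message.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: each "thread" is a single-threaded executor; tasks are
// pinned to threads round-robin, so a task's messages run in order on one thread.
final class PinnedTaskThreads {
  static Map<String, List<Integer>> run(Map<String, List<Integer>> messagesByTask, int threadCount) {
    if (threadCount <= 0) throw new IllegalArgumentException("need at least one thread");
    List<ExecutorService> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) threads.add(Executors.newSingleThreadExecutor());
    Map<String, List<Integer>> processed = new ConcurrentHashMap<>();
    List<String> taskNames = new ArrayList<>(messagesByTask.keySet());
    for (int t = 0; t < taskNames.size(); t++) {
      String task = taskNames.get(t);
      ExecutorService thread = threads.get(t % threadCount); // one thread may host several tasks
      // Synchronized only so the caller safely sees the results afterwards.
      List<Integer> output = Collections.synchronizedList(new ArrayList<>());
      processed.put(task, output);
      for (Integer message : messagesByTask.get(task)) {
        thread.execute(() -> output.add(message)); // "process" by recording the message
      }
    }
    threads.forEach(ExecutorService::shutdown);
    try {
      for (ExecutorService thread : threads) {
        if (!thread.awaitTermination(10, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return processed;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> input = Map.of("Partition 0", List.of(1, 2, 3), "Partition 1", List.of(4, 5));
    System.out.println(run(input, 2));
  }
}
```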
 
 <p>Another common scenario in stream processing is interacting with remote services or databases. For example, a notification system might process each incoming message, generate an email, and invoke a REST API to deliver it. Samza offers a fully asynchronous API for use cases like this that require high-throughput remote I/O.</p>
 <p>By default, all messages delivered to a task are processed by the same thread. This guarantees in-order processing of messages within a partition. However, some applications don’t care about in-order processing of messages. For such use cases, Samza also supports processing messages out of order within a single partition. This typically offers higher throughput by allowing multiple concurrent messages in each partition.</p>
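<p>The trade-off between ordering and concurrency can be sketched with a semaphore that caps how many asynchronous calls from one partition are outstanding. With a limit of 1, each message waits for the previous call to finish (in-order); larger limits let calls overlap and complete out of order. This is a plain <code>CompletableFuture</code> illustration, not Samza's async API; to our understanding the analogous knob in Samza is the <code>task.max.concurrency</code> setting, but check the configuration reference.</p>

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Illustrative sketch: hand each message of one partition to an async call
// (e.g. a remote REST request) with at most maxInFlight calls outstanding.
final class BoundedAsyncProcessor {
  static Set<String> processAll(List<String> partitionMessages, int maxInFlight,
      Function<String, CompletableFuture<String>> asyncCall) {
    Semaphore permits = new Semaphore(maxInFlight);
    Set<String> results = ConcurrentHashMap.newKeySet();
    CompletableFuture<?>[] pending = new CompletableFuture<?>[partitionMessages.size()];
    for (int i = 0; i < partitionMessages.size(); i++) {
      permits.acquireUninterruptibly(); // wait while maxInFlight calls are outstanding
      pending[i] = asyncCall.apply(partitionMessages.get(i)).whenComplete((result, error) -> {
        if (error == null) results.add(result);
        permits.release(); // free a slot for the next message
      });
    }
    CompletableFuture.allOf(pending).join(); // wait for the remaining in-flight calls
    return results;
  }

  public static void main(String[] args) {
    Set<String> out = processAll(List.of("a", "b", "c"), 2,
        message -> CompletableFuture.supplyAsync(() -> message.toUpperCase()));
    System.out.println(out);
  }
}
```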
 
 <h3 id="incremental-checkpointing">Incremental checkpointing</h3>
+<p><img 
src="/img/latest/learn/documentation/architecture/incremental-checkpointing.png"
 alt="diagram-large" /></p>
 
-<p><img 
src="/img/latest/learn/documentation/architecture/incremental-checkpointing.png"
 alt="diagram-large"></p>
-
-<p>Samza guarantees that messages won’t be lost, even if your job crashes, 
if a machine dies, if there is a network fault, or something else goes wrong. 
To achieve this property, each task periodically persists the last processed 
offsets for its input stream partitions. If a task needs to be restarted on a 
different worker due to a failure, it resumes processing from its latest 
checkpoint. </p>
+<p>Samza guarantees that messages won’t be lost, even if your job crashes, 
if a machine dies, if there is a network fault, or something else goes wrong. 
To achieve this property, each task periodically persists the last processed 
offsets for its input stream partitions. If a task needs to be restarted on a 
different worker due to a failure, it resumes processing from its latest 
checkpoint.</p>
 
 <p>Samza’s checkpointing mechanism also ensures that each task stores the contents of its state-store consistently with its last processed offsets. Checkpoints are flushed incrementally, i.e., the state-store only flushes the delta since the previous checkpoint instead of flushing its entire state.</p>
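<p>A minimal sketch of incremental checkpointing, assuming a simple key/value store: the store tracks which keys changed since the last checkpoint, and each checkpoint captures only that delta together with the last processed offset, so the two stay consistent. The class and record names are hypothetical, deletes are not modeled, and this is not Samza's checkpoint format.</p>

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: a checkpoint carries the last processed offset plus only
// the keys written since the previous checkpoint (the delta), not the full state.
final class IncrementalCheckpointStore {
  record Checkpoint(long offset, Map<String, String> delta) {}

  private final Map<String, String> data = new HashMap<>();
  private final Map<String, String> dirty = new LinkedHashMap<>();
  private long lastProcessedOffset = -1;

  void process(long offset, String key, String value) {
    data.put(key, value);
    dirty.put(key, value); // only the latest value per changed key needs flushing
    lastProcessedOffset = offset;
  }

  Checkpoint checkpoint() {
    // Capture offset and delta together so they describe the same point in the input.
    Checkpoint cp = new Checkpoint(lastProcessedOffset, Map.copyOf(dirty));
    dirty.clear();
    return cp;
  }

  public static void main(String[] args) {
    IncrementalCheckpointStore store = new IncrementalCheckpointStore();
    store.process(0, "user-1", "a");
    store.process(1, "user-2", "b");
    System.out.println(store.checkpoint());
    store.process(2, "user-1", "c");
    System.out.println(store.checkpoint()); // delta holds only user-1
  }
}
```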
 
 <h3 id="state-management">State management</h3>
-
 <p>Samza offers scalable, high-performance storage to enable you to build stateful stream-processing applications. This is implemented by associating each Samza task with its own instance of a local database (a.k.a. a state-store). The state-store associated with a particular task only stores data corresponding to the partitions processed by that task. This is important: when you scale out your job by giving it more computing resources, Samza transparently migrates the tasks from one machine to another. By giving each task its own state, tasks can be relocated without affecting your overall application. 
-<img src="/img/latest/learn/documentation/architecture/state-store.png" 
alt="diagram-large"></p>
+<img src="/img/latest/learn/documentation/architecture/state-store.png" 
alt="diagram-large" /></p>
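<p>The per-task store idea can be sketched as a registry that creates one private store per task from a pluggable engine. The <code>LocalStore</code> interface and both classes below are hypothetical illustrations, not Samza's storage API; the in-memory engine stands in for RocksDB or any other engine plugged in behind the same interface.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch: every task gets its own store instance from a pluggable
// engine, so tasks never read or write each other's state.
final class TaskStores {
  // Hypothetical engine interface; not Samza's KeyValueStore.
  interface LocalStore {
    void put(String key, String value);
    String get(String key);
  }

  // One possible engine; an on-disk or remote engine could implement the same interface.
  static final class InMemoryStore implements LocalStore {
    private final Map<String, String> entries = new HashMap<>();
    @Override public void put(String key, String value) { entries.put(key, value); }
    @Override public String get(String key) { return entries.get(key); }
  }

  private final Map<String, LocalStore> storesByTask = new HashMap<>();
  private final Supplier<LocalStore> engine;

  TaskStores(Supplier<LocalStore> engine) { this.engine = engine; }

  // Lazily create the task's private store with the configured engine.
  LocalStore forTask(String taskName) {
    return storesByTask.computeIfAbsent(taskName, name -> engine.get());
  }

  // Relocating a task only involves that task's own store.
  LocalStore detach(String taskName) {
    return storesByTask.remove(taskName);
  }

  public static void main(String[] args) {
    TaskStores stores = new TaskStores(InMemoryStore::new);
    stores.forTask("Partition 0").put("user-1", "a");
    stores.forTask("Partition 1").put("user-1", "b");
    System.out.println(stores.forTask("Partition 0").get("user-1") + " " + stores.forTask("Partition 1").get("user-1"));
  }
}
```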
 
-<p>Here are some key advantages of this architecture. <br/>
-- The state is stored on disk, so the job can maintain more state than would 
fit in memory. <br/>
-- It is stored on the same machine as the task, to avoid the performance 
problems of making database queries over the network. <br/>
-- Each job has its own store, to avoid the isolation issues in a shared remote 
database (if you make an expensive query, it affects only the current task, 
nobody else). <br/>
-- Different storage engines can be plugged in - for example, a remote 
data-store that enables richer query capabilities <br/></p>
+<p>Here are some key advantages of this architecture:</p>
+<ul>
+  <li>The state is stored on disk, so the job can maintain more state than would fit in memory.</li>
+  <li>It is stored on the same machine as the task, avoiding the performance problems of making database queries over the network.</li>
+  <li>Each task has its own store, avoiding the isolation issues of a shared remote database (if you make an expensive query, it affects only the current task, nobody else).</li>
+  <li>Different storage engines can be plugged in, for example, a remote data-store that enables richer query capabilities.</li>
+</ul>
 
 <h3 id="fault-tolerance-of-state">Fault tolerance of state</h3>
-
 <p>Distributed stream processing systems need to recover quickly from failures to resume their processing. While having a durable local store offers great performance, we should still guarantee fault-tolerance. For this purpose, Samza replicates every change to the local store into a separate stream (called a changelog for the store). This allows you to later recover the data in the store by reading the contents of the changelog from the beginning. A log-compacted Kafka topic is typically used as a changelog since Kafka automatically retains the most recent value for each key.
-<img src="/img/latest/learn/documentation/architecture/fault-tolerance.png" 
alt="diagram-large"></p>
+<img src="/img/latest/learn/documentation/architecture/fault-tolerance.png" 
alt="diagram-large" /></p>
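<p>The changelog mechanism can be sketched with an in-memory list standing in for the changelog stream: every write is applied locally and appended to the log, recovery replays the log from the beginning, and a compaction step keeps only the latest value per key (the property a log-compacted Kafka topic provides). All names here are hypothetical; no Kafka client is involved.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a local store whose writes are replicated to a changelog,
// restorable by replaying that changelog from the start.
final class ChangelogBackedStore {
  record Entry(String key, String value) {}

  final Map<String, String> local = new HashMap<>();
  final List<Entry> changelog;

  ChangelogBackedStore(List<Entry> changelog) { this.changelog = changelog; }

  void put(String key, String value) {
    local.put(key, value);
    changelog.add(new Entry(key, value)); // replicate the change
  }

  static ChangelogBackedStore restore(List<Entry> changelog) {
    ChangelogBackedStore store = new ChangelogBackedStore(changelog);
    for (Entry e : changelog) store.local.put(e.key(), e.value()); // replay from the beginning
    return store;
  }

  // Keep only the most recent value for each key, as log compaction does.
  static List<Entry> compact(List<Entry> changelog) {
    Map<String, String> latest = new LinkedHashMap<>();
    for (Entry e : changelog) latest.put(e.key(), e.value());
    List<Entry> compacted = new ArrayList<>();
    latest.forEach((k, v) -> compacted.add(new Entry(k, v)));
    return compacted;
  }

  public static void main(String[] args) {
    List<Entry> log = new ArrayList<>();
    ChangelogBackedStore store = new ChangelogBackedStore(log);
    store.put("user-1", "a");
    store.put("user-1", "b");
    store.put("user-2", "c");
    System.out.println(restore(log).local.equals(store.local)); // replay rebuilds the same state
    System.out.println(compact(log).size()); // 2 entries survive compaction
  }
}
```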
 
 <h3 id="host-affinity">Host affinity</h3>
-
-<p>If your application has several terabytes of state, then bootstrapping it 
every time by reading the changelog will stall progress. So, it’s critical to 
be able to recover state swiftly during failures. For this purpose, Samza takes 
data-locality into account when scheduling tasks on hosts. This is implemented 
by persisting metadata about the host each task is currently running on. </p>
+<p>If your application has several terabytes of state, then bootstrapping it 
every time by reading the changelog will stall progress. So, it’s critical to 
be able to recover state swiftly during failures. For this purpose, Samza takes 
data-locality into account when scheduling tasks on hosts. This is implemented 
by persisting metadata about the host each task is currently running on.</p>
 
 <p>During a new deployment of the application, Samza tries to re-schedule the 
tasks on the same hosts they were previously on. This enables the task to 
re-use the snapshot of its local-state from its previous run on that host. We 
call this feature <em>host-affinity</em> since it tries to preserve the 
assignment of tasks to hosts. This is a key differentiator that enables Samza 
applications to scale to several terabytes of local-state with effectively zero 
downtime.</p>
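<p>Host affinity can be sketched as a placement rule: given the host each task last ran on (the persisted metadata described above), keep the task there if that host is still available, so its local state snapshot can be reused; otherwise fall back to another host, which would then have to restore state from the changelog. The round-robin fallback and all names are our own assumptions, not Samza's scheduler.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of host-affinity placement: prefer each task's previous
// host; otherwise pick the next available host round-robin.
final class HostAffinity {
  static Map<String, String> place(List<String> tasks, Map<String, String> previousHost,
      List<String> availableHosts) {
    if (availableHosts.isEmpty()) throw new IllegalArgumentException("no hosts available");
    Set<String> available = Set.copyOf(availableHosts);
    Deque<String> fallback = new ArrayDeque<>(availableHosts);
    Map<String, String> placement = new LinkedHashMap<>();
    for (String task : tasks) {
      String preferred = previousHost.get(task);
      if (preferred != null && available.contains(preferred)) {
        placement.put(task, preferred); // local state snapshot can be reused
      } else {
        String host = fallback.pollFirst(); // state must be restored from the changelog
        fallback.addLast(host);
        placement.put(task, host);
      }
    }
    return placement;
  }

  public static void main(String[] args) {
    Map<String, String> previous = Map.of("t0", "host-a", "t1", "host-b", "t2", "host-c");
    System.out.println(place(List.of("t0", "t1", "t2"), previous, List.of("host-a", "host-b", "host-d")));
  }
}
```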
 
+
            
         </div>
       </div>

Modified: samza/site/learn/documentation/latest/architecture/kinesis.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/architecture/kinesis.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/architecture/kinesis.html (original)
+++ samza/site/learn/documentation/latest/architecture/kinesis.html Wed Jan 18 
19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/architecture/kinesis">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/architecture/kinesis">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/architecture/kinesis">1.6.0</a></li>
 
               
@@ -641,6 +655,7 @@
 
 <h2 id="samza-architecture-page">Samza architecture page</h2>
 
+
            
         </div>
       </div>

