Modified: samza/site/learn/tutorials/latest/hello-samza-high-level-code.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/tutorials/latest/hello-samza-high-level-code.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/tutorials/latest/hello-samza-high-level-code.html 
(original)
+++ samza/site/learn/tutorials/latest/hello-samza-high-level-code.html Wed Jan 
18 19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -545,57 +551,54 @@
    limitations under the License.
 -->
 
-<p>This tutorial introduces the high level API by showing you how to build 
wikipedia application from the <a 
href="hello-samza-high-level-yarn.html">hello-samza high level API Yarn 
tutorial</a>. Upon completion of this tutorial, you&rsquo;ll know how to 
implement and configure a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>.
 Along the way, you&rsquo;ll see how to use some of the basic operators as well 
as how to leverage key-value stores and metrics in an app.</p>
+<p>This tutorial introduces the high level API by showing you how to build 
the wikipedia application from the <a 
href="hello-samza-high-level-yarn.html">hello-samza high level API Yarn 
tutorial</a>. Upon completion of this tutorial, you’ll 
know how to implement and configure a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>.
 Along the way, you’ll see how to use some of the basic operators as well as 
how to leverage key-value stores and metrics in an app.</p>
 
-<p>The same <a 
href="https://github.com/apache/samza-hello-samza">hello-samza</a> project is 
used for this tutorial as for many of the others. You will clone that project 
and by the end of the tutorial, you will have implemented a duplicate of the 
<code>WikipediaApplication</code>.</p>
+<p>The same <a 
href="https://github.com/apache/samza-hello-samza">hello-samza</a> project is 
used for this tutorial as for many of the others. You will clone that project 
and by the end of the tutorial, you will have implemented a duplicate of the 
<code class="language-plaintext 
highlighter-rouge">WikipediaApplication</code>.</p>
 
-<p>Let&rsquo;s get started.</p>
+<p>Let’s get started.</p>
 
 <h3 id="get-the-code">Get the Code</h3>
 
 <p>Check out the hello-samza project:</p>
 
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>git clone 
https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
-<span class="nb">cd</span> hello-samza
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash">git clone 
https://gitbox.apache.org/repos/asf/samza-hello-samza.git hello-samza
+<span class="nb">cd </span>hello-samza
 git checkout latest</code></pre></figure>
 
-<p>This project already contains implementations of the wikipedia application 
using both the Low Level Task API and the High Level Streams API. The Low Level 
Task API implementations are in the <code>samza.examples.wikipedia.task</code> 
package. The High Level Streams API implementation is in the 
<code>samza.examples.wikipedia.application</code> package.</p>
+<p>This project already contains implementations of the wikipedia application 
using both the Low Level Task API and the High Level Streams API. The Low Level 
Task API implementations are in the <code class="language-plaintext 
highlighter-rouge">samza.examples.wikipedia.task</code> package. The High Level 
Streams API implementation is in the <code class="language-plaintext 
highlighter-rouge">samza.examples.wikipedia.application</code> package.</p>
 
 <p>This tutorial will provide step by step instructions to recreate the 
existing wikipedia application.</p>
 
 <h3 id="introduction-to-wikipedia-consumer">Introduction to Wikipedia 
Consumer</h3>
+<p>In order to consume events from Wikipedia, the hello-samza project includes 
a <code class="language-plaintext 
highlighter-rouge">WikipediaSystemFactory</code> implementation of the Samza <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemFactory.html">SystemFactory</a>
 that provides a <code class="language-plaintext 
highlighter-rouge">WikipediaConsumer</code>.</p>
 
-<p>In order to consume events from Wikipedia, the hello-samza project includes 
a <code>WikipediaSystemFactory</code> implementation of the Samza <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemFactory.html">SystemFactory</a>
 that provides a <code>WikipediaConsumer</code>.</p>
-
-<p>The WikipediaConsumer is an implementation of <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a>
 that can consume events from Wikipedia. It is also a listener for events from 
the <code>WikipediaFeed</code>. It&rsquo;s important to note that the events 
received in <code>onEvent</code> are of the type 
<code>WikipediaFeedEvent</code>, so we will expect that type for messages on 
our input streams. For other systems, the messages may come in the form of 
<code>byte[]</code>. In that case you may want to configure a samza <a 
href="/learn/documentation/latest/container/serialization.html">serde</a> and 
the application should expect the output type of that serde.</p>
+<p>The WikipediaConsumer is an implementation of <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemConsumer.html">SystemConsumer</a>
 that can consume events from Wikipedia. It is also a listener for events from 
the <code class="language-plaintext highlighter-rouge">WikipediaFeed</code>. 
It’s important to note that the events received in <code 
class="language-plaintext highlighter-rouge">onEvent</code> are of the type 
<code class="language-plaintext highlighter-rouge">WikipediaFeedEvent</code>, 
so we will expect that type for messages on our input streams. For other 
systems, the messages may come in the form of <code class="language-plaintext 
highlighter-rouge">byte[]</code>. In that case you may want to configure a 
Samza <a 
href="/learn/documentation/latest/container/serialization.html">serde</a> and 
the application should expect the output type of that serde.</p>
 
-<p>Now that we understand the Wikipedia system and the types of inputs 
we&rsquo;ll be processing, we can proceed with creating our application.</p>
+<p>Now that we understand the Wikipedia system and the types of inputs we’ll 
be processing, we can proceed with creating our application.</p>
 
 <h3 id="create-the-initial-config">Create the Initial Config</h3>
-
 <p>In the hello-samza project, configs are kept in the 
<em>src/main/config/</em> path. This is where we will add the config for our 
application.
 Create a new file named <em>my-wikipedia-application.properties</em> in this 
location.</p>
 
 <h4 id="core-configuration">Core Configuration</h4>
+<p>Let’s start by adding some of the core properties to the file:</p>
 
-<p>Let&rsquo;s start by adding some of the core properties to the file:</p>
-
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span><span class="c1"># Licensed to the Apache 
Software Foundation (ASF) under one</span>
-<span class="c1"># or more contributor license agreements.  See the NOTICE 
file</span>
-<span class="c1"># distributed with this work for additional information</span>
-<span class="c1"># regarding copyright ownership.  The ASF licenses this 
file</span>
-<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
-<span class="c1"># &quot;License&quot;); you may not use this file except in 
compliance</span>
-<span class="c1"># with the License.  You may obtain a copy of the License 
at</span>
-<span class="c1">#</span>
-<span class="c1">#   http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in 
writing,</span>
-<span class="c1"># software distributed under the License is distributed on 
an</span>
-<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS 
OF ANY</span>
-<span class="c1"># KIND, either express or implied.  See the License for 
the</span>
-<span class="c1"># specific language governing permissions and 
limitations</span>
-<span class="c1"># under the License.</span>
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="c"># Licensed to the Apache Software Foundation 
(ASF) under one</span>
+<span class="c"># or more contributor license agreements.  See the NOTICE 
file</span>
+<span class="c"># distributed with this work for additional information</span>
+<span class="c"># regarding copyright ownership.  The ASF licenses this 
file</span>
+<span class="c"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c"># "License"); you may not use this file except in 
compliance</span>
+<span class="c"># with the License.  You may obtain a copy of the License 
at</span>
+<span class="c">#</span>
+<span class="c">#   http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c">#</span>
+<span class="c"># Unless required by applicable law or agreed to in 
writing,</span>
+<span class="c"># software distributed under the License is distributed on 
an</span>
+<span class="c"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c"># KIND, either express or implied.  See the License for 
the</span>
+<span class="c"># specific language governing permissions and 
limitations</span>
+<span class="c"># under the License.</span>
 
 app.class<span 
class="o">=</span>samza.examples.wikipedia.application.MyWikipediaApplication
 app.runner.class<span 
class="o">=</span>org.apache.samza.runtime.RemoteApplicationRunner
@@ -604,37 +607,36 @@ job.factory.class<span class="o">=</span
 job.name<span class="o">=</span>my-wikipedia-application
 job.default.system<span class="o">=</span>kafka
 
-yarn.package.path<span class="o">=</span>file://<span 
class="si">${</span><span class="nv">basedir</span><span 
class="si">}</span>/target/<span class="si">${</span><span 
class="nv">project</span><span class="p">.artifactId</span><span 
class="si">}</span>-<span class="si">${</span><span class="nv">pom</span><span 
class="p">.version</span><span 
class="si">}</span>-dist.tar.gz</code></pre></figure>
+yarn.package.path<span class="o">=</span>file://<span class="k">${</span><span 
class="nv">basedir</span><span class="k">}</span>/target/<span 
class="k">${</span><span class="nv">project</span><span 
class="p">.artifactId</span><span class="k">}</span>-<span 
class="k">${</span><span class="nv">pom</span><span 
class="p">.version</span><span class="k">}</span><span 
class="nt">-dist</span>.tar.gz</code></pre></figure>
 
-<p>Be sure to include the Apache header. The project will not compile without 
it. </p>
+<p>Be sure to include the Apache header. The project will not compile without 
it.</p>
 
-<p>Here&rsquo;s a brief summary of what we configured so far.</p>
+<p>Here’s a brief summary of what we configured so far.</p>
 
 <ul>
-<li><strong>app.class</strong>: the class that defines the application logic. 
We will create this class later.</li>
-<li><strong>app.runner.class</strong>: the runner implementation which will 
launch our application. Since we are using YARN, we use 
<code>RemoteApplicationRunner</code> which is required for any cluster-based 
deployment.</li>
-<li><strong>job.factory.class</strong>: the <a 
href="/learn/documentation/latest/jobs/job-runner.html">factory</a> that will 
create the runtime instances of our jobs. Since we are using YARN, we want each 
job to be created as a <a 
href="/learn/documentation/latest/jobs/yarn-jobs.html">YARN job</a>, so we use 
<code>YarnJobFactory</code></li>
-<li><strong>job.name</strong>: the primary identifier for the job.</li>
-<li><strong>job.default.system</strong>: the default system to use for input, 
output, and internal metadata streams. This can be overridden on a per-stream 
basis. The <em>kafka</em> system will be defined in the next section.</li>
-<li><strong>yarn.package.path</strong>: tells YARN where to find the <a 
href="/learn/documentation/latest/jobs/packaging.html">job package</a> so the 
Node Managers can download it.</li>
+  <li><strong>app.class</strong>: the class that defines the application 
logic. We will create this class later.</li>
+  <li><strong>app.runner.class</strong>: the runner implementation which will 
launch our application. Since we are using YARN, we use <code 
class="language-plaintext highlighter-rouge">RemoteApplicationRunner</code> 
which is required for any cluster-based deployment.</li>
+  <li><strong>job.factory.class</strong>: the <a 
href="/learn/documentation/latest/jobs/job-runner.html">factory</a> that will 
create the runtime instances of our jobs. Since we are using YARN, we want each 
job to be created as a <a 
href="/learn/documentation/latest/jobs/yarn-jobs.html">YARN job</a>, so we use 
<code class="language-plaintext highlighter-rouge">YarnJobFactory</code>.</li>
+  <li><strong>job.name</strong>: the primary identifier for the job.</li>
+  <li><strong>job.default.system</strong>: the default system to use for 
input, output, and internal metadata streams. This can be overridden on a 
per-stream basis. The <em>kafka</em> system will be defined in the next 
section.</li>
+  <li><strong>yarn.package.path</strong>: tells YARN where to find the <a 
href="/learn/documentation/latest/jobs/packaging.html">job package</a> so the 
Node Managers can download it.</li>
 </ul>
 
 <p>These basic configurations are enough to launch the application on YARN but 
we haven’t defined any streaming systems for Samza to use, so the application 
would not process anything.</p>
 
-<p>Next, let&rsquo;s define the streaming systems with which the application 
will interact. </p>
+<p>Next, let’s define the streaming systems with which the application will 
interact.</p>
 
 <h4 id="define-systems">Define Systems</h4>
-
 <p>This Wikipedia application will consume events from Wikipedia and produce 
stats to a Kafka topic. We need to define those systems in config before Samza 
can use them. Add the following lines to the config:</p>
 
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>systems.wikipedia.samza.factory<span 
class="o">=</span>samza.examples.wikipedia.system.WikipediaSystemFactory
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash">systems.wikipedia.samza.factory<span 
class="o">=</span>samza.examples.wikipedia.system.WikipediaSystemFactory
 systems.wikipedia.host<span class="o">=</span>irc.wikimedia.org
-systems.wikipedia.port<span class="o">=</span><span class="m">6667</span>
+systems.wikipedia.port<span class="o">=</span>6667
 
 systems.kafka.samza.factory<span 
class="o">=</span>org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.consumer.zookeeper.connect<span class="o">=</span>localhost:2181/
 systems.kafka.producer.bootstrap.servers<span class="o">=</span>localhost:9092
-systems.kafka.default.stream.replication.factor<span class="o">=</span><span 
class="m">1</span></code></pre></figure>
+systems.kafka.default.stream.replication.factor<span 
class="o">=</span>1</code></pre></figure>
 
 <p>The above configuration defines 2 systems: one called <em>wikipedia</em> 
and one called <em>kafka</em>.</p>
 
@@ -643,31 +645,30 @@ systems.kafka.default.stream.replication
 <p>For the <em>kafka</em> system, we set the default replication factor to 1 
for all streams because this application is intended for a demo deployment 
which utilizes a Kafka cluster with only 1 broker, so a replication factor 
larger than 1 is invalid.</p>
 
 <h4 id="configure-streams">Configure Streams</h4>
+<p>Samza identifies streams using a unique stream ID. In most cases, the 
stream ID is the same as the actual stream name. However, if a stream has a 
name that doesn’t match the pattern <code class="language-plaintext 
highlighter-rouge">[A-Za-z0-9_-]+</code>, we need to configure a separate 
<em>physical.name</em> to associate the actual stream name with a legal stream 
ID. The Wikipedia channels we will consume have a ‘#’ character in the 
names. So for each of them we must pick a legal stream ID and then configure 
the physical name to match the channel.</p>
 
-<p>Samza identifies streams using a unique stream ID. In most cases, the 
stream ID is the same as the actual stream name. However, if a stream has a 
name that doesn&rsquo;t match the pattern <code>[A-Za-z0-9_-]+</code>, we need 
to configure a separate <em>physical.name</em> to associate the actual stream 
name with a legal stream ID. The Wikipedia channels we will consume have a 
&lsquo;#&rsquo; character in the names. So for each of them we must pick a 
legal stream ID and then configure the physical name to match the channel.</p>
-
-<p>Samza uses the <em>job.default.system</em> for any streams that do not 
explicitly specify a system. In the previous sections, we defined 2 systems, 
<em>wikipedia</em> and <em>kafka</em>, and we configured <em>kafka</em> as the 
default. To understand why, let&rsquo;s look at the streams and how Samza will 
use them.</p>
+<p>Samza uses the <em>job.default.system</em> for any streams that do not 
explicitly specify a system. In the previous sections, we defined 2 systems, 
<em>wikipedia</em> and <em>kafka</em>, and we configured <em>kafka</em> as the 
default. To understand why, let’s look at the streams and how Samza will use 
them.</p>
 
 <p>For this app, Samza will:</p>
 
 <ol>
-<li>Consume from input streams</li>
-<li>Produce to an output stream and a metrics stream</li>
-<li>Both produce and consume from job-coordination, checkpoint, and changelog 
streams</li>
+  <li>Consume from input streams</li>
+  <li>Produce to an output stream and a metrics stream</li>
+  <li>Both produce and consume from job-coordination, checkpoint, and 
changelog streams</li>
 </ol>
 
-<p>While the <em>wikipedia</em> system is necessary for case 1, it does not 
support producers (we can&rsquo;t write Samza output to Wikipedia), which are 
needed for cases 2-3. So it is more convenient to use <em>kafka</em> as the 
default system. We can then explicitly configure the input streams to use the 
<em>wikipedia</em> system.</p>
+<p>While the <em>wikipedia</em> system is necessary for case 1, it does not 
support producers (we can’t write Samza output to Wikipedia), which are 
needed for cases 2-3. So it is more convenient to use <em>kafka</em> as the 
default system. We can then explicitly configure the input streams to use the 
<em>wikipedia</em> system.</p>
 
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>streams.en-wikipedia.samza.system<span 
class="o">=</span>wikipedia
-streams.en-wikipedia.samza.physical.name<span class="o">=</span><span 
class="c1">#en.wikipedia</span>
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash">streams.en-wikipedia.samza.system<span 
class="o">=</span>wikipedia
+streams.en-wikipedia.samza.physical.name<span class="o">=</span><span 
class="c">#en.wikipedia</span>
 
 streams.en-wiktionary.samza.system<span class="o">=</span>wikipedia
-streams.en-wiktionary.samza.physical.name<span class="o">=</span><span 
class="c1">#en.wiktionary</span>
+streams.en-wiktionary.samza.physical.name<span class="o">=</span><span 
class="c">#en.wiktionary</span>
 
 streams.en-wikinews.samza.system<span class="o">=</span>wikipedia
-streams.en-wikinews.samza.physical.name<span class="o">=</span><span 
class="c1">#en.wikinews</span></code></pre></figure>
+streams.en-wikinews.samza.physical.name<span class="o">=</span><span 
class="c">#en.wikinews</span></code></pre></figure>
 
-<p>The above configurations declare 3 streams with IDs, <em>en-wikipedia</em>, 
<em>en-wiktionary</em>, and <em>en-wikinews</em>. It associates each stream 
with the <em>wikipedia</em> system we defined earlier and set the physical name 
to the corresponding Wikipedia channel. </p>
+<p>The above configurations declare 3 streams with the IDs <em>en-wikipedia</em>, 
<em>en-wiktionary</em>, and <em>en-wikinews</em>. They associate each stream 
with the <em>wikipedia</em> system we defined earlier and set the physical name 
to the corresponding Wikipedia channel.</p>
 
 <p>Since all the Kafka streams for cases 2-3 are on the default system and do 
not include special characters in their names, we do not need to configure them 
explicitly.</p>
 
@@ -676,38 +677,37 @@ streams.en-wikinews.samza.physical.name<
 <p>With the core configuration settled, we turn our attention to code.</p>
 
 <h3 id="define-application-logic">Define Application Logic</h3>
+<p>Let’s create the application class we configured above. The next 8 
sections walk you through writing the code for the Wikipedia application.</p>
 
-<p>Let&rsquo;s create the application class we configured above. The next 8 
sections walk you through writing the code for the Wikipedia application.</p>
+<p>Create a new class named <code class="language-plaintext 
highlighter-rouge">MyWikipediaApplication</code> in the <code 
class="language-plaintext 
highlighter-rouge">samza.examples.wikipedia.application</code> package. The 
class must implement <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>
 and should look like this:</p>
 
-<p>Create a new class named <code>MyWikipediaApplication</code> in the 
<code>samza.examples.wikipedia.application</code> package. The class must 
implement <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>
 and should look like this:</p>
-
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="cm">/*</span>
-<span class="cm"> * Licensed to the Apache Software Foundation (ASF) under 
one</span>
-<span class="cm"> * or more contributor license agreements.  See the NOTICE 
file</span>
-<span class="cm"> * distributed with this work for additional 
information</span>
-<span class="cm"> * regarding copyright ownership.  The ASF licenses this 
file</span>
-<span class="cm"> * to you under the Apache License, Version 2.0 (the</span>
-<span class="cm"> * &quot;License&quot;); you may not use this file except in 
compliance</span>
-<span class="cm"> * with the License.  You may obtain a copy of the License 
at</span>
-<span class="cm"> *</span>
-<span class="cm"> *   http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="cm"> *</span>
-<span class="cm"> * Unless required by applicable law or agreed to in 
writing,</span>
-<span class="cm"> * software distributed under the License is distributed on 
an</span>
-<span class="cm"> * &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS 
OF ANY</span>
-<span class="cm"> * KIND, either express or implied.  See the License for 
the</span>
-<span class="cm"> * specific language governing permissions and 
limitations</span>
-<span class="cm"> * under the License.</span>
-<span class="cm"> */</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="cm">/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */</span>
 <span class="kn">package</span> <span 
class="nn">samza.examples.wikipedia.application</span><span class="o">;</span>
 
 <span class="kn">import</span> <span 
class="nn">org.apache.samza.application.StreamApplication</span><span 
class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.samza.config.Config</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.samza.operators.StreamGraph</span><span class="o">;</span>
 
-<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MyWikipediaApplication</span> <span class="kd">implements</span> 
<span class="n">StreamApplication</span><span class="o">{</span>
+<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MyWikipediaApplication</span> <span class="kd">implements</span> 
<span class="nc">StreamApplication</span><span class="o">{</span>
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span 
class="n">StreamGraph</span> <span class="n">streamGraph</span><span 
class="o">,</span> <span class="n">Config</span> <span 
class="n">config</span><span class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span 
class="nc">StreamGraph</span> <span class="n">streamGraph</span><span 
class="o">,</span> <span class="nc">Config</span> <span 
class="n">config</span><span class="o">)</span> <span class="o">{</span>
     
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
@@ -719,63 +719,59 @@ streams.en-wikinews.samza.physical.name<
 <p>Next, we will declare the input streams for the Wikipedia application.</p>
 
 <h4 id="inputs">Inputs</h4>
+<p>The Wikipedia application consumes events from three channels. Let’s 
declare each of those channels as an input stream via the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html">StreamGraph</a>
 in the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method.</p>
 
-<p>The Wikipedia application consumes events from three channels. Let&rsquo;s 
declare each of those channels as an input streams via the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html">StreamGraph</a>
 in the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method.</p>
-
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">WikipediaFeedEvent</span><span 
class="o">&gt;</span> <span class="n">wikipediaEvents</span> <span 
class="o">=</span> <span class="n">streamGraph</span><span 
class="o">.</span><span class="na">getInputStream</span><span 
class="o">(</span><span class="s">&quot;en-wikipedia&quot;</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">NoOpSerde</span><span class="o">&lt;&gt;());</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WikipediaFeedEvent</span><span class="o">&gt;</span> <span 
class="n">wiktionaryEvents</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">&quot;en-wiktionary&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
-<span class="n">MessageStream</span><span class="o">&lt;</span><span 
class="n">WikipediaFeedEvent</span><span class="o">&gt;</span> <span 
class="n">wikiNewsEvents</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">&quot;en-wikinews&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">NoOpSerde</span><span 
class="o">&lt;&gt;());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">MessageStream</span><span 
class="o">&lt;</span><span class="nc">WikipediaFeedEvent</span><span 
class="o">&gt;</span> <span class="n">wikipediaEvents</span> <span 
class="o">=</span> <span class="n">streamGraph</span><span 
class="o">.</span><span class="na">getInputStream</span><span 
class="o">(</span><span class="s">"en-wikipedia"</span><span class="o">,</span> 
<span class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+<span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WikipediaFeedEvent</span><span class="o">&gt;</span> <span 
class="n">wiktionaryEvents</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">"en-wiktionary"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span>
+<span class="nc">MessageStream</span><span class="o">&lt;</span><span 
class="nc">WikipediaFeedEvent</span><span class="o">&gt;</span> <span 
class="n">wikiNewsEvents</span> <span class="o">=</span> <span 
class="n">streamGraph</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="s">"en-wikinews"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">NoOpSerde</span><span 
class="o">&lt;&gt;());</span></code></pre></figure>
 
 <p>The first argument to the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-org.apache.samza.serializers.Serde-">getInputStream</a>
 method is the stream ID. Each ID must match the corresponding stream ID we 
configured earlier.
-The second argument is the <code>Serde</code> used to deserialize the message. 
We&rsquo;ve set this to a <code>NoOpSerde</code> since our 
<code>wikipedia</code> system already returns <code>WikipediaFeedEvent</code>s 
and there is no need for further deserialization.</p>
+The second argument is the <code class="language-plaintext 
highlighter-rouge">Serde</code> used to deserialize the message. We’ve set 
this to a <code class="language-plaintext highlighter-rouge">NoOpSerde</code> 
since our <code class="language-plaintext highlighter-rouge">wikipedia</code> 
system already returns <code class="language-plaintext 
highlighter-rouge">WikipediaFeedEvent</code>s and there is no need for further 
deserialization.</p>
 
 <p>Note the streams are all MessageStreams of type WikipediaFeedEvent. <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html">MessageStream</a>
 is the in-memory representation of a stream in Samza. It uses generics to 
ensure type safety across the streams and operations.</p>
 
 <h4 id="merge">Merge</h4>
-
-<p>We&rsquo;d like to use the same processing logic for all three input 
streams, so we will use the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a>
 operator to merge them together. Note: this is not the same as a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-">join</a>
 because we are not associating events by key. We are simply combining three 
streams into one, like a union.</p>
+<p>We’d like to use the same processing logic for all three input streams, 
so we will use the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a>
 operator to merge them together. Note: this is not the same as a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-">join</a>
 because we are not associating events by key. We are simply combining three 
streams into one, like a union.</p>
 
 <p>Add the following snippet to the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method. It merges all the input streams into a new one called 
<em>allWikipediaEvents</em>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">WikipediaFeed</span><span 
class="o">.</span><span class="na">WikipediaFeedEvent</span><span 
class="o">&gt;</span> <span class="n">allWikipediaEvents</span> <span 
class="o">=</span> <span class="n">MessageStream</span><span 
class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span 
class="n">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">wikipediaEvents</span><span class="o">,</span> <span 
class="n">wiktionaryEvents</span><span class="o">,</span> <span 
class="n">wikiNewsEvents</span><span class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">MessageStream</span><span 
class="o">&lt;</span><span class="nc">WikipediaFeed</span><span 
class="o">.</span><span class="na">WikipediaFeedEvent</span><span 
class="o">&gt;</span> <span class="n">allWikipediaEvents</span> <span 
class="o">=</span> <span class="nc">MessageStream</span><span 
class="o">.</span><span class="na">mergeAll</span><span class="o">(</span><span 
class="nc">ImmutableList</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">wikipediaEvents</span><span class="o">,</span> <span 
class="n">wiktionaryEvents</span><span class="o">,</span> <span 
class="n">wikiNewsEvents</span><span class="o">));</span></code></pre></figure>
 
 <p>Note there is a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#merge-java.util.Collection-">merge</a>
 operator instance method on MessageStream, but the static <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">mergeAll</a>
 method is a more convenient alternative if you need to merge many streams.</p>
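 <p>To make the union-versus-join distinction concrete, here is a minimal sketch in plain Java. It does not use the Samza API: ordinary lists stand in for streams, and the <code>MergeSketch</code> class and its <code>mergeAll</code> helper are hypothetical names chosen for illustration. A real merged stream would interleave messages as they arrive, whereas this sketch simply concatenates the inputs.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a merge: lists play the role of streams.
class MergeSketch {

  // Union of all inputs: every element is passed through unchanged.
  // Nothing is paired up by key, which is what a join would do.
  static <T> List<T> mergeAll(Collection<List<T>> inputs) {
    List<T> merged = new ArrayList<>();
    for (List<T> input : inputs) {
      merged.addAll(input);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> wikipedia = Arrays.asList("edit-a", "edit-b");
    List<String> wiktionary = Arrays.asList("edit-c");
    List<String> wikinews = Arrays.asList("edit-d");
    System.out.println(mergeAll(Arrays.asList(wikipedia, wiktionary, wikinews)));
  }
}
```

 <p>The merged result has the same element type as its inputs, which is why all three input streams must share one message type before they can be merged.</p>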
 
 <h4 id="parse">Parse</h4>
+<p>The next step is to parse the events and extract some information. We will 
use the pre-existing <code class="language-plaintext 
highlighter-rouge">WikipediaParser.parseEvent()</code> method to do this. The 
parser extracts some flags we want to monitor as well as some metadata about 
the event. Inspect the method signature: the input is a WikipediaFeedEvent and 
the output is a Map&lt;String, Object&gt;. These types will be reflected in the 
types of the streams before and after the operation.</p>
 
-<p>The next step is to parse the events and extract some information. We will 
use the pre-existing `WikipediaParser.parseEvent()&lsquo; method to do this. 
The parser extracts some flags we want to monitor as well as some metadata 
about the event. Inspect the method signature. The input is a 
WikipediaFeedEvents and the output is a Map<String, Object>. These types will 
be reflected in the types of the streams before and after the operation.</p>
+<p>In the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method, invoke the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a>
 operation on <code class="language-plaintext 
highlighter-rouge">allWikipediaEvents</code>, passing the <code 
class="language-plaintext highlighter-rouge">WikipediaParser::parseEvent</code> 
method reference as follows:</p>
 
-<p>In the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method, invoke the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a>
 operation on <code>allWikipediaEvents</code>, passing the 
<code>WikipediaParser::parseEvent</code> method reference as follows:</p>
-
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="n">WikipediaParser</span><span class="o">::</span><span 
class="n">parseEvent</span><span class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="nl">WikipediaParser:</span><span class="o">:</span><span 
class="n">parseEvent</span><span class="o">);</span></code></pre></figure>
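 <p>The way a map operation changes the element type can be sketched without Samza. In the example below, <code>java.util.stream</code> stands in for the operator chain, and the <code>Event</code> class and <code>parse</code> method are hypothetical placeholders rather than the tutorial's own feed event and parser.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical event and parser; these are not the tutorial's classes.
class MapSketch {

  static class Event {
    final String title;
    final int diffBytes;

    Event(String title, int diffBytes) {
      this.title = title;
      this.diffBytes = diffBytes;
    }
  }

  // Input type is Event, output type is Map<String, Object>.
  static Map<String, Object> parse(Event event) {
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("title", event.title);
    parsed.put("diff-bytes", event.diffBytes);
    return parsed;
  }

  // Mapping with a method reference turns a list of Event into a list
  // of Map<String, Object>, mirroring how map() changes the stream type.
  static List<Map<String, Object>> parseAll(List<Event> events) {
    return events.stream().map(MapSketch::parse).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(parseAll(Arrays.asList(new Event("Samza", 42))));
  }
}
```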
 
 <h4 id="window">Window</h4>
-
-<p>Now that we have the relevant information extracted, let&rsquo;s perform 
some aggregations over a 10-second <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a>.</p>
+<p>Now that we have the relevant information extracted, let’s perform some 
aggregations over a 10-second <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a>.</p>
 
 <p>First, we need a container class for statistics we want to track. Add the 
following static class after the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">private</span> <span 
class="kd">static</span> <span class="kd">class</span> <span 
class="nc">WikipediaStats</span> <span class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">private</span> <span class="kd">static</span> 
<span class="kd">class</span> <span class="nc">WikipediaStats</span> <span 
class="o">{</span>
   <span class="kt">int</span> <span class="n">edits</span> <span 
class="o">=</span> <span class="mi">0</span><span class="o">;</span>
   <span class="kt">int</span> <span class="n">byteDiff</span> <span 
class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-  <span class="n">Set</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">titles</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashSet</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;();</span>
-  <span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">HashMap</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;();</span>
+  <span class="nc">Set</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;</span> <span 
class="n">titles</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">HashSet</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">&gt;();</span>
+  <span class="nc">Map</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span> <span class="o">=</span> <span class="k">new</span> 
<span class="nc">HashMap</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;();</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Now we need to define the logic to aggregate the stats over the duration of 
the window. To do this, we implement <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a>
 by adding the following class after the <code>WikipediaStats</code> class:</p>
+<p>Now we need to define the logic to aggregate the stats over the duration of 
the window. To do this, we implement <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a>
 by adding the following class after the <code class="language-plaintext 
highlighter-rouge">WikipediaStats</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">private</span> <span 
class="kd">class</span> <span class="nc">WikipediaStatsAggregator</span> <span 
class="kd">implements</span> <span class="n">FoldLeftFunction</span><span 
class="o">&lt;</span><span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;,</span> <span 
class="n">WikipediaStats</span><span class="o">&gt;</span> <span 
class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">private</span> <span class="kd">class</span> 
<span class="nc">WikipediaStatsAggregator</span> <span 
class="kd">implements</span> <span class="nc">FoldLeftFunction</span><span 
class="o">&lt;</span><span class="nc">Map</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">Object</span><span class="o">&gt;,</span> <span 
class="nc">WikipediaStats</span><span class="o">&gt;</span> <span 
class="o">{</span>
 
   <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="n">WikipediaStats</span> <span 
class="nf">apply</span><span class="o">(</span><span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Object</span><span class="o">&gt;</span> <span 
class="n">edit</span><span class="o">,</span> <span 
class="n">WikipediaStats</span> <span class="n">stats</span><span 
class="o">)</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="nc">WikipediaStats</span> <span 
class="nf">apply</span><span class="o">(</span><span class="nc">Map</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">Object</span><span class="o">&gt;</span> <span 
class="n">edit</span><span class="o">,</span> <span 
class="nc">WikipediaStats</span> <span class="n">stats</span><span 
class="o">)</span> <span class="o">{</span>
     <span class="c1">// Update window stats</span>
     <span class="n">stats</span><span class="o">.</span><span 
class="na">edits</span><span class="o">++;</span>
-    <span class="n">stats</span><span class="o">.</span><span 
class="na">byteDiff</span> <span class="o">+=</span> <span 
class="o">(</span><span class="n">Integer</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;diff-bytes&quot;</span><span 
class="o">);</span>
-    <span class="n">stats</span><span class="o">.</span><span 
class="na">titles</span><span class="o">.</span><span 
class="na">add</span><span class="o">((</span><span 
class="n">String</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;title&quot;</span><span 
class="o">));</span>
+    <span class="n">stats</span><span class="o">.</span><span 
class="na">byteDiff</span> <span class="o">+=</span> <span 
class="o">(</span><span class="nc">Integer</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">"diff-bytes"</span><span class="o">);</span>
+    <span class="n">stats</span><span class="o">.</span><span 
class="na">titles</span><span class="o">.</span><span 
class="na">add</span><span class="o">((</span><span 
class="nc">String</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">"title"</span><span class="o">));</span>
 
-    <span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Boolean</span><span class="o">&gt;</span> <span 
class="n">flags</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Boolean</span><span class="o">&gt;)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;flags&quot;</span><span 
class="o">);</span>
-    <span class="k">for</span> <span class="o">(</span><span 
class="n">Map</span><span class="o">.</span><span class="na">Entry</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Boolean</span><span class="o">&gt;</span> <span 
class="n">flag</span> <span class="o">:</span> <span 
class="n">flags</span><span class="o">.</span><span 
class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
-      <span class="k">if</span> <span class="o">(</span><span 
class="n">Boolean</span><span class="o">.</span><span 
class="na">TRUE</span><span class="o">.</span><span 
class="na">equals</span><span class="o">(</span><span 
class="n">flag</span><span class="o">.</span><span 
class="na">getValue</span><span class="o">()))</span> <span class="o">{</span>
+    <span class="nc">Map</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Boolean</span><span class="o">&gt;</span> <span 
class="n">flags</span> <span class="o">=</span> <span class="o">(</span><span 
class="nc">Map</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Boolean</span><span class="o">&gt;)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">"flags"</span><span class="o">);</span>
+    <span class="k">for</span> <span class="o">(</span><span 
class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">Boolean</span><span class="o">&gt;</span> <span 
class="n">flag</span> <span class="o">:</span> <span 
class="n">flags</span><span class="o">.</span><span 
class="na">entrySet</span><span class="o">())</span> <span class="o">{</span>
+      <span class="k">if</span> <span class="o">(</span><span 
class="nc">Boolean</span><span class="o">.</span><span 
class="na">TRUE</span><span class="o">.</span><span 
class="na">equals</span><span class="o">(</span><span 
class="n">flag</span><span class="o">.</span><span 
class="na">getValue</span><span class="o">()))</span> <span class="o">{</span>
         <span class="n">stats</span><span class="o">.</span><span 
class="na">counts</span><span class="o">.</span><span 
class="na">compute</span><span class="o">(</span><span 
class="n">flag</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">(),</span> <span 
class="o">(</span><span class="n">k</span><span class="o">,</span> <span 
class="n">v</span><span class="o">)</span> <span class="o">-&gt;</span> <span 
class="n">v</span> <span class="o">==</span> <span class="kc">null</span> <span 
class="o">?</span> <span class="mi">1</span> <span class="o">:</span> <span 
class="n">v</span> <span class="o">+</span> <span class="mi">1</span><span 
class="o">);</span>
       <span class="o">}</span>
     <span class="o">}</span>
@@ -784,29 +780,28 @@ The second argument is the <code>Serde</
   <span class="o">}</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Note: the type parameters for <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a>
 reflect the upstream data type and the window value type, respectively. In our 
case, the upstream type is the output of the parser and the window value is our 
<code>WikipediaStats</code> class.</p>
+<p>Note: the type parameters for <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a>
 reflect the upstream data type and the window value type, respectively. In our 
case, the upstream type is the output of the parser and the window value is our 
<code class="language-plaintext highlighter-rouge">WikipediaStats</code> 
class.</p>
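 <p>The fold-left pattern itself can be sketched in plain Java. The helper below is hypothetical and not part of the Samza API: it takes an initial-value supplier and an aggregator whose arguments follow the same order as <code>apply(edit, stats)</code>, that is, the incoming message first and the accumulated value second. The flag names in the example are made up, and this sketch records the first occurrence of a flag as 1.</p>

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical fold-left helper; not part of the Samza API.
class FoldLeftSketch {

  // Start from a fresh accumulator and fold each message into it in order.
  static <M, WV> WV foldLeft(List<M> messages, Supplier<WV> initialValue,
      BiFunction<M, WV, WV> aggregator) {
    WV value = initialValue.get();
    for (M message : messages) {
      value = aggregator.apply(message, value);
    }
    return value;
  }

  // Counts how often each flag name occurs. merge() stores 1 for a flag
  // seen for the first time and adds 1 on every later occurrence.
  static Map<String, Integer> countFlags(List<String> flags) {
    return FoldLeftSketch.<String, Map<String, Integer>>foldLeft(
        flags,
        HashMap::new,
        (flag, counts) -> {
          counts.merge(flag, 1, Integer::sum);
          return counts;
        });
  }

  public static void main(String[] args) {
    System.out.println(countFlags(Arrays.asList("is-bot-edit", "is-new", "is-bot-edit")));
  }
}
```

 <p>The two type parameters of the helper play the same roles as those described above: the first is the upstream message type and the second is the accumulated window value.</p>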
 
 <p>Finally, we can define our <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html">window</a>
 back in the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>
 method by chaining the result of the parser:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="n">WikipediaParser</span><span class="o">::</span><span 
class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="n">WikipediaStats</span><span class="o">::</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="n">WikipediaStatsAggregator</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">WikipediaStats</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">)));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="nl">WikipediaParser:</span><span class="o">:</span><span 
class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="nl">WikipediaStats:</span><span class="o">:</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="nc">WikipediaStatsAggregator</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">WikipediaStats</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">)));</span></code></pre></figure>
 
-<p>This defines an unkeyed <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Windows.html">tumbling
 window</a> that spans 10s, which instantiates a new 
<code>WikipediaStats</code> object at the beginning of each window and 
aggregates the stats using <code>WikipediaStatsAggregator</code>.</p>
+<p>This defines an unkeyed <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Windows.html">tumbling
 window</a> that spans 10s, which instantiates a new <code 
class="language-plaintext highlighter-rouge">WikipediaStats</code> object at 
the beginning of each window and aggregates the stats using <code 
class="language-plaintext 
highlighter-rouge">WikipediaStatsAggregator</code>.</p>
 
-<p>The output of the window is a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/WindowPane.html">WindowPane</a>
 with a key and value. Since we used an unkeyed tumbling window, the key is 
<code>Void</code>. The value is our <code>WikipediaStats</code> object.</p>
+<p>The output of the window is a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/WindowPane.html">WindowPane</a>
 with a key and value. Since we used an unkeyed tumbling window, the key is 
<code class="language-plaintext highlighter-rouge">Void</code>. The value is 
our <code class="language-plaintext highlighter-rouge">WikipediaStats</code> 
object.</p>
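 <p>As a rough illustration of what "tumbling" means, the sketch below assigns each message to exactly one fixed-size, non-overlapping window. It is a hypothetical plain-Java example, not the Samza implementation: it buckets by an explicit, non-negative timestamp in milliseconds purely for illustration, and how Samza itself assigns messages to windows is not shown here.</p>

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of tumbling windows; not the Samza implementation.
class TumblingWindowSketch {

  // Each timestamp falls into exactly one window of the given size.
  // Returns the message count keyed by window start (milliseconds).
  static Map<Long, Integer> countPerWindow(List<Long> timestampsMillis, Duration size) {
    long sizeMillis = size.toMillis();
    Map<Long, Integer> counts = new TreeMap<>();
    for (long timestamp : timestampsMillis) {
      // Round down to the start of the window containing this timestamp.
      long windowStart = (timestamp / sizeMillis) * sizeMillis;
      counts.merge(windowStart, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Long> timestamps = Arrays.asList(1_000L, 9_999L, 10_000L, 25_000L);
    System.out.println(countPerWindow(timestamps, Duration.ofSeconds(10)));
  }
}
```

 <p>Because the sketch is unkeyed, all messages in a window contribute to a single value, which corresponds to the <code>Void</code> key described above.</p>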
 
 <h4 id="output">Output</h4>
+<p>We will do a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a>
 at the end to format our window output. Let’s begin by defining a simple 
container class for our formatted output.</p>
 
-<p>We will do a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a>
 at the end to format our window output. Let&rsquo;s begin by defining a simple 
container class for our formatted output.</p>
-
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>  <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">WikipediaStatsOutput</span> <span 
class="o">{</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  
<span class="kd">static</span> <span class="kd">class</span> <span 
class="nc">WikipediaStatsOutput</span> <span class="o">{</span>
     <span class="kd">public</span> <span class="kt">int</span> <span 
class="n">edits</span><span class="o">;</span>
     <span class="kd">public</span> <span class="kt">int</span> <span 
class="n">bytesAdded</span><span class="o">;</span>
     <span class="kd">public</span> <span class="kt">int</span> <span 
class="n">uniqueTitles</span><span class="o">;</span>
-    <span class="kd">public</span> <span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="nc">Map</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span><span class="o">;</span>
 
     <span class="kd">public</span> <span 
class="nf">WikipediaStatsOutput</span><span class="o">(</span><span 
class="kt">int</span> <span class="n">edits</span><span class="o">,</span> 
<span class="kt">int</span> <span class="n">bytesAdded</span><span 
class="o">,</span> <span class="kt">int</span> <span 
class="n">uniqueTitles</span><span class="o">,</span>
-        <span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span><span class="o">)</span> <span class="o">{</span>
+        <span class="nc">Map</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">counts</span><span class="o">)</span> <span class="o">{</span>
       <span class="k">this</span><span class="o">.</span><span 
class="na">edits</span> <span class="o">=</span> <span 
class="n">edits</span><span class="o">;</span>
       <span class="k">this</span><span class="o">.</span><span 
class="na">bytesAdded</span> <span class="o">=</span> <span 
class="n">bytesAdded</span><span class="o">;</span>
       <span class="k">this</span><span class="o">.</span><span 
class="na">uniqueTitles</span> <span class="o">=</span> <span 
class="n">uniqueTitles</span><span class="o">;</span>
@@ -816,46 +811,45 @@ The second argument is the <code>Serde</
 
 <p>Paste the following after the aggregator class:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>  <span class="kd">private</span> <span 
class="n">WikipediaStatsOutput</span> <span class="nf">formatOutput</span><span 
class="o">(</span><span class="n">WindowPane</span><span 
class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span 
class="n">WikipediaStats</span><span class="o">&gt;</span> <span 
class="n">statsWindowPane</span><span class="o">)</span> <span 
class="o">{</span>
-    <span class="n">WikipediaStats</span> <span class="n">stats</span> <span 
class="o">=</span> <span class="n">statsWindowPane</span><span 
class="o">.</span><span class="na">getMessage</span><span class="o">();</span>
-    <span class="k">return</span> <span class="k">new</span> <span 
class="n">WikipediaStatsOutput</span><span class="o">(</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  
<span class="kd">private</span> <span class="nc">WikipediaStatsOutput</span> 
<span class="nf">formatOutput</span><span class="o">(</span><span 
class="nc">WindowPane</span><span class="o">&lt;</span><span 
class="nc">Void</span><span class="o">,</span> <span 
class="nc">WikipediaStats</span><span class="o">&gt;</span> <span 
class="n">statsWindowPane</span><span class="o">)</span> <span 
class="o">{</span>
+    <span class="nc">WikipediaStats</span> <span class="n">stats</span> <span 
class="o">=</span> <span class="n">statsWindowPane</span><span 
class="o">.</span><span class="na">getMessage</span><span class="o">();</span>
+    <span class="k">return</span> <span class="k">new</span> <span 
class="nf">WikipediaStatsOutput</span><span class="o">(</span>
         <span class="n">stats</span><span class="o">.</span><span 
class="na">edits</span><span class="o">,</span> <span 
class="n">stats</span><span class="o">.</span><span 
class="na">byteDiff</span><span class="o">,</span> <span 
class="n">stats</span><span class="o">.</span><span 
class="na">titles</span><span class="o">.</span><span 
class="na">size</span><span class="o">(),</span> <span 
class="n">stats</span><span class="o">.</span><span 
class="na">counts</span><span class="o">);</span>
   <span class="o">}</span></code></pre></figure>
 
 <p>Now, we can invoke the method by adding another <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-">map</a>
 operation to the chain in <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-">init</a>.
 The operator chain should now look like this:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="n">WikipediaParser</span><span class="o">::</span><span 
class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="n">WikipediaStats</span><span class="o">::</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="n">WikipediaStatsAggregator</span><span class="o">()))</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="nl">WikipediaParser:</span><span class="o">:</span><span 
class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="nl">WikipediaStats:</span><span class="o">:</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="nc">WikipediaStatsAggregator</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">WikipediaStats</span><span 
class="o">.</span><span class="na">class</span><span class="o">)))</span>
         <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="k">this</span><span class="o">::</span><span 
class="n">formatOutput</span><span class="o">);</span></code></pre></figure>
 
 <p>Next we need to get the output stream to which we will send the stats. 
Insert the following line below the creation of the 3 input streams:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>  <span class="n">OutputStream</span><span 
class="o">&lt;</span><span class="n">WikipediaStatsOutput</span><span 
class="o">&gt;</span> <span class="n">wikipediaStats</span> <span 
class="o">=</span>
-     <span class="n">graph</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="s">&quot;wikipedia-stats&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">WikipediaStatsOutput</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">  
<span class="nc">OutputStream</span><span class="o">&lt;</span><span 
class="nc">WikipediaStatsOutput</span><span class="o">&gt;</span> <span 
class="n">wikipediaStats</span> <span class="o">=</span>
+     <span class="n">streamGraph</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="s">"wikipedia-stats"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">WikipediaStatsOutput</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
 
 <p>The <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/OutputStream.html">OutputStream</a>
 is parameterized by the type of the output.</p>
 
-<p>The first parameter of <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-">getOutputStream</a>
 is the output stream ID. We will use <em>wikipedia-stats</em> and since it 
contains no special characters, we won&rsquo;t bother configuring a physical 
name so Samza will use the stream ID as the topic name.
-The second parameter is the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/Serde.html">Serde</a>
 to serialize the outgoing message. We will set it to <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html">JsonSerdeV2</a>
 to serialize our <code>WikipediaStatsOutput</code> as a JSON string.</p>
+<p>The first parameter of <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-">getOutputStream</a>
 is the output stream ID. We will use <em>wikipedia-stats</em> and since it 
contains no special characters, we won’t bother configuring a physical name 
so Samza will use the stream ID as the topic name.
+The second parameter is the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/Serde.html">Serde</a>
 to serialize the outgoing message. We will set it to <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html">JsonSerdeV2</a>
 to serialize our <code class="language-plaintext 
highlighter-rouge">WikipediaStatsOutput</code> as a JSON string.</p>
 
 <p>Finally, we can send our output to the output stream using the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a>
 operator:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="n">WikipediaParser</span><span class="o">::</span><span 
class="n">parseEvent</span><span class="o">)</span>
-        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="n">WikipediaStats</span><span class="o">::</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="n">WikipediaStatsAggregator</span><span class="o">()))</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">allWikipediaEvents</span><span 
class="o">.</span><span class="na">map</span><span class="o">(</span><span 
class="nl">WikipediaParser:</span><span class="o">:</span><span 
class="n">parseEvent</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span 
class="na">tumblingWindow</span><span class="o">(</span><span 
class="nc">Duration</span><span class="o">.</span><span 
class="na">ofSeconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">),</span> <span 
class="nl">WikipediaStats:</span><span class="o">:</span><span 
class="k">new</span><span class="o">,</span> <span class="k">new</span> <span 
class="nc">WikipediaStatsAggregator</span><span class="o">()))</span>
         <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="k">this</span><span class="o">::</span><span 
class="n">formatOutput</span><span class="o">)</span>
         <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">wikipediaStats</span><span 
class="o">);</span></code></pre></figure>
 
-<p>Tip: Because the MessageStream type information is preserved in the 
operator chain, it is often easier to define the OutputStream inline with the 
<a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a>
 operator and then refactor it for readability. That way you don&rsquo;t have 
to hunt down the types.</p>
+<p>Tip: Because the MessageStream type information is preserved in the 
operator chain, it is often easier to define the OutputStream inline with the 
<a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-">sendTo</a>
 operator and then refactor it for readability. That way you don’t have to 
hunt down the types.</p>
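
<p>To make the 10 second tumbling window in the chain above more concrete, here is a minimal plain-Java sketch of how events might be grouped into fixed, non-overlapping windows. It does not use Samza at all: the class <code>TumblingWindowSketch</code> and its methods are hypothetical names made up for illustration, it assumes non-negative millisecond timestamps, and it only mimics the bucketing idea, not Samza's actual window implementation.</p>

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a hand-rolled stand-in for the idea behind a tumbling window.
// None of these names come from Samza.
class TumblingWindowSketch {

    // Returns the start (in ms) of the fixed-size window that contains the timestamp.
    // Assumes timestampMillis >= 0.
    static long windowStart(long timestampMillis, Duration size) {
        long sizeMillis = size.toMillis();
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Counts how many events fall into each window, keyed by window start.
    static Map<Long, Integer> countPerWindow(long[] timestampsMillis, Duration size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] editTimes = {1_000L, 9_999L, 10_000L, 25_000L};
        // prints {0=2, 10000=1, 20000=1}
        System.out.println(countPerWindow(editTimes, Duration.ofSeconds(10)));
    }
}
```

<p>Each event belongs to exactly one window, which is why a per-window edit count resets every 10 seconds.</p>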
 
 <h4 id="kvstore">KVStore</h4>
-
 <p>We now have an operational Wikipedia application which provides stats 
aggregated over a 10 second interval. One of those stats is a count of the 
number of edits within the 10s window. But what if we want to keep an 
additional durable counter of the total edits?</p>
 
 <p>We will do this by keeping a separate count outside the window and 
persisting it in a <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html">KeyValueStore</a>.</p>
 
 <p>We start by defining the store in the config file:</p>
 
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>serializers.registry.string.class<span 
class="o">=</span>org.apache.samza.serializers.StringSerdeFactory
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash">serializers.registry.string.class<span 
class="o">=</span>org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.integer.class<span 
class="o">=</span>org.apache.samza.serializers.IntegerSerdeFactory
 
 stores.wikipedia-stats.factory<span 
class="o">=</span>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
@@ -863,40 +857,39 @@ stores.wikipedia-stats.changelog<span cl
 stores.wikipedia-stats.key.serde<span class="o">=</span>string
 stores.wikipedia-stats.msg.serde<span 
class="o">=</span>integer</code></pre></figure>
 
-<p>These properties declare a <a href="http://rocksdb.org/">RocksDB</a> 
key-value store named &ldquo;wikipedia-stats&rdquo;. The store is replicated to 
a changelog stream called &ldquo;wikipedia-stats-changelog&rdquo; on the 
<em>kafka</em> system for durability. It uses the <em>string</em> and 
<em>integer</em> serdes for keys and values respectively.</p>
+<p>These properties declare a <a href="http://rocksdb.org/">RocksDB</a> 
key-value store named “wikipedia-stats”. The store is replicated to a 
changelog stream called “wikipedia-stats-changelog” on the <em>kafka</em> 
system for durability. It uses the <em>string</em> and <em>integer</em> serdes 
for keys and values respectively.</p>
 
-<p>Next, we add a total count member variable to the 
<code>WikipediaStats</code> class, and to the <code>WikipediaStatsOutput</code> 
class:</p>
+<p>Next, we add a total count member variable to the <code 
class="language-plaintext highlighter-rouge">WikipediaStats</code> class, and 
to the <code class="language-plaintext 
highlighter-rouge">WikipediaStatsOutput</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kt">int</span> <span 
class="n">totalEdits</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kt">int</span> <span class="n">totalEdits</span> 
<span class="o">=</span> <span class="mi">0</span><span 
class="o">;</span></code></pre></figure>
 
-<p>To use the store in the application, we need to get it from the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/TaskContext.html">TaskContext</a>.
 Also, since we want to emit the total edit count along with the window edit 
count, it&rsquo;s easiest to update both of them in our aggregator. Declare the 
store as a member variable of the <code>WikipediaStatsAggregator</code> 
class:</p>
+<p>To use the store in the application, we need to get it from the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/task/TaskContext.html">TaskContext</a>.
 Also, since we want to emit the total edit count along with the window edit 
count, it’s easiest to update both of them in our aggregator. Declare the 
store as a member variable of the <code class="language-plaintext 
highlighter-rouge">WikipediaStatsAggregator</code> class:</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">private</span> <span 
class="n">KeyValueStore</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">store</span><span class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">private</span> <span 
class="nc">KeyValueStore</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">Integer</span><span class="o">&gt;</span> <span 
class="n">store</span><span class="o">;</span></code></pre></figure>
 
-<p>Then override the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-">init</a>
 method in <code>WikipediaStatsAggregator</code> to initialize the store.</p>
+<p>Then override the <a 
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.context.Context-">init</a>
 method in <code class="language-plaintext 
highlighter-rouge">WikipediaStatsAggregator</code> to initialize the store.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="nd">@Override</span>
-<span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span class="n">Config</span> 
<span class="n">config</span><span class="o">,</span> <span 
class="n">TaskContext</span> <span class="n">context</span><span 
class="o">)</span> <span class="o">{</span>
-  <span class="n">store</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">KeyValueStore</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;)</span> <span 
class="n">context</span><span class="o">.</span><span 
class="na">getStore</span><span class="o">(</span><span 
class="s">&quot;wikipedia-stats&quot;</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nd">@Override</span>
+<span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span class="nc">Config</span> 
<span class="n">config</span><span class="o">,</span> <span 
class="nc">TaskContext</span> <span class="n">context</span><span 
class="o">)</span> <span class="o">{</span>
+  <span class="n">store</span> <span class="o">=</span> <span 
class="o">(</span><span class="nc">KeyValueStore</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">Integer</span><span class="o">&gt;)</span> <span 
class="n">context</span><span class="o">.</span><span 
class="na">getStore</span><span class="o">(</span><span 
class="s">"wikipedia-stats"</span><span class="o">);</span>
 <span class="o">}</span></code></pre></figure>
 
-<p>Update and persist the counter in the <code>apply</code> method.</p>
+<p>Update and persist the counter in the <code class="language-plaintext 
highlighter-rouge">apply</code> method.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">Integer</span> <span 
class="n">editsAllTime</span> <span class="o">=</span> <span 
class="n">store</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;count-edits-all-time&quot;</span><span 
class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">Integer</span> <span 
class="n">editsAllTime</span> <span class="o">=</span> <span 
class="n">store</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">"count-edits-all-time"</span><span 
class="o">);</span>
 <span class="k">if</span> <span class="o">(</span><span 
class="n">editsAllTime</span> <span class="o">==</span> <span 
class="kc">null</span><span class="o">)</span> <span 
class="n">editsAllTime</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span>
 <span class="n">editsAllTime</span><span class="o">++;</span>
-<span class="n">store</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;count-edits-all-time&quot;</span><span class="o">,</span> <span 
class="n">editsAllTime</span><span class="o">);</span>
+<span class="n">store</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">"count-edits-all-time"</span><span class="o">,</span> <span 
class="n">editsAllTime</span><span class="o">);</span>
 <span class="n">stats</span><span class="o">.</span><span 
class="na">totalEdits</span> <span class="o">=</span> <span 
class="n">editsAllTime</span><span class="o">;</span></code></pre></figure>
 
-<p>Finally, update the <code>MyWikipediaApplication#formatOutput</code> method 
to include the total counter in its <code>WikipediaStatsOutput</code>.</p>
+<p>Finally, update the <code class="language-plaintext 
highlighter-rouge">MyWikipediaApplication#formatOutput</code> method to include 
the total counter in its <code class="language-plaintext 
highlighter-rouge">WikipediaStatsOutput</code>.</p>
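WikipediaStatsOutput</code>.</p>">

<p>The read-increment-write pattern used for the all-time counter can be tried out without a running job. The sketch below is only an approximation under stated assumptions: a <code>HashMap</code> stands in for the RocksDB-backed store, and the class <code>TotalEditsSketch</code> with its <code>recordEdit</code> method is a hypothetical name, not part of the tutorial project. Only the key <code>count-edits-all-time</code> is taken from the code above.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a HashMap stands in for the durable key-value store.
class TotalEditsSketch {
    static final String KEY = "count-edits-all-time";

    private final Map<String, Integer> store;

    TotalEditsSketch(Map<String, Integer> store) {
        this.store = store;
    }

    // Reads the current total, treats a missing key as zero, increments, writes back.
    int recordEdit() {
        Integer editsAllTime = store.get(KEY);
        if (editsAllTime == null) {
            editsAllTime = 0;
        }
        editsAllTime++;
        store.put(KEY, editsAllTime);
        return editsAllTime;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        TotalEditsSketch firstWindow = new TotalEditsSketch(store);
        firstWindow.recordEdit();
        firstWindow.recordEdit();
        // A fresh aggregator over the same store continues from the saved total.
        TotalEditsSketch nextWindow = new TotalEditsSketch(store);
        System.out.println(nextWindow.recordEdit()); // prints 3
    }
}
```

<p>Because the total lives in the store rather than in the per-window stats object, it keeps growing across windows while the window count starts over.</p>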
 
 <h4 id="metrics">Metrics</h4>
-
-<p>Lastly, let&rsquo;s add a metric to the application which counts the number 
of repeat edits each topic within the window interval.</p>
+<p>Lastly, let’s add a metric to the application which counts the number of 
repeat edits to each topic within the window interval.</p>
 
 <p>As with the key-value store, we must first define the metrics reporters in 
the config file.</p>
 
-<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>metrics.reporters<span 
class="o">=</span>snapshot,jmx
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash">metrics.reporters<span class="o">=</span>snapshot,jmx
 metrics.reporter.snapshot.class<span 
class="o">=</span>org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
 metrics.reporter.snapshot.stream<span class="o">=</span>kafka.metrics
 metrics.reporter.jmx.class<span 
class="o">=</span>org.apache.samza.metrics.reporter.JmxReporterFactory</code></pre></figure>
@@ -905,28 +898,26 @@ metrics.reporter.jmx.class<span class="o
 
 <p>In the WikipediaStatsAggregator, declare a counter member variable.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">private</span> <span 
class="n">Counter</span> <span class="n">repeatEdits</span><span 
class="o">;</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">private</span> <span 
class="nc">Counter</span> <span class="n">repeatEdits</span><span 
class="o">;</span></code></pre></figure>
 
-<p>Then add the following to the <code>WikipediaStatsAggregator#init</code> 
method to initialize the counter.</p>
+<p>Then add the following to the <code class="language-plaintext 
highlighter-rouge">WikipediaStatsAggregator#init</code> method to initialize 
the counter.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">repeatEdits</span> <span 
class="o">=</span> <span class="n">context</span><span class="o">.</span><span 
class="na">getMetricsRegistry</span><span class="o">().</span><span 
class="na">newCounter</span><span class="o">(</span><span 
class="s">&quot;edit-counters&quot;</span><span class="o">,</span> <span 
class="s">&quot;repeat-edits&quot;</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">repeatEdits</span> <span class="o">=</span> 
<span class="n">context</span><span class="o">.</span><span 
class="na">getMetricsRegistry</span><span class="o">().</span><span 
class="na">newCounter</span><span class="o">(</span><span 
class="s">"edit-counters"</span><span class="o">,</span> <span 
class="s">"repeat-edits"</span><span class="o">);</span></code></pre></figure>
 
-<p>Update and persist the counter from the <code>apply</code> method.</p>
+<p>Update and persist the counter from the <code class="language-plaintext 
highlighter-rouge">apply</code> method.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kt">boolean</span> <span 
class="n">newTitle</span> <span class="o">=</span> <span 
class="n">stats</span><span class="o">.</span><span 
class="na">titles</span><span class="o">.</span><span 
class="na">add</span><span class="o">((</span><span 
class="n">String</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;title&quot;</span><span 
class="o">));</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kt">boolean</span> <span 
class="n">newTitle</span> <span class="o">=</span> <span 
class="n">stats</span><span class="o">.</span><span 
class="na">titles</span><span class="o">.</span><span 
class="na">add</span><span class="o">((</span><span 
class="nc">String</span><span class="o">)</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">"title"</span><span class="o">));</span>
 
 <span class="k">if</span> <span class="o">(!</span><span 
class="n">newTitle</span><span class="o">)</span> <span class="o">{</span>
   <span class="n">repeatEdits</span><span class="o">.</span><span 
class="na">inc</span><span class="o">();</span>
-  <span class="n">log</span><span class="o">.</span><span 
class="na">info</span><span class="o">(</span><span class="s">&quot;Frequent 
edits for title: {}&quot;</span><span class="o">,</span> <span 
class="n">edit</span><span class="o">.</span><span class="na">get</span><span 
class="o">(</span><span class="s">&quot;title&quot;</span><span 
class="o">));</span>
+  <span class="n">log</span><span class="o">.</span><span 
class="na">info</span><span class="o">(</span><span class="s">"Frequent edits 
for title: {}"</span><span class="o">,</span> <span class="n">edit</span><span 
class="o">.</span><span class="na">get</span><span class="o">(</span><span 
class="s">"title"</span><span class="o">));</span>
 <span class="o">}</span></code></pre></figure>
 
 <h4 id="run-and-view-plan">Run and View Plan</h4>
-
-<p>You can set up the grid and run the application using the same instructions 
from the <a href="hello-samza-high-level-yarn.html">hello samza high level API 
Yarn tutorial</a>. The only difference is to replace the 
<code>wikipedia-application.properties</code> config file in the 
<em>config-path</em> command line parameter with 
<code>my-wikipedia-application.properties</code></p>
+<p>You can set up the grid and run the application using the same instructions 
from the <a href="hello-samza-high-level-yarn.html">hello samza high level API 
Yarn tutorial</a>. The only difference is to replace the <code 
class="language-plaintext 
highlighter-rouge">wikipedia-application.properties</code> config file in the 
<em>config-path</em> command line parameter with <code 
class="language-plaintext 
highlighter-rouge">my-wikipedia-application.properties</code>.</p>
 
 <h3 id="summary">Summary</h3>
-
-<p>Congratulations! You have built and executed a Wikipedia stream application 
on Samza using the high level API. The final application should be directly 
comparable to the pre-existing <code>WikipediaApplication</code> in the 
project.</p>
+<p>Congratulations! You have built and executed a Wikipedia stream application 
on Samza using the high level API. The final application should be directly 
comparable to the pre-existing <code class="language-plaintext 
highlighter-rouge">WikipediaApplication</code> in the project.</p>
 
 <p>You can provide feedback on this tutorial in the <a 
href="mailto:[email protected]";>dev mailing list</a>.</p>
 

