Modified: samza/site/learn/documentation/latest/connectors/eventhubs.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/eventhubs.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/eventhubs.html (original)
+++ samza/site/learn/documentation/latest/connectors/eventhubs.html Wed Jan 18 
19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/connectors/eventhubs">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/connectors/eventhubs">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/connectors/eventhubs">1.6.0</a></li>
 
               
@@ -639,118 +653,122 @@
    limitations under the License.
 -->
 
-<h3 id="eventhubs-i-o-quickstart">EventHubs I/O: QuickStart</h3>
+<h3 id="eventhubs-io-quickstart">EventHubs I/O: QuickStart</h3>
 
-<p>The Samza EventHubs connector provides access to <a 
href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features";>Azure
 EventHubs</a>, Microsoft’s data streaming service on Azure. An eventhub is 
similar to a Kafka topic and can have multiple partitions with producers and 
consumers. Each message produced or consumed from an event hub is an instance 
of <a 
href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data";>EventData</a>.
 </p>
+<p>The Samza EventHubs connector provides access to <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features";>Azure EventHubs</a>, Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data";>EventData</a>.</p>
 
 <p>The <a href="https://github.com/apache/samza-hello-samza";>hello-samza</a> 
project includes an <a 
href="../../../tutorials/versioned/samza-event-hubs-standalone.html">example</a>
 of reading and writing to EventHubs.</p>
 
 <h3 id="concepts">Concepts</h3>
 
-<h4 id="eventhubssystemdescriptor">EventHubsSystemDescriptor</h4>
+<h4 id="eventhubssystemdescriptor">EventHubsSystemDescriptor</h4>
 
-<p>Samza refers to any IO source (eg: Kafka) it interacts with as a 
<em>system</em>, whose properties are set using a corresponding 
<code>SystemDescriptor</code>. The <code>EventHubsSystemDescriptor</code> 
allows you to configure various properties for the <code>EventHubsClient</code> 
used by Samza.</p>
+<p>Samza refers to any I/O source (e.g., Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code class="language-plaintext highlighter-rouge">SystemDescriptor</code>. The <code class="language-plaintext highlighter-rouge">EventHubsSystemDescriptor</code> allows you to configure various properties for the <code class="language-plaintext highlighter-rouge">EventHubsClient</code> used by Samza.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span> <span class="mi">1</span>  <span 
class="n">EventHubsSystemDescriptor</span> <span 
class="n">eventHubsSystemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span 
class="o">).</span><span class="na">withNumClientThreads</span><span 
class="o">(</span><span class="mi">5</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"> 
<span class="mi">1</span>  <span class="nc">EventHubsSystemDescriptor</span> 
<span class="n">eventHubsSystemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">"eventhubs"</span><span 
class="o">).</span><span class="na">withNumClientThreads</span><span 
class="o">(</span><span class="mi">5</span><span 
class="o">);</span></code></pre></figure>
 
-<h4 id="eventhubsinputdescriptor">EventHubsInputDescriptor</h4>
+<h4 id="eventhubsinputdescriptor">EventHubsInputDescriptor</h4>
 
 <p>The EventHubsInputDescriptor allows you to specify the properties of each 
EventHubs stream your application should read from. For each of your input 
streams, you should create a corresponding instance of EventHubsInputDescriptor 
by providing a topic-name and a serializer.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span 
class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span> 
-        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="n">streamId</span><span class="o">,</span> <span 
class="s">&quot;eventhubs-namespace&quot;</span><span class="o">,</span> <span 
class="s">&quot;eventhubs-name&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">())</span>
-          <span class="o">.</span><span class="na">withSasKeyName</span><span 
class="o">(</span><span class="s">&quot;secretkey&quot;</span><span 
class="o">)</span>
-          <span class="o">.</span><span class="na">withSasKey</span><span 
class="o">(</span><span class="s">&quot;sasToken-123&quot;</span><span 
class="o">)</span>
-          <span class="o">.</span><span 
class="na">withConsumerGroup</span><span class="o">(</span><span 
class="s">&quot;$notdefault&quot;</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">EventHubsInputDescriptor</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span> 
+        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="n">streamId</span><span class="o">,</span> <span 
class="s">"eventhubs-namespace"</span><span class="o">,</span> <span 
class="s">"eventhubs-name"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">StringSerde</span><span 
class="o">())</span>
+          <span class="o">.</span><span class="na">withSasKeyName</span><span 
class="o">(</span><span class="s">"secretkey"</span><span class="o">)</span>
+          <span class="o">.</span><span class="na">withSasKey</span><span 
class="o">(</span><span class="s">"sasToken-123"</span><span class="o">)</span>
+          <span class="o">.</span><span 
class="na">withConsumerGroup</span><span class="o">(</span><span 
class="s">"$notdefault"</span><span class="o">);</span></code></pre></figure>
 
-<p>By default, messages are sent and received as byte arrays. Samza then 
de-serializes them to typed objects using your provided Serde. For example, the 
above uses a <code>StringSerde</code> to de-serialize messages.</p>
+<p>By default, messages are sent and received as byte arrays. Samza then 
de-serializes them to typed objects using your provided Serde. For example, the 
above uses a <code class="language-plaintext 
highlighter-rouge">StringSerde</code> to de-serialize messages.</p>
 
-<h4 id="eventhubsoutputdescriptor">EventHubsOutputDescriptor</h4>
+<h4 id="eventhubsoutputdescriptor">EventHubsOutputDescriptor</h4>
 
-<p>Similarly, the <code>EventHubsOutputDescriptor</code> allows you to specify 
the output streams for your application. For each output stream you write to in 
EventHubs, you should create an instance of 
<code>EventHubsOutputDescriptor</code>.</p>
+<p>Similarly, the <code class="language-plaintext 
highlighter-rouge">EventHubsOutputDescriptor</code> allows you to specify the 
output streams for your application. For each output stream you write to in 
EventHubs, you should create an instance of <code class="language-plaintext 
highlighter-rouge">EventHubsOutputDescriptor</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span 
class="n">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="n">OUTPUT_STREAM_ID</span><span class="o">,</span> <span 
class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="n">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">();)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">EventHubsOutputDescriptor</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="no">OUTPUT_STREAM_ID</span><span class="o">,</span> <span 
class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="no">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">StringSerde</span><span class="o">())</span>
             <span class="o">.</span><span 
class="na">withSasKeyName</span><span class="o">(..)</span>
             <span class="o">.</span><span class="na">withSasKey</span><span 
class="o">(..);</span></code></pre></figure>
 
-<h4 id="security-model">Security Model</h4>
-
-<p>Each EventHubs stream is scoped to a container called a <em>namespace</em>, 
which uniquely identifies an EventHubs in a region. EventHubs&rsquo;s <a 
href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview";>security
 model</a> is based on Shared Access Signatures(SAS). 
-Hence, you should also provide your SAS keys and tokens to access the stream. 
You can generate your SAS tokens using the </p>
-
-<h4 id="data-model">Data Model</h4>
+<h4 id="security-model">Security Model</h4>
+
+<p>Each EventHubs stream is scoped to a container called a <em>namespace</em>, which uniquely identifies an EventHubs instance in a region. EventHubs’s <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-authentication-and-security-model-overview";>security model</a> is based on Shared Access Signatures (SAS). Hence, you should also provide your SAS keys and tokens to access the stream. You can generate your SAS tokens using the</p>
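+<p>For instance, the SAS key name and key for your namespace can be supplied on each stream descriptor. The following sketch reuses the descriptor API shown in this page; the namespace, entity name, and credential values are placeholders to substitute with your own:</p>
+
+<figure class="highlight"><pre><code class="language-java" data-lang="java">    // Sketch: placeholder credentials -- substitute the SAS policy name and
    // key generated for your own EventHubs namespace.
    EventHubsInputDescriptor&lt;KV&lt;String, String&gt;&gt; secureInput =
        systemDescriptor.getInputDescriptor(streamId, "my-namespace", "my-eventhub", new StringSerde())
            .withSasKeyName("my-sas-policy-name")
            .withSasKey("my-base64-encoded-sas-key");</code></pre></figure>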
 
-<p>Each event produced and consumed from an EventHubs stream is an instance of 
<a 
href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data";>EventData</a>,
 which wraps a byte-array payload. When producing to EventHubs, Samza 
serializes your object into an <code>EventData</code> payload before sending it 
over the wire. Likewise, when consuming messages from EventHubs, messages are 
de-serialized into typed objects using the provided Serde. </p>
+<h4 id="data-model">Data Model</h4>
+
+<p>Each event produced and consumed from an EventHubs stream is an instance of <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data";>EventData</a>, which wraps a byte-array payload. When producing to EventHubs, Samza serializes your object into an <code class="language-plaintext highlighter-rouge">EventData</code> payload before sending it over the wire. Likewise, when consuming messages from EventHubs, messages are de-serialized into typed objects using the provided Serde.</p>
 
 <h3 id="configuration">Configuration</h3>
 
-<h4 id="producer-partitioning">Producer partitioning</h4>
+<h4 id="producer-partitioning">Producer partitioning</h4>
 
-<p>You can use <code>#withPartitioningMethod</code> to control how outgoing 
messages are partitioned. The following partitioning schemes are supported:</p>
+<p>You can use <code class="language-plaintext 
highlighter-rouge">#withPartitioningMethod</code> to control how outgoing 
messages are partitioned. The following partitioning schemes are supported:</p>
 
 <ol>
-<li><p>EVENT_HUB_HASHING: By default, Samza computes the partition for an 
outgoing message based on the hash of its partition-key. This ensures that 
events with the same key are sent to the same partition. If this option is 
chosen, the partition key should be a string. If the partition key is not set, 
the key in the message is used for partitioning.</p></li>
-<li><p>PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the 
partition specified by its partition key. This requires the partition key to be 
an integer. If the key is greater than the number of partitions, a modulo 
operation will be performed on the key. Similar to EVENT_HUB_HASHING, the key 
in the message is used if the partition key is not specified.</p></li>
-<li><p>ROUND_ROBIN: In this method, outgoing messages are distributed in a 
round-robin across all partitions. The key and the partition key in the message 
are ignored.</p></li>
+  <li>
+    <p>EVENT_HUB_HASHING: By default, Samza computes the partition for an 
outgoing message based on the hash of its partition-key. This ensures that 
events with the same key are sent to the same partition. If this option is 
chosen, the partition key should be a string. If the partition key is not set, 
the key in the message is used for partitioning.</p>
+  </li>
+  <li>
+    <p>PARTITION_KEY_AS_PARTITION: In this method, each message is sent to the 
partition specified by its partition key. This requires the partition key to be 
an integer. If the key is greater than the number of partitions, a modulo 
operation will be performed on the key. Similar to EVENT_HUB_HASHING, the key 
in the message is used if the partition key is not specified.</p>
+  </li>
+  <li>
+    <p>ROUND_ROBIN: In this method, outgoing messages are distributed in a 
round-robin across all partitions. The key and the partition key in the message 
are ignored.</p>
+  </li>
 </ol>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">EventHubsSystemDescriptor</span> 
<span class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span 
class="o">)</span>
-        <span class="o">.</span><span 
class="na">withPartitioningMethod</span><span class="o">(</span><span 
class="n">PartitioningMethod</span><span class="o">.</span><span 
class="na">EVENT_HUB_HASHING</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">EventHubsSystemDescriptor</span> <span 
class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">"eventhubs"</span><span class="o">)</span>
+        <span class="o">.</span><span 
class="na">withPartitioningMethod</span><span class="o">(</span><span 
class="nc">PartitioningMethod</span><span class="o">.</span><span 
class="na">EVENT_HUB_HASHING</span><span 
class="o">);</span></code></pre></figure>
 
 <h4 id="consumer-groups">Consumer groups</h4>
 
-<p>Event Hubs supports the notion of <a 
href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups";>consumer
 groups</a> which enable multiple applications to have their own view of the 
event stream. Each partition is exclusively consumed by one consumer in the 
group. Each event hub stream has a pre-defined consumer group named $Default. 
You can define your own consumer group for your job using 
<code>withConsumerGroup</code>.</p>
+<p>Event Hubs supports the notion of <a 
href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups";>consumer
 groups</a> which enable multiple applications to have their own view of the 
event stream. Each partition is exclusively consumed by one consumer in the 
group. Each event hub stream has a pre-defined consumer group named $Default. 
You can define your own consumer group for your job using <code 
class="language-plaintext highlighter-rouge">withConsumerGroup</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">EventHubsSystemDescriptor</span> 
<span class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span 
class="o">);</span>
-<span class="n">EventHubsInputDescriptor</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="n">INPUT_STREAM_ID</span><span class="o">,</span> <span 
class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="n">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">)</span>
-            <span class="o">.</span><span 
class="na">withConsumerGroup</span><span class="o">(</span><span 
class="s">&quot;my-group&quot;</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">EventHubsSystemDescriptor</span> <span 
class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">"eventhubs"</span><span class="o">);</span>
+<span class="nc">EventHubsInputDescriptor</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="no">INPUT_STREAM_ID</span><span class="o">,</span> <span 
class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="no">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">)</span>
+            <span class="o">.</span><span 
class="na">withConsumerGroup</span><span class="o">(</span><span 
class="s">"my-group"</span><span class="o">);</span></code></pre></figure>
 
 <h4 id="consumer-buffer-size">Consumer buffer size</h4>
 
 <p>When the consumer reads a message from EventHubs, it appends them to a 
shared producer-consumer queue corresponding to its partition. This config 
determines the per-partition queue size. Setting a higher value for this config 
typically achieves a higher throughput at the expense of increased on-heap 
memory.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span> <span class="n">EventHubsSystemDescriptor</span> 
<span class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span 
class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java"> 
<span class="nc">EventHubsSystemDescriptor</span> <span 
class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">"eventhubs"</span><span class="o">)</span>
         <span class="o">.</span><span 
class="na">withReceiveQueueSize</span><span class="o">(</span><span 
class="mi">10</span><span class="o">);</span></code></pre></figure>
 
 <h3 id="code-walkthrough">Code walkthrough</h3>
 
-<p>In this section, we will walk through a simple pipeline that reads from one 
EventHubs stream and copies each message to another output stream. </p>
+<p>In this section, we will walk through a simple pipeline that reads from one 
EventHubs stream and copies each message to another output stream.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="mi">1</span>    <span 
class="n">EventHubsSystemDescriptor</span> <span 
class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;eventhubs&quot;</span><span 
class="o">).</span><span class="na">withNumClientThreads</span><span 
class="o">(</span><span class="mi">5</span><span class="o">);</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="mi">1</span>    <span 
class="nc">EventHubsSystemDescriptor</span> <span 
class="n">systemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">EventHubsSystemDescriptor</span><span 
class="o">(</span><span class="s">"eventhubs"</span><span 
class="o">).</span><span class="na">withNumClientThreads</span><span 
class="o">(</span><span class="mi">5</span><span class="o">);</span>
 
-<span class="mi">2</span>    <span 
class="n">EventHubsInputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="n">INPUT_STREAM_ID</span><span class="o">,</span> <span 
class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="n">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">StringSerde</span><span 
class="o">())</span>
+<span class="mi">2</span>    <span 
class="nc">EventHubsInputDescriptor</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="no">INPUT_STREAM_ID</span><span class="o">,</span> <span 
class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="no">EVENTHUBS_INPUT_ENTITY</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">StringSerde</span><span 
class="o">())</span>
             <span class="o">.</span><span 
class="na">withSasKeyName</span><span class="o">(..)</span>
            <span class="o">.</span><span class="na">withSasKey</span><span class="o">(..);</span>
 
-<span class="mi">3</span>    <span 
class="n">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
-        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="n">OUTPUT_STREAM_ID</span><span class="o">,</span> <span 
class="n">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="n">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">)</span>
+<span class="mi">3</span>    <span 
class="nc">EventHubsOutputDescriptor</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
+        <span class="n">systemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="no">OUTPUT_STREAM_ID</span><span class="o">,</span> <span 
class="no">EVENTHUBS_NAMESPACE</span><span class="o">,</span> <span 
class="no">EVENTHUBS_OUTPUT_ENTITY</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">)</span>
            <span class="o">.</span><span class="na">withSasKeyName</span><span class="o">(..)</span>
            <span class="o">.</span><span class="na">withSasKey</span><span class="o">(..);</span>
 
-<span class="mi">4</span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">eventhubInput</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">inputDescriptor</span><span class="o">);</span>
-<span class="mi">5</span>    <span class="n">OutputStream</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">eventhubOutput</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">outputDescriptor</span><span class="o">);</span>
+<span class="mi">4</span>    <span class="nc">MessageStream</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">eventhubInput</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">inputDescriptor</span><span class="o">);</span>
+<span class="mi">5</span>    <span class="nc">OutputStream</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">String</span><span class="o">&gt;&gt;</span> <span 
class="n">eventhubOutput</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">outputDescriptor</span><span class="o">);</span>
 
     <span class="c1">// Define the execution flow with the High Level Streams 
API</span>
 <span class="mi">6</span>    <span class="n">eventhubInput</span>
 <span class="mi">7</span>        <span class="o">.</span><span 
class="na">map</span><span class="o">((</span><span 
class="n">message</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="o">{</span>
-<span class="mi">8</span>          <span class="n">System</span><span 
class="o">.</span><span class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span class="s">&quot;Received 
Key: &quot;</span> <span class="o">+</span> <span class="n">message</span><span 
class="o">.</span><span class="na">getKey</span><span class="o">());</span>
-<span class="mi">9</span>          <span class="n">System</span><span 
class="o">.</span><span class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span class="s">&quot;Received 
Message: &quot;</span> <span class="o">+</span> <span 
class="n">message</span><span class="o">.</span><span 
class="na">getValue</span><span class="o">());</span>
+<span class="mi">8</span>          <span class="nc">System</span><span 
class="o">.</span><span class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span class="s">"Received Key: 
"</span> <span class="o">+</span> <span class="n">message</span><span 
class="o">.</span><span class="na">getKey</span><span class="o">());</span>
+<span class="mi">9</span>          <span class="nc">System</span><span 
class="o">.</span><span class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span class="s">"Received 
Message: "</span> <span class="o">+</span> <span class="n">message</span><span 
class="o">.</span><span class="na">getValue</span><span class="o">());</span>
 <span class="mi">10</span>          <span class="k">return</span> <span 
class="n">message</span><span class="o">;</span>
 <span class="mi">11</span>        <span class="o">})</span>
 <span class="mi">12</span>        <span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span><span 
class="n">eventhubOutput</span><span class="o">);</span></code></pre></figure>
 
-<p>-Line 1 instantiates an <code>EventHubsSystemDescriptor</code> configuring 
an EventHubsClient with 5 threads. To consume from other input sources like 
Kafka, you can define their corresponding descriptors. </p>
+<p>Line 1 instantiates an <code class="language-plaintext highlighter-rouge">EventHubsSystemDescriptor</code>, configuring an <code class="language-plaintext highlighter-rouge">EventHubsClient</code> with 5 threads. To consume from other input sources like Kafka, you can define their corresponding descriptors.</p>
 
-<p>-Line 2 creates an <code>EventHubsInputDescriptor</code> with a String 
serde for its values. Recall that Samza follows a KV data-model for input 
messages. In the case of EventHubs, the key is a string which is set to the <a 
href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__";>partitionKey</a>
 in the message. Hence, no separate key serde is required. </p>
+<p>Line 2 creates an <code class="language-plaintext highlighter-rouge">EventHubsInputDescriptor</code> with a String serde for its values. Recall that Samza follows a KV data-model for input messages. In the case of EventHubs, the key is a string that is set to the <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data._system_properties.getpartitionkey?view=azure-java-stable#com_microsoft_azure_eventhubs__event_data__system_properties_getPartitionKey__";>partitionKey</a> in the message. Hence, no separate key serde is required.</p>
 
-<p>-Line 3 creates an <code>EventHubsOutputDescriptor</code> to write to an 
EventHubs stream with the given credentials.</p>
+<p>Line 3 creates an <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> to write to an EventHubs stream with the given credentials.</p>
 
-<p>-Line 4 obtains a <code>MessageStream</code> from the input descriptor that 
you can later chain operations on. </p>
+<p>Line 4 obtains a <code class="language-plaintext highlighter-rouge">MessageStream</code> from the input descriptor that you can later chain operations on.</p>
 
-<p>-Line 5 creates an <code>OutputStream</code> with the previously defined 
<code>EventHubsOutputDescriptor</code> that you can send messages to.</p>
+<p>Line 5 creates an <code class="language-plaintext highlighter-rouge">OutputStream</code> with the previously defined <code class="language-plaintext highlighter-rouge">EventHubsOutputDescriptor</code> that you can send messages to.</p>
 
 <p>Lines 7-12 define a simple pipeline that copies messages from one EventHubs stream to another.</p>
 

Modified: samza/site/learn/documentation/latest/connectors/hdfs.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/hdfs.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/hdfs.html (original)
+++ samza/site/learn/documentation/latest/connectors/hdfs.html Wed Jan 18 
19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/connectors/hdfs">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/connectors/hdfs">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/connectors/hdfs">1.6.0</a></li>
 
               
@@ -645,83 +659,87 @@
 To interact with HDFS, Samza requires your job to run on the same YARN 
cluster.</p>
 
 <h3 id="consuming-from-hdfs">Consuming from HDFS</h3>
-
 <h4 id="input-partitioning">Input Partitioning</h4>
 
-<p>Partitioning works at the level of individual directories and files. Each 
directory is treated as its own stream and each of its files is treated as a 
<em>partition</em>. For example, Samza creates 5 partitions when it&rsquo;s 
reading from a directory containing 5 files. There is no way to parallelize the 
consumption when reading from a single file - you can only have one container 
to process the file.</p>
+<p>Partitioning works at the level of individual directories and files. Each 
directory is treated as its own stream and each of its files is treated as a 
<em>partition</em>. For example, Samza creates 5 partitions when it’s reading 
from a directory containing 5 files. There is no way to parallelize the 
consumption when reading from a single file - you can only have one container 
to process the file.</p>
 
 <h4 id="input-event-format">Input Event format</h4>
-
-<p>Samza supports avro natively, and it&rsquo;s easy to extend to other 
serialization formats. Each avro record read from HDFS is wrapped into a 
message-envelope. The <a 
href="../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html">envelope</a>
 contains these 3 fields:</p>
+<p>Samza supports avro natively, and it’s easy to extend to other 
serialization formats. Each avro record read from HDFS is wrapped into a 
message-envelope. The <a 
href="../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html">envelope</a>
 contains these 3 fields:</p>
 
 <ul>
-<li><p>The key, which is empty</p></li>
-<li><p>The value, which is set to the avro <a 
href="https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html";>GenericRecord</a></p></li>
-<li><p>The partition, which is set to the name of the HDFS file</p></li>
+  <li>
+    <p>The key, which is empty</p>
+  </li>
+  <li>
+    <p>The value, which is set to the avro <a 
href="https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html";>GenericRecord</a></p>
+  </li>
+  <li>
+    <p>The partition, which is set to the name of the HDFS file</p>
+  </li>
 </ul>
 
 <p>To support non-avro input formats, you can implement the <a 
href="https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java";>SingleFileHdfsReader</a>
 interface.</p>
 
 <h4 id="endofstream">EndOfStream</h4>
 
-<p>While streaming sources like Kafka are unbounded, files on HDFS have finite 
data and have a notion of EOF. When reading from HDFS, your Samza job 
automatically exits after consuming all the data. You can implement <a 
href="../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html">EndOfStreamListenerTask</a>
 to get a callback once EOF has been reached. </p>
+<p>While streaming sources like Kafka are unbounded, files on HDFS have finite 
data and have a notion of EOF. When reading from HDFS, your Samza job 
automatically exits after consuming all the data. You can implement <a 
href="../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html">EndOfStreamListenerTask</a>
 to get a callback once EOF has been reached.</p>
 
 <h4 id="defining-streams">Defining streams</h4>
 
-<p>In Samza high level API, you can use <code>HdfsSystemDescriptor</code> to 
create a HDFS system. The stream name should be set to the name of the 
directory on HDFS.</p>
-
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">);</span>
-<span class="n">HdfsInputDescriptor</span> <span class="n">hid</span> <span 
class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;/data/clickstream/2016/09/11&quot;</span><span 
class="o">);</span></code></pre></figure>
+<p>In the Samza high-level API, you can use <code class="language-plaintext highlighter-rouge">HdfsSystemDescriptor</code> to create an HDFS system. The stream name should be set to the name of the directory on HDFS.</p>
 
-<p>The above example defines a stream called <code>hdfs-clickstream</code> 
that reads data from the <code>/data/clickstream/2016/09/11</code> directory. 
</p>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">);</span>
+<span class="nc">HdfsInputDescriptor</span> <span class="n">hid</span> <span 
class="o">=</span> <span class="n">hsd</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"/data/clickstream/2016/09/11"</span><span 
class="o">);</span></code></pre></figure>
 
-<h4 id="whitelists-blacklists">Whitelists &amp; Blacklists</h4>
+<p>The above example defines a stream called <code class="language-plaintext 
highlighter-rouge">hdfs-clickstream</code> that reads data from the <code 
class="language-plaintext 
highlighter-rouge">/data/clickstream/2016/09/11</code> directory.</p>
 
-<p>If you only want to consume from files that match a certain pattern, you 
can configure a whitelist. Likewise, you can also blacklist consuming from 
certain files. When both are specified, the <em>whitelist</em> selects the 
files to be filtered and the <em>blacklist</em> is later applied on its 
results. </p>
+<h4 id="whitelists--blacklists">Whitelists &amp; Blacklists</h4>
+<p>If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist certain files from consumption. When both are specified, the <em>whitelist</em> first selects the files to consume, and the <em>blacklist</em> is then applied to its results.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withConsumerWhiteList</span><span class="o">(</span><span 
class="s">&quot;.*avro&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withConsumerBlackList</span><span class="o">(</span><span 
class="s">&quot;somefile.avro&quot;</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withConsumerWhiteList</span><span class="o">(</span><span 
class="s">".*avro"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withConsumerBlackList</span><span class="o">(</span><span 
class="s">"somefile.avro"</span><span class="o">);</span></code></pre></figure>
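To make the filtering order concrete, here is a small self-contained sketch (plain Java regexes; illustrative only, not Samza's internal implementation) of how the whitelist is applied first and the blacklist is then applied to its results:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FileFilterSketch {
    // Hypothetical helper: the whitelist selects candidate files, and the
    // blacklist then removes matches from that selection.
    static List<String> select(List<String> files, String whitelist, String blacklist) {
        return files.stream()
                .filter(f -> f.matches(whitelist))   // keep whitelist matches
                .filter(f -> !f.matches(blacklist))  // then drop blacklist matches
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = List.of("clicks.avro", "somefile.avro", "notes.txt");
        // Mirrors the descriptor above: whitelist ".*avro", blacklist "somefile.avro"
        System.out.println(select(files, ".*avro", "somefile.avro")); // prints [clicks.avro]
    }
}
```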
 
 <h3 id="producing-to-hdfs">Producing to HDFS</h3>
 
 <h4 id="output-format">Output format</h4>
 
-<p>Samza allows writing your output results to HDFS in AVRO format. You can 
either use avro&rsquo;s GenericRecords or have Samza automatically infer the 
schema for your object using reflection. </p>
+<p>Samza allows writing your output results to HDFS in Avro format. You can either use Avro’s GenericRecords or have Samza automatically infer the schema for your object using reflection.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="n">AvroDataFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="nc">AvroDataFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
 
-<p>If your output is non-avro, use <code>TextSequenceFileHdfsWriter</code>.</p>
+<p>If your output is non-avro, use <code class="language-plaintext 
highlighter-rouge">TextSequenceFileHdfsWriter</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="n">TextSequenceFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withWriterClassName</span><span class="o">(</span><span 
class="nc">TextSequenceFileHdfsWriter</span><span class="o">.</span><span 
class="na">class</span><span class="o">.</span><span 
class="na">getName</span><span class="o">());</span></code></pre></figure>
 
 <h4 id="output-directory-structure">Output directory structure</h4>
 
-<p>Samza allows you to control the base HDFS directory to write your output. 
You can also organize the output into sub-directories depending on the time 
your application ran, by configuring a date-formatter. </p>
+<p>Samza allows you to control the base HDFS directory to write your output. 
You can also organize the output into sub-directories depending on the time 
your application ran, by configuring a date-formatter.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withOutputBaseDir</span><span class="o">(</span><span 
class="s">&quot;/user/me/analytics/clickstream_data&quot;</span><span 
class="o">)</span>
-                                        <span class="o">.</span><span 
class="na">withDatePathFormat</span><span class="o">(</span><span 
class="s">&quot;yyyy_MM_dd&quot;</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withOutputBaseDir</span><span class="o">(</span><span 
class="s">"/user/me/analytics/clickstream_data"</span><span class="o">)</span>
+                                        <span class="o">.</span><span 
class="na">withDatePathFormat</span><span class="o">(</span><span 
class="s">"yyyy_MM_dd"</span><span class="o">);</span></code></pre></figure>
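The pattern passed to the date-formatter above uses Java's SimpleDateFormat syntax. A minimal sketch of the sub-directory name it yields (the exact path layout here is illustrative, not Samza's internals):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DatePathSketch {
    // Formats a timestamp the way a "yyyy_MM_dd" date-formatter would,
    // and appends it to the base output directory.
    static String outputPath(String baseDir, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy_MM_dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone for a deterministic example
        return baseDir + "/" + fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // prints /user/me/analytics/clickstream_data/1970_01_01
        System.out.println(outputPath("/user/me/analytics/clickstream_data", 0L));
    }
}
```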
 
 <p>You can configure the maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;hdfs-clickstream&quot;</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">HdfsSystemDescriptor</span> <span 
class="n">hsd</span> <span class="o">=</span> <span class="k">new</span> <span 
class="nc">HdfsSystemDescriptor</span><span class="o">(</span><span 
class="s">"hdfs-clickstream"</span><span class="o">)</span>
                                         <span class="o">.</span><span 
class="na">withWriteBatchSizeBytes</span><span class="o">(</span><span 
class="mi">134217728</span><span class="o">)</span>
                                         <span class="o">.</span><span 
class="na">withWriteBatchSizeRecords</span><span class="o">(</span><span 
class="mi">10000</span><span class="o">);</span></code></pre></figure>
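The "either limit" roll-over rule can be sketched as a simple check (the method and parameter names here are illustrative, not Samza's internals):

```java
public class RollOverSketch {
    // A new file is started as soon as either the byte limit or the
    // record limit for the current file is reached.
    static boolean shouldRoll(long bytesWritten, long recordsWritten,
                              long maxBytes, long maxRecords) {
        return bytesWritten >= maxBytes || recordsWritten >= maxRecords;
    }

    public static void main(String[] args) {
        System.out.println(shouldRoll(134217728, 500, 134217728, 10000)); // true: byte limit hit
        System.out.println(shouldRoll(1024, 10000, 134217728, 10000));    // true: record limit hit
        System.out.println(shouldRoll(1024, 500, 134217728, 10000));      // false: neither limit hit
    }
}
```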
 
 <h3 id="security">Security</h3>
 
-<p>You can access Kerberos-enabled HDFS clusters by providing your principal 
and the path to your key-tab file. Samza takes care of automatically creating 
and renewing your Kerberos tokens periodically. </p>
+<p>You can access Kerberos-enabled HDFS clusters by providing your principal and the path to your keytab file. Samza takes care of automatically creating and renewing your Kerberos tokens periodically.</p>
+
+<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties">job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
 
-<figure class="highlight"><pre><code class="language-jproperties" 
data-lang="jproperties"><span></span><span 
class="na">job.security.manager.factory</span><span class="o">=</span><span 
class="s">org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory</span>
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
 
-<span class="c"># Kerberos principal</span>
-<span class="na">yarn.kerberos.principal</span><span class="o">=</span><span 
class="s">your-principal-name</span>
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab</code></pre></figure>
 
-<span class="c"># Path of the keytab file (local path)</span>
-<span class="na">yarn.kerberos.keytab</span><span class="o">=</span><span 
class="s">/tmp/keytab</span></code></pre></figure>
 
            
         </div>

Modified: samza/site/learn/documentation/latest/connectors/kafka.html
URL: 
http://svn.apache.org/viewvc/samza/site/learn/documentation/latest/connectors/kafka.html?rev=1906774&r1=1906773&r2=1906774&view=diff
==============================================================================
--- samza/site/learn/documentation/latest/connectors/kafka.html (original)
+++ samza/site/learn/documentation/latest/connectors/kafka.html Wed Jan 18 
19:33:25 2023
@@ -227,6 +227,12 @@
     
       
         
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.8.0">1.8.0</a>
+      
+        
+      <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.7.0">1.7.0</a>
+      
+        
       <a class="side-navigation__group-item" data-match-active="" 
href="/releases/1.6.0">1.6.0</a>
       
         
@@ -538,6 +544,14 @@
               
               
 
+              <li class="hide"><a 
href="/learn/documentation/1.8.0/connectors/kafka">1.8.0</a></li>
+
+              
+
+              <li class="hide"><a 
href="/learn/documentation/1.7.0/connectors/kafka">1.7.0</a></li>
+
+              
+
               <li class="hide"><a 
href="/learn/documentation/1.6.0/connectors/kafka">1.6.0</a></li>
 
               
@@ -639,112 +653,125 @@
    limitations under the License.
 -->
 
-<h3 id="kafka-i-o-quickstart">Kafka I/O : QuickStart</h3>
-
+<h3 id="kafka-io--quickstart">Kafka I/O: QuickStart</h3>
 <p>Samza offers built-in integration with Apache Kafka for stream processing. 
A common pattern in Samza applications is to read messages from one or more 
Kafka topics, process them and emit results to other Kafka topics or 
databases.</p>
 
-<p>The <code>hello-samza</code> project includes multiple examples on 
interacting with Kafka from your Samza jobs. Each example also includes 
instructions on how to run them and view results. </p>
+<p>The <code class="language-plaintext highlighter-rouge">hello-samza</code> 
project includes multiple examples on interacting with Kafka from your Samza 
jobs. Each example also includes instructions on how to run them and view 
results.</p>
 
 <ul>
-<li><p><a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java";>High
 Level Streams API Example</a> with a corresponding <a 
href="/learn/documentation/latest/deployment/yarn.html#starting-your-application-on-yarn">tutorial</a></p></li>
-<li><p><a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java";>Low
 Level Task API Example</a> with a corresponding <a 
href="https://github.com/apache/samza-hello-samza#hello-samza";>tutorial</a></p></li>
+  <li>
+    <p><a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java";>High
 Level Streams API Example</a> with a corresponding <a 
href="/learn/documentation/latest/deployment/yarn.html#starting-your-application-on-yarn">tutorial</a></p>
+  </li>
+  <li>
+    <p><a 
href="https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java";>Low
 Level Task API Example</a> with a corresponding <a 
href="https://github.com/apache/samza-hello-samza#hello-samza";>tutorial</a></p>
+  </li>
 </ul>
 
 <h3 id="concepts">Concepts</h3>
 
-<h4 id="kafkasystemdescriptor">KafkaSystemDescriptor</h4>
+<h4 id="kafkasystemdescriptor">KafkaSystemDescriptor</h4>
 
-<p>Samza refers to any IO source (eg: Kafka) it interacts with as a 
<em>system</em>, whose properties are set using a corresponding 
<code>SystemDescriptor</code>. The <code>KafkaSystemDescriptor</code> allows 
you to describe the Kafka cluster you are interacting with and specify its 
properties. </p>
+<p>Samza refers to any I/O source (e.g., Kafka) it interacts with as a <em>system</em>, whose properties are set using a corresponding <code class="language-plaintext highlighter-rouge">SystemDescriptor</code>. The <code class="language-plaintext highlighter-rouge">KafkaSystemDescriptor</code> allows you to describe the Kafka cluster you are interacting with and specify its properties.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> 
<span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="n">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
-            <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="n">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
-            <span class="o">.</span><span 
class="na">withDefaultStreamConfigs</span><span class="o">(</span><span 
class="n">KAFKA_DEFAULT_STREAM_CONFIGS</span><span 
class="o">);</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">KafkaSystemDescriptor</span> <span 
class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span 
class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="no">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
+            <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="no">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
+            <span class="o">.</span><span 
class="na">withDefaultStreamConfigs</span><span class="o">(</span><span 
class="no">KAFKA_DEFAULT_STREAM_CONFIGS</span><span 
class="o">);</span></code></pre></figure>
 
-<h4 id="kafkainputdescriptor">KafkaInputDescriptor</h4>
+<h4 id="kafkainputdescriptor">KafkaInputDescriptor</h4>
 
-<p>A Kafka cluster usually has multiple topics (a.k.a <em>streams</em>). The 
<code>KafkaInputDescriptor</code> allows you to specify the properties of each 
Kafka topic your application should read from. For each of your input topics, 
you should create a corresponding instance of <code>KafkaInputDescriptor</code>
+<p>A Kafka cluster usually has multiple topics (a.k.a. <em>streams</em>). The <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor</code> allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor</code>
 by providing a topic-name and a serializer.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span 
class="n">KafkaInputDescriptor</span><span class="o">&lt;</span><span 
class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViewStreamDescriptor</span> <span class="o">=</span> <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-view-topic&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">KafkaInputDescriptor</span><span class="o">&lt;</span><span 
class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">pageViewStreamDescriptor</span> <span class="o">=</span> <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"page-view-topic"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageView</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
 
-<p>The above example describes an input Kafka stream from the 
&ldquo;page-view-topic&rdquo; which Samza de-serializes into a JSON payload. 
Samza provides default serializers for common data-types like string, avro, 
bytes, integer etc.</p>
+<p>The above example describes an input Kafka stream from the “page-view-topic”, which Samza deserializes into a JSON payload. Samza provides default serializers for common data types such as string, Avro, bytes, and integer.</p>
 
-<h4 id="kafkaoutputdescriptor">KafkaOutputDescriptor</h4>
+<h4 id="kafkaoutputdescriptor">KafkaOutputDescriptor</h4>
 
-<p>Similarly, the <code>KafkaOutputDescriptor</code> allows you to specify the 
output streams for your application. For each output topic you write to, you 
should create an instance of <code>KafkaOutputDescriptor</code>.</p>
+<p>Similarly, the <code class="language-plaintext 
highlighter-rouge">KafkaOutputDescriptor</code> allows you to specify the 
output streams for your application. For each output topic you write to, you 
should create an instance of <code class="language-plaintext 
highlighter-rouge">KafkaOutputDescriptor</code>.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span 
class="n">KafkaOutputDescriptor</span><span class="o">&lt;</span><span 
class="n">DecoratedPageView</span><span class="o">&gt;</span> <span 
class="n">decoratedPageView</span> <span class="o">=</span> <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;my-output-topic&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">DecoratedPageView</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span 
class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span 
class="n">decoratedPageView</span> <span class="o">=</span> <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"my-output-topic"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">DecoratedPageView</span><span 
class="o">.</span><span class="na">class</span><span 
class="o">));</span></code></pre></figure>
 
 <h3 id="configuration">Configuration</h3>
 
-<h5 id="configuring-kafka-producer-and-consumer">Configuring Kafka producer 
and consumer</h5>
+<h5 id="configuring-kafka-producer-and-consumer">Configuring Kafka producer and consumer</h5>
 
-<p>The <code>KafkaSystemDescriptor</code> allows you to specify any <a 
href="https://kafka.apache.org/documentation/#producerconfigs";>Kafka 
producer</a> or <a 
href="https://kafka.apache.org/documentation/#consumerconfigs";>Kafka 
consumer</a>) property which are directly passed over to the underlying Kafka 
client. This allows for 
-precise control over the KafkaProducer and KafkaConsumer used by Samza. </p>
+<p>The <code class="language-plaintext highlighter-rouge">KafkaSystemDescriptor</code> allows you to specify any <a href="https://kafka.apache.org/documentation/#producerconfigs";>Kafka producer</a> or <a href="https://kafka.apache.org/documentation/#consumerconfigs";>Kafka consumer</a> property, which is passed directly to the underlying Kafka client. This allows for
+precise control over the KafkaProducer and KafkaConsumer used by Samza.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> 
<span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(..)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">KafkaSystemDescriptor</span> <span 
class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span 
class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(..)</span>
             <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(..)</span>
             <span class="o">.</span><span 
class="na">withConsumerConfigs</span><span class="o">(..)</span>
             <span class="o">.</span><span 
class="na">withProducerConfigs</span><span 
class="o">(..)</span></code></pre></figure>
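Beyond the fluent setters shown above, the same pass-through behavior is available from flat job configuration. A hedged sketch, assuming the system is named <code>kafka</code> and using illustrative property values; any key under the <code>consumer.</code>/<code>producer.</code> prefixes is forwarded to the underlying Kafka client:

```properties
# Keys under systems.<name>.consumer.* / systems.<name>.producer.* are
# forwarded verbatim to the underlying KafkaConsumer / KafkaProducer.
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
# Illustrative pass-through Kafka client properties:
systems.kafka.producer.compression.type=lz4
systems.kafka.consumer.socket.timeout.ms=30000
```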
 
-<h4 id="accessing-an-offset-which-is-out-of-range">Accessing an offset which 
is out-of-range</h4>
-
-<p>This setting determines the behavior if a consumer attempts to read an 
offset that is outside of the current valid range maintained by the broker. 
This could happen if the topic does not exist, or if a checkpoint is older than 
the maximum message history retained by the brokers. </p>
+<h4 id="accessing-an-offset-which-is-out-of-range">Accessing an offset which is out-of-range</h4>
+
+<p>This setting determines the behavior if a consumer attempts to read an offset that is outside of the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span>    <span class="n">KafkaSystemDescriptor</span> 
<span class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
-        <span class="k">new</span> <span 
class="n">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">&quot;kafka&quot;</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(..)</span>
+<figure class="highlight"><pre><code class="language-java" data-lang="java">   
 <span class="nc">KafkaSystemDescriptor</span> <span 
class="n">kafkaSystemDescriptor</span> <span class="o">=</span>
+        <span class="k">new</span> <span 
class="nf">KafkaSystemDescriptor</span><span class="o">(</span><span 
class="s">"kafka"</span><span class="o">).</span><span 
class="na">withConsumerZkConnect</span><span class="o">(..)</span>
             <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(..)</span>
-            <span class="o">.</span><span 
class="na">withConsumerAutoOffsetReset</span><span class="o">(</span><span 
class="s">&quot;largest&quot;</span><span 
class="o">)</span></code></pre></figure>
-
-<h5 id="ignoring-checkpointed-offsets">Ignoring checkpointed offsets</h5>
+            <span class="o">.</span><span 
class="na">withConsumerAutoOffsetReset</span><span class="o">(</span><span 
class="s">"largest"</span><span class="o">)</span></code></pre></figure>
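The same policy can also be expressed in flat configuration by passing the property through to the consumer. A minimal sketch (the system name and value are illustrative; <code>largest</code>/<code>smallest</code> are the legacy Kafka consumer spellings, while newer Kafka clients expect <code>latest</code>/<code>earliest</code>):

```properties
# Behavior when a checkpointed offset is out of range or absent:
# consume from the newest ("largest") or oldest ("smallest") available offset.
systems.kafka.consumer.auto.offset.reset=largest
```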
 
-<p>Samza periodically persists the last processed Kafka offsets as a part of 
its checkpoint. During startup, Samza resumes consumption from the previously 
checkpointed offsets by default. You can over-ride this behavior and configure 
Samza to ignore checkpoints with 
<code>KafkaInputDescriptor#shouldResetOffset()</code>.
-Once there are no checkpoints for a stream, the 
<code>#withOffsetDefault(..)</code> determines whether we start consumption 
from the oldest or newest offset. </p>
+<h5 id="ignoring-checkpointed-offsets">Ignoring checkpointed offsets</h5>
+
+<p>Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with <code class="language-plaintext highlighter-rouge">KafkaInputDescriptor#shouldResetOffset()</code>.
+Once there are no checkpoints for a stream, <code class="language-plaintext highlighter-rouge">#withOffsetDefault(..)</code> determines whether consumption starts from the oldest or newest offset.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="n">PageView</span><span class="o">&gt;</span> 
<span class="n">pageViewStreamDescriptor</span> <span class="o">=</span> 
-    <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-view-topic&quot;</span><span class="o">,</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">))</span> 
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="nc">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="nc">PageView</span><span 
class="o">&gt;</span> <span class="n">pageViewStreamDescriptor</span> <span 
class="o">=</span> 
+    <span class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"page-view-topic"</span><span class="o">,</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">))</span> 
         <span class="o">.</span><span class="na">shouldResetOffset</span><span 
class="o">()</span>
-        <span class="o">.</span><span class="na">withOffsetDefault</span><span 
class="o">(</span><span class="n">OffsetType</span><span 
class="o">.</span><span class="na">OLDEST</span><span 
class="o">);</span></code></pre></figure>
+        <span class="o">.</span><span class="na">withOffsetDefault</span><span 
class="o">(</span><span class="nc">OffsetType</span><span 
class="o">.</span><span class="na">OLDEST</span><span 
class="o">);</span></code></pre></figure>
 
-<p>The above example configures Samza to ignore checkpointed offsets for 
<code>page-view-topic</code> and consume from the oldest available offset 
during startup. You can configure this behavior to apply to all topics in the 
Kafka cluster by using 
<code>KafkaSystemDescriptor#withDefaultStreamOffsetDefault</code>.</p>
+<p>The above example configures Samza to ignore checkpointed offsets for <code 
class="language-plaintext highlighter-rouge">page-view-topic</code> and consume 
from the oldest available offset during startup. You can configure this 
behavior to apply to all topics in the Kafka cluster by using <code 
class="language-plaintext 
highlighter-rouge">KafkaSystemDescriptor#withDefaultStreamOffsetDefault</code>.</p>
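For reference, the flat-configuration counterparts of these descriptor calls look roughly as follows. A hedged sketch based on Samza's <code>systems.*.streams.*</code> settings, with the system and stream names taken from the example above:

```properties
# Ignore the checkpoint for one stream and restart it from the oldest offset
# (descriptor equivalents: shouldResetOffset() / withOffsetDefault(OLDEST)).
systems.kafka.streams.page-view-topic.samza.reset.offset=true
systems.kafka.streams.page-view-topic.samza.offset.default=oldest
# System-wide default (descriptor equivalent: withDefaultStreamOffsetDefault).
systems.kafka.default.stream.samza.offset.default=oldest
```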
 
 <h3 id="code-walkthrough-high-level-streams-api">Code walkthrough: High Level 
Streams API</h3>
 
 <p>In this section, we walk through a complete example that reads from a Kafka 
topic, filters a few messages and writes them to another topic.</p>
 
-<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Define coordinates of the 
Kafka cluster using the KafkaSystemDescriptor</span>
-<span class="mi">1</span>    <span class="n">KafkaSystemDescriptor</span> 
<span class="n">kafkaSystemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">KafkaSystemDescriptor</span><span 
class="o">(</span><span class="s">&quot;kafka&quot;</span><span 
class="o">)</span>
-<span class="mi">2</span>        <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="n">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
-<span class="mi">3</span>        <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="n">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
+<figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Define coordinates of the Kafka cluster 
using the KafkaSystemDescriptor</span>
+<span class="mi">1</span>    <span class="nc">KafkaSystemDescriptor</span> 
<span class="n">kafkaSystemDescriptor</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nc">KafkaSystemDescriptor</span><span 
class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
+<span class="mi">2</span>        <span class="o">.</span><span 
class="na">withConsumerZkConnect</span><span class="o">(</span><span 
class="no">KAFKA_CONSUMER_ZK_CONNECT</span><span class="o">)</span>
+<span class="mi">3</span>        <span class="o">.</span><span 
class="na">withProducerBootstrapServers</span><span class="o">(</span><span 
class="no">KAFKA_PRODUCER_BOOTSTRAP_SERVERS</span><span class="o">)</span>
 
 <span class="c1">// Create a KafkaInputDescriptor for your input topic and a KafkaOutputDescriptor for the output topic</span>
-<span class="mi">4</span>    <span class="n">KVSerde</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">PageView</span><span class="o">&gt;</span> <span 
class="n">serde</span> <span class="o">=</span> <span 
class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="n">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="n">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
-<span class="mi">5</span>    <span class="n">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
-<span class="mi">6</span>        <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">&quot;page-views&quot;</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">);</span>
-<span class="mi">7</span>    <span class="n">KafkaOutputDescriptor</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
-<span class="mi">8</span>        <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">&quot;filtered-page-views&quot;</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">);</span>
+<span class="mi">4</span>    <span class="nc">KVSerde</span><span 
class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> 
<span class="nc">PageView</span><span class="o">&gt;</span> <span 
class="n">serde</span> <span class="o">=</span> <span 
class="nc">KVSerde</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="nc">StringSerde</span><span class="o">(),</span> <span 
class="k">new</span> <span class="nc">JsonSerdeV2</span><span 
class="o">&lt;&gt;(</span><span class="nc">PageView</span><span 
class="o">.</span><span class="na">class</span><span class="o">));</span>
+<span class="mi">5</span>    <span class="nc">KafkaInputDescriptor</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">inputDescriptor</span> <span class="o">=</span>
+<span class="mi">6</span>        <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getInputDescriptor</span><span class="o">(</span><span 
class="s">"page-views"</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">);</span>
+<span class="mi">7</span>    <span 
class="nc">KafkaOutputDescriptor</span><span class="o">&lt;</span><span 
class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">outputDescriptor</span> <span class="o">=</span>
+<span class="mi">8</span>        <span 
class="n">kafkaSystemDescriptor</span><span class="o">.</span><span 
class="na">getOutputDescriptor</span><span class="o">(</span><span 
class="s">"filtered-page-views"</span><span class="o">,</span> <span 
class="n">serde</span><span class="o">);</span>
 
 
 <span class="c1">// Obtain a message stream for the input topic</span>
-<span class="mi">9</span>    <span class="n">MessageStream</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">inputDescriptor</span><span class="o">);</span>
+<span class="mi">9</span>    <span class="nc">MessageStream</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">pageViews</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(</span><span 
class="n">inputDescriptor</span><span class="o">);</span>
 
 <span class="c1">// Obtain an output stream for the output topic</span>
-<span class="mi">10</span>    <span class="n">OutputStream</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">filteredPageViews</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">outputDescriptor</span><span class="o">);</span>
+<span class="mi">10</span>    <span class="nc">OutputStream</span><span 
class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span 
class="nc">String</span><span class="o">,</span> <span 
class="nc">PageView</span><span class="o">&gt;&gt;</span> <span 
class="n">filteredPageViews</span> <span class="o">=</span> <span 
class="n">appDescriptor</span><span class="o">.</span><span 
class="na">getOutputStream</span><span class="o">(</span><span 
class="n">outputDescriptor</span><span class="o">);</span>
 
 <span class="c1">// Write results to the output topic</span>
 <span class="mi">11</span>    <span class="n">pageViews</span>
-<span class="mi">12</span>       <span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">kv</span> <span 
class="o">-&gt;</span> <span class="o">!</span><span 
class="n">INVALID_USER_ID</span><span class="o">.</span><span 
class="na">equals</span><span class="o">(</span><span class="n">kv</span><span 
class="o">.</span><span class="na">value</span><span class="o">.</span><span 
class="na">userId</span><span class="o">))</span>
+<span class="mi">12</span>       <span class="o">.</span><span 
class="na">filter</span><span class="o">(</span><span class="n">kv</span> <span 
class="o">-&gt;</span> <span class="o">!</span><span 
class="no">INVALID_USER_ID</span><span class="o">.</span><span 
class="na">equals</span><span class="o">(</span><span class="n">kv</span><span 
class="o">.</span><span class="na">value</span><span class="o">.</span><span 
class="na">userId</span><span class="o">))</span>
 <span class="mi">13</span>       <span class="o">.</span><span 
class="na">sendTo</span><span class="o">(</span><span 
class="n">filteredPageViews</span><span 
class="o">);</span></code></pre></figure>
 
 <ul>
-<li><p>Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of 
our Kafka cluster</p></li>
-<li><p>Lines 4-6 defines a KafkaInputDescriptor for our input topic - 
<code>page-views</code></p></li>
-<li><p>Lines 7-9 defines a KafkaOutputDescriptor for our output topic - 
<code>filtered-page-views</code></p></li>
-<li><p>Line 9 creates a MessageStream for the input topic so that you can 
chain operations on it later</p></li>
-<li><p>Line 10 creates an OuputStream for the output topic</p></li>
-<li><p>Lines 11-13 define a simple pipeline that reads from the input stream 
and writes filtered results to the output stream</p></li>
+  <li>
+    <p>Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster</p>
+  </li>
+  <li>
+    <p>Lines 4-6 define a KafkaInputDescriptor for our input topic - <code class="language-plaintext highlighter-rouge">page-views</code></p>
+  </li>
+  <li>
+    <p>Lines 7-8 define a KafkaOutputDescriptor for our output topic - <code class="language-plaintext highlighter-rouge">filtered-page-views</code></p>
+  </li>
+  <li>
+    <p>Line 9 creates a MessageStream for the input topic so that you can chain operations on it later</p>
+  </li>
+  <li>
+    <p>Line 10 creates an OutputStream for the output topic</p>
+  </li>
+  <li>
+    <p>Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream</p>
+  </li>
 </ul>
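The core of the pipeline is the predicate on line 12. As a plain-Java illustration with no Samza dependency (the PageView shape and the INVALID_USER_ID value are assumptions made for this sketch), the same filtering logic can be exercised on its own:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PageViewFilterSketch {
    // Minimal stand-in for the PageView payload used in the walkthrough.
    static class PageView {
        final String userId;
        PageView(String userId) { this.userId = userId; }
    }

    // Assumed sentinel value; the real constant lives in the application code.
    static final String INVALID_USER_ID = "invalid-user";

    // Same shape as line 12: keep events whose userId is not the invalid sentinel.
    static final Predicate<PageView> VALID = pv -> !INVALID_USER_ID.equals(pv.userId);

    public static void main(String[] args) {
        List<PageView> input = List.of(
                new PageView("alice"), new PageView("invalid-user"), new PageView("bob"));
        // Filter and project, mirroring pageViews.filter(..).sendTo(..).
        List<String> kept = input.stream().filter(VALID)
                .map(pv -> pv.userId).collect(Collectors.toList());
        System.out.println(kept); // [alice, bob]
    }
}
```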
 
            

