http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/dsl-api.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
new file mode 100644
index 0000000..9cfb4bc
--- /dev/null
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -0,0 +1,3208 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <!-- div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div>
+    </div -->
+  </div>
+
+
+    <div class="section" id="streams-dsl">
+        <span id="streams-developer-guide-dsl"></span><h1>Streams DSL<a 
class="headerlink" href="#streams-dsl" title="Permalink to this 
headline"></a></h1>
+        <p>The Kafka Streams DSL (Domain Specific Language) is built on top of 
the Streams Processor API. It is the recommended for
+            most users, especially beginners. Most data processing operations 
can be expressed in just a few lines of DSL code.</p>
+        <div class="contents local topic" id="table-of-contents">
+            <p class="topic-title first">Table of Contents</p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#overview" 
id="id7">Overview</a></li>
+                <li><a class="reference internal" 
href="#creating-source-streams-from-kafka" id="id8">Creating source streams 
from Kafka</a></li>
+                <li><a class="reference internal" href="#transform-a-stream" 
id="id9">Transform a stream</a><ul>
+                    <li><a class="reference internal" 
href="#stateless-transformations" id="id10">Stateless transformations</a></li>
+                    <li><a class="reference internal" 
href="#stateful-transformations" id="id11">Stateful transformations</a><ul>
+                        <li><a class="reference internal" href="#aggregating" 
id="id12">Aggregating</a></li>
+                        <li><a class="reference internal" href="#joining" 
id="id13">Joining</a><ul>
+                            <li><a class="reference internal" 
href="#join-co-partitioning-requirements" id="id14">Join co-partitioning 
requirements</a></li>
+                            <li><a class="reference internal" 
href="#kstream-kstream-join" id="id15">KStream-KStream Join</a></li>
+                            <li><a class="reference internal" 
href="#ktable-ktable-join" id="id16">KTable-KTable Join</a></li>
+                            <li><a class="reference internal" 
href="#kstream-ktable-join" id="id17">KStream-KTable Join</a></li>
+                            <li><a class="reference internal" 
href="#kstream-globalktable-join" id="id18">KStream-GlobalKTable Join</a></li>
+                        </ul>
+                        </li>
+                        <li><a class="reference internal" href="#windowing" 
id="id19">Windowing</a><ul>
+                            <li><a class="reference internal" 
href="#tumbling-time-windows" id="id20">Tumbling time windows</a></li>
+                            <li><a class="reference internal" 
href="#hopping-time-windows" id="id21">Hopping time windows</a></li>
+                            <li><a class="reference internal" 
href="#sliding-time-windows" id="id22">Sliding time windows</a></li>
+                            <li><a class="reference internal" 
href="#session-windows" id="id23">Session Windows</a></li>
+                        </ul>
+                        </li>
+                    </ul>
+                    </li>
+                    <li><a class="reference internal" 
href="#applying-processors-and-transformers-processor-api-integration" 
id="id24">Applying processors and transformers (Processor API 
integration)</a></li>
+                </ul>
+                </li>
+                <li><a class="reference internal" 
href="#writing-streams-back-to-kafka" id="id25">Writing streams back to 
Kafka</a></li>
+            </ul>
+        </div>
+        <div class="section" id="overview">
+            <h2><a class="toc-backref" href="#id7">Overview</a><a 
class="headerlink" href="#overview" title="Permalink to this headline"></a></h2>
+            <p>In comparison to the <a class="reference internal" 
href="processor-api.html#streams-developer-guide-processor-api"><span 
class="std std-ref">Processor API</span></a>, only the DSL supports:</p>
+            <ul class="simple">
+                <li>Built-in abstractions for <a class="reference internal" 
href="../concepts.html#streams-concepts-duality"><span class="std 
std-ref">streams and tables</span></a> in the form of
+                    <a class="reference internal" 
href="../concepts.html#streams-concepts-kstream"><span class="std 
std-ref">KStream</span></a>, <a class="reference internal" 
href="../concepts.html#streams-concepts-ktable"><span class="std 
std-ref">KTable</span></a>, and
+                    <a class="reference internal" 
href="../concepts.html#streams-concepts-globalktable"><span class="std 
std-ref">GlobalKTable</span></a>.  Having first-class support for streams and 
tables is crucial
+                    because, in practice, most use cases require not just 
either streams or databases/tables, but a combination of both.
+                    For example, if your use case is to create a customer 
360-degree view that is updated in real-time, what your
+                    application will be doing is transforming many input 
<em>streams</em> of customer-related events into an output <em>table</em>
+                    that contains a continuously updated 360-degree view of 
your customers.</li>
+                <li>Declarative, functional programming style with
+                    <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std 
std-ref">stateless transformations</span></a> (e.g. <code class="docutils 
literal"><span class="pre">map</span></code> and <code class="docutils 
literal"><span class="pre">filter</span></code>)
+                    as well as <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateful"><span class="std 
std-ref">stateful transformations</span></a>
+                    such as <a class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregations</span></a> (e.g. <code class="docutils literal"><span 
class="pre">count</span></code> and <code class="docutils literal"><span 
class="pre">reduce</span></code>),
+                    <a class="reference internal" 
href="#streams-developer-guide-dsl-joins"><span class="std 
std-ref">joins</span></a> (e.g. <code class="docutils literal"><span 
class="pre">leftJoin</span></code>), and
+                    <a class="reference internal" 
href="#streams-developer-guide-dsl-windowing"><span class="std 
std-ref">windowing</span></a> (e.g. <a class="reference internal" 
href="#windowing-session"><span class="std std-ref">session 
windows</span></a>).</li>
+            </ul>
+            <p>With the DSL, you can define <a class="reference internal" 
href="../concepts.html#streams-concepts-processor-topology"><span class="std 
std-ref">processor topologies</span></a> (i.e., the logical
+                processing plan) in your application. The steps to accomplish 
this are:</p>
+            <ol class="arabic simple">
+                <li>Specify <a class="reference internal" 
href="#streams-developer-guide-dsl-sources"><span class="std std-ref">one or 
more input streams that are read from Kafka topics</span></a>.</li>
+                <li>Compose <a class="reference internal" 
href="#streams-developer-guide-dsl-transformations"><span class="std 
std-ref">transformations</span></a> on these streams.</li>
+                <li>Write the <a class="reference internal" 
href="#streams-developer-guide-dsl-destinations"><span class="std 
std-ref">resulting output streams back to Kafka topics</span></a>, or expose 
the processing results of your application directly to other applications 
through <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactive queries</span></a> (e.g., via a REST API).</li>
+            </ol>
+            <p>After the application is run, the defined processor topologies 
are continuously executed (i.e., the processing plan is put into
+                action). A step-by-step guide for writing a stream processing 
application using the DSL is provided below.</p>
+            <p>For a complete list of available API functionality, see also 
the <a class="reference internal" 
href="../javadocs.html#streams-javadocs"><span class="std std-ref">Kafka 
Streams Javadocs</span></a>.</p>
+        </div>
+        <div class="section" id="creating-source-streams-from-kafka">
+            <span id="streams-developer-guide-dsl-sources"></span><h2><a 
class="toc-backref" href="#id8">Creating source streams from Kafka</a><a 
class="headerlink" href="#creating-source-streams-from-kafka" title="Permalink 
to this headline"></a></h2>
+            <p>You can easily read data from Kafka topics into your 
application.  The following operations are supported.</p>
+            <table border="1" class="non-scrolling-table width-100-percent 
docutils">
+                <colgroup>
+                    <col width="22%" />
+                    <col width="78%" />
+                </colgroup>
+                <thead valign="bottom">
+                <tr class="row-odd"><th class="head">Reading from Kafka</th>
+                    <th class="head">Description</th>
+                </tr>
+                </thead>
+                <tbody valign="top">
+                <tr class="row-even"><td><p 
class="first"><strong>Stream</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topics</em> &rarr; KStream</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Creates a <a class="reference 
internal" href="../concepts.html#streams-concepts-kstream"><span class="std 
std-ref">KStream</span></a> from the specified Kafka input topics and 
interprets the data
+                        as a <a class="reference internal" 
href="../concepts.html#streams-concepts-kstream"><span class="std 
std-ref">record stream</span></a>.
+                        A <code class="docutils literal"><span 
class="pre">KStream</span></code> represents a <em>partitioned</em> record 
stream.
+                        <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#stream(java.lang.String)">(details)</a></p>
+                        <p>In the case of a KStream, the local KStream 
instance of every application instance will
+                            be populated with data from only <strong>a 
subset</strong> of the partitions of the input topic.  Collectively, across
+                            all application instances, all input topic 
partitions are read and processed.</p>
+                        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.common.serialization.Serdes</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsBuilder</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.kstream.KStream</span><span 
class="o">;</span>
+
+<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">StreamsBuilder</span><span class="o">();</span>
+
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">wordCounts</span> <span class="o">=</span> <span 
class="n">builder</span><span class="o">.</span><span 
class="na">stream</span><span class="o">(</span>
+    <span class="s">&quot;word-counts-input-topic&quot;</span><span 
class="o">,</span> <span class="cm">/* input topic */</span>
+    <span class="n">Consumed</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span> <span class="cm">/* key 
serde */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">()</span>   <span class="cm">/* value 
serde */</span>
+    <span class="o">);</span>
+</pre></div>
+                        </div>
+                        <p>If you do not specify SerDes explicitly, the 
default SerDes from the
+                            <a class="reference internal" 
href="config-streams.html#streams-developer-guide-configuration"><span 
class="std std-ref">configuration</span></a> are used.</p>
+                        <p>You <strong>must specify SerDes explicitly</strong> 
if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. 
For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes 
see <a class="reference internal" 
href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code 
class="docutils literal"><span class="pre">stream</span></code> exist, for 
example to specify a regex pattern for input topics to read from).</p>
+                    </td>
+                </tr>
+                <tr class="row-odd"><td><p 
class="first"><strong>Table</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topic</em> &rarr; KTable</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Reads the specified Kafka input topic 
into a <a class="reference internal" 
href="../concepts.html#streams-concepts-ktable"><span class="std 
std-ref">KTable</span></a>.  The topic is
+                        interpreted as a changelog stream, where records with 
the same key are interpreted as UPSERT aka INSERT/UPDATE
+                        (when the record value is not <code class="docutils 
literal"><span class="pre">null</span></code>) or as DELETE (when the value is 
<code class="docutils literal"><span class="pre">null</span></code>) for that 
key.
+                        <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String(java.lang.String)">(details)</a></p>
+                        <p>In the case of a KStream, the local KStream 
instance of every application instance will
+                            be populated with data from only <strong>a 
subset</strong> of the partitions of the input topic.  Collectively, across
+                            all application instances, all input topic 
partitions are read and processed.</p>
+                        <p>You must provide a name for the table (more 
precisely, for the internal
+                            <a class="reference internal" 
href="../architecture.html#streams-architecture-state"><span class="std 
std-ref">state store</span></a> that backs the table).  This is required for
+                            supporting <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactive queries</span></a> against the table. When a
+                            name is not provided the table will not queryable 
and an internal name will be provided for the state store.</p>
+                        <p>If you do not specify SerDes explicitly, the 
default SerDes from the
+                            <a class="reference internal" 
href="config-streams.html#streams-developer-guide-configuration"><span 
class="std std-ref">configuration</span></a> are used.</p>
+                        <p>You <strong>must specify SerDes explicitly</strong> 
if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. 
For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes 
see <a class="reference internal" 
href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code 
class="docutils literal"><span class="pre">table</span></code> exist, for 
example to specify the <code class="docutils literal"><span 
class="pre">auto.offset.reset</span></code> policy to be used when
+                            reading from the input topic.</p>
+                    </td>
+                </tr>
+                <tr class="row-even"><td><p class="first"><strong>Global 
Table</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topic</em> &rarr; GlobalKTable</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Reads the specified Kafka input topic 
into a <a class="reference internal" 
href="../concepts.html#streams-concepts-globalktable"><span class="std 
std-ref">GlobalKTable</span></a>.  The topic is
+                        interpreted as a changelog stream, where records with 
the same key are interpreted as UPSERT aka INSERT/UPDATE
+                        (when the record value is not <code class="docutils 
literal"><span class="pre">null</span></code>) or as DELETE (when the value is 
<code class="docutils literal"><span class="pre">null</span></code>) for that 
key.
+                        <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String(java.lang.String)">(details)</a></p>
+                        <p>In the case of a GlobalKTable, the local 
GlobalKTable instance of every application instance will
+                            be populated with data from only <strong>a 
subset</strong> of the partitions of the input topic.  Collectively, across
+                            all application instances, all input topic 
partitions are read and processed.</p>
+                        <p>You must provide a name for the table (more 
precisely, for the internal
+                            <a class="reference internal" 
href="../architecture.html#streams-architecture-state"><span class="std 
std-ref">state store</span></a> that backs the table).  This is required for
+                            supporting <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactive queries</span></a> against the table. When a
+                            name is not provided the table will not queryable 
and an internal name will be provided for the state store.</p>
+                        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.common.serialization.Serdes</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsBuilder</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.kstream.GlobalKTable</span><span 
class="o">;</span>
+
+<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">StreamsBuilder</span><span class="o">();</span>
+
+<span class="n">GlobalKTable</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">wordCounts</span> <span class="o">=</span> <span 
class="n">builder</span><span class="o">.</span><span 
class="na">globalTable</span><span class="o">(</span>
+    <span class="s">&quot;word-counts-input-topic&quot;</span><span 
class="o">,</span>
+    <span class="n">Materialized</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">,</span> <span 
class="n">KeyValueStore</span><span class="o">&lt;</span><span 
class="n">Bytes</span><span class="o">,</span> <span 
class="kt">byte</span><span class="o">[]&gt;&gt;</span><span 
class="n">as</span><span class="o">(</span>
+      <span class="s">&quot;word-counts-global-store&quot;</span> <span 
class="cm">/* table/store name */</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withKeySerde</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span> <span class="cm">/* key 
serde */</span>
+      <span class="o">.</span><span class="na">withValueSerde</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span> <span class="cm">/* value 
serde */</span>
+    <span class="o">);</span>
+</pre></div>
+                        </div>
+                        <p>You <strong>must specify SerDes explicitly</strong> 
if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. 
For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes 
see <a class="reference internal" 
href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code 
class="docutils literal"><span class="pre">globalTable</span></code> exist to 
e.g. specify explicit SerDes.</p>
+                    </td>
+                </tr>
+                </tbody>
+            </table>
+        </div>
+        <div class="section" id="transform-a-stream">
+            <span 
id="streams-developer-guide-dsl-transformations"></span><h2><a 
class="toc-backref" href="#id9">Transform a stream</a><a class="headerlink" 
href="#transform-a-stream" title="Permalink to this headline"></a></h2>
+            <p>The KStream and KTable interfaces support a variety of 
transformation operations.
+                Each of these operations can be translated into one or more 
connected processors into the underlying processor topology.
+                Since KStream and KTable are strongly typed, all of these 
transformation operations are defined as
+                generic functions where users could specify the input and 
output data types.</p>
+            <p>Some KStream transformations may generate one or more KStream 
objects, for example:
+                - <code class="docutils literal"><span 
class="pre">filter</span></code> and <code class="docutils literal"><span 
class="pre">map</span></code> on a KStream will generate another KStream
+                - <code class="docutils literal"><span 
class="pre">branch</span></code> on KStream can generate multiple KStreams</p>
+            <p>Some others may generate a KTable object, for example an 
aggregation of a KStream also yields a KTable. This allows Kafka Streams to 
continuously update the computed value upon arrivals of <a class="reference 
internal" href="../concepts.html#streams-concepts-aggregations"><span 
class="std std-ref">late records</span></a> after it
+                has already been produced to the downstream transformation 
operators.</p>
+            <p>All KTable transformation operations can only generate another 
KTable. However, the Kafka Streams DSL does provide a special function
+                that converts a KTable representation into a KStream. All of 
these transformation methods can be chained together to compose
+                a complex processor topology.</p>
+            <p>These transformation operations are described in the following 
subsections:</p>
+            <ul class="simple">
+                <li><a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateless"><span class="std 
std-ref">Stateless transformations</span></a></li>
+                <li><a class="reference internal" 
href="#streams-developer-guide-dsl-transformations-stateful"><span class="std 
std-ref">Stateful transformations</span></a></li>
+            </ul>
+            <div class="section" id="stateless-transformations">
+                <span 
id="streams-developer-guide-dsl-transformations-stateless"></span><h3><a 
class="toc-backref" href="#id10">Stateless transformations</a><a 
class="headerlink" href="#stateless-transformations" title="Permalink to this 
headline"></a></h3>
+                <p>Stateless transformations do not require state for 
processing and they do not require a state store associated with
+                    the stream processor. Kafka 0.11.0 and later allows you to 
materialize the result from a stateless <code class="docutils literal"><span 
class="pre">KTable</span></code> transformation. This allows the result to be 
queried through <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactive queries</span></a>. To materialize a <code 
class="docutils literal"><span class="pre">KTable</span></code>, each of the 
below stateless operations <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries-local-key-value-stores"><span
 class="std std-ref">can be augmented</span></a> with an optional <code 
class="docutils literal"><span class="pre">queryableStoreName</span></code> 
argument.</p>
+                <table border="1" class="non-scrolling-table width-100-percent 
docutils">
+                    <colgroup>
+                        <col width="22%" />
+                        <col width="78%" />
+                    </colgroup>
+                    <thead valign="bottom">
+                    <tr class="row-odd"><th class="head">Transformation</th>
+                        <th class="head">Description</th>
+                    </tr>
+                    </thead>
+                    <tbody valign="top">
+                    <tr class="row-even"><td><p 
class="first"><strong>Branch</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream[]</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Branch (or split) a <code 
class="docutils literal"><span class="pre">KStream</span></code> based on the 
supplied predicates into one or more <code class="docutils literal"><span 
class="pre">KStream</span></code> instances.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...-">details</a>)</p>
+                            <p>Predicates are evaluated in order.  A record is 
placed to one and only one output stream on the first match:
+                                if the n-th predicate evaluates to true, the 
record is placed to n-th stream.  If no predicate matches, the
+                                the record is dropped.</p>
+                            <p>Branching is useful, for example, to route 
records to different downstream topics.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;[]</span> <span 
class="n">branches</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">branch</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span 
class="na">startsWith</span><span class="o">(</span><span 
class="s">&quot;A&quot;</span><span class="o">),</span> <span class="cm">/* 
first predicate  */</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span 
class="na">startsWith</span><span class="o">(</span><span 
class="s">&quot;B&quot;</span><span class="o">),</span> <span class="cm">/* 
second predicate */</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="kc">true</span>                 <span 
class="cm">/* third predicate  */</span>
+  <span class="o">);</span>
+
+<span class="c1">// KStream branches[0] contains all records whose keys start 
with &quot;A&quot;</span>
+<span class="c1">// KStream branches[1] contains all records whose keys start 
with &quot;B&quot;</span>
+<span class="c1">// KStream branches[2] contains all other records</span>
+
+<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` 
instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p 
class="first"><strong>Filter</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Evaluates a boolean function for 
each element and retains those for which the function returns true.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate-">KStream
 details</a>,
+                            <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filter-org.apache.kafka.streams.kstream.Predicate-">KTable
 details</a>)</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// A filter that selects (keeps) only positive numbers</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">onlyPositives</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">filter</span><span class="o">((</span><span 
class="n">key</span><span class="o">,</span> <span class="n">value</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span> 
<span class="o">&gt;</span> <span class="mi">0</span><span class="o">);</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">onlyPositives</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">filter</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Predicate</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">boolean</span> <span 
class="nf">test</span><span class="o">(</span><span class="n">String</span> 
<span class="n">key</span><span class="o">,</span> <span class="n">Long</span> 
<span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span> <span 
class="o">&gt;</span> <span class="mi">0</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>Inverse 
Filter</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Evaluates a boolean function for 
each element and drops those for which the function returns true.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KStream
 details</a>,
+                            <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KTable
 details</a>)</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// An inverse filter that discards any negative numbers or 
zero</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">onlyPositives</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">filterNot</span><span class="o">((</span><span 
class="n">key</span><span class="o">,</span> <span class="n">value</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span> 
<span class="o">&lt;=</span> <span class="mi">0</span><span class="o">);</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">onlyPositives</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">filterNot</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Predicate</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">boolean</span> <span 
class="nf">test</span><span class="o">(</span><span class="n">String</span> 
<span class="n">key</span><span class="o">,</span> <span class="n">Long</span> 
<span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span> <span 
class="o">&lt;=</span> <span class="mi">0</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p 
class="first"><strong>FlatMap</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces 
zero, one, or more records.  You can modify the record keys and values, 
including
+                            their types.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMap-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p><strong>Marks the stream for data 
re-partitioning:</strong>
+                                Applying a grouping or a join after <code 
class="docutils literal"><span class="pre">flatMap</span></code> will result in 
re-partitioning of the records.
+                                If possible use <code class="docutils 
literal"><span class="pre">flatMapValues</span></code> instead, which will not 
cause data re-partitioning.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">transformed</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">flatMap</span><span class="o">(</span>
+     <span class="c1">// Here, we generate two output records for each input 
record.</span>
+     <span class="c1">// We also change the key and value types.</span>
+     <span class="c1">// Example: (345L, &quot;Hello&quot;) -&gt; 
(&quot;HELLO&quot;, 1000), (&quot;hello&quot;, 9000)</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="o">{</span>
+      <span class="n">List</span><span class="o">&lt;</span><span 
class="n">KeyValue</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">result</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">LinkedList</span><span class="o">&lt;&gt;();</span>
+      <span class="n">result</span><span class="o">.</span><span 
class="na">add</span><span class="o">(</span><span 
class="n">KeyValue</span><span class="o">.</span><span 
class="na">pair</span><span class="o">(</span><span class="n">value</span><span 
class="o">.</span><span class="na">toUpperCase</span><span class="o">(),</span> 
<span class="mi">1000</span><span class="o">));</span>
+      <span class="n">result</span><span class="o">.</span><span 
class="na">add</span><span class="o">(</span><span 
class="n">KeyValue</span><span class="o">.</span><span 
class="na">pair</span><span class="o">(</span><span class="n">value</span><span 
class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> 
<span class="mi">9000</span><span class="o">));</span>
+      <span class="k">return</span> <span class="n">result</span><span 
class="o">;</span>
+    <span class="o">}</span>
+  <span class="o">);</span>
+
+<span class="c1">// Java 7 example: cf. `map` for how to create 
`KeyValueMapper` instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>FlatMap 
(values only)</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces 
zero, one, or more records, while retaining the key of the original record.
+                            You can modify the record values and the value 
type.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMapValues-org.apache.kafka.streams.kstream.ValueMapper-">details</a>)</p>
+                            <p><code class="docutils literal"><span 
class="pre">flatMapValues</span></code> is preferable to <code class="docutils 
literal"><span class="pre">flatMap</span></code> because it will not cause data 
re-partitioning.  However, you
+                                cannot modify the key or key type like <code 
class="docutils literal"><span class="pre">flatMap</span></code> does.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Split a sentence into 
words.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">sentences</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">sentences</span><span 
class="o">.</span><span class="na">flatMapValues</span><span 
class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> 
<span class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">value</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">&quot;\\s+&quot;</span><span class="o">)));</span>
+
+<span class="c1">// Java 7 example: cf. `mapValues` for how to create 
`ValueMapper` instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p 
class="first"><strong>Foreach</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; void</li>
+                            <li>KStream &rarr; void</li>
+                            <li>KTable &rarr; void</li>
+                        </ul>
+                    </td>
+                        <td><p class="first"><strong>Terminal 
operation.</strong>  Performs a stateless action on each record.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p>
+                            <p>You would use <code class="docutils 
literal"><span class="pre">foreach</span></code> to cause <em>side effects</em> 
based on the input data (similar to <code class="docutils literal"><span 
class="pre">peek</span></code>) and then <em>stop</em>
+                                <em>further processing</em> of the input data 
(unlike <code class="docutils literal"><span class="pre">peek</span></code>, 
which is not a terminal operation).</p>
+                            <p><strong>Note on processing guarantees:</strong> 
Any side effects of an action (such as writing to external systems) are not
+                                trackable by Kafka, which means they will 
typically not benefit from  Kafka&#8217;s processing guarantees.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Print the contents of the KStream to the local 
console.</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">stream</span><span class="o">.</span><span 
class="na">foreach</span><span class="o">((</span><span 
class="n">key</span><span class="o">,</span> <span class="n">value</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span 
class="n">System</span><span class="o">.</span><span class="na">out</span><span 
class="o">.</span><span class="na">println</span><span class="o">(</span><span 
class="n">key</span> <span class="o">+</span> <span class="s">&quot; =&gt; 
&quot;</span> <span class="o">+</span> <span class="n">value</span><span 
class="o">));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">stream</span><span class="o">.</span><span 
class="na">foreach</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ForeachAction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">apply</span><span class="o">(</span><span class="n">String</span> 
<span class="n">key</span><span class="o">,</span> <span class="n">Long</span> 
<span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">System</span><span class="o">.</span><span 
class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span class="n">key</span> 
<span class="o">+</span> <span class="s">&quot; =&gt; &quot;</span> <span 
class="o">+</span> <span class="n">value</span><span class="o">);</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p 
class="first"><strong>GroupByKey</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KGroupedStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Groups the records by the 
existing key.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupByKey--">details</a>)</p>
+                            <p>Grouping is a prerequisite for <a 
class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregating a stream or a table</span></a>
+                                and ensures that data is properly partitioned 
(&#8220;keyed&#8221;) for subsequent operations.</p>
+                            <p><strong>When to set explicit SerDes:</strong>
+                                Variants of <code class="docutils 
literal"><span class="pre">groupByKey</span></code> exist to override the 
configured default SerDes of your application, which <strong>you</strong>
+                                <strong>must do</strong> if the key and/or 
value types of the resulting <code class="docutils literal"><span 
class="pre">KGroupedStream</span></code> do not match the configured default
+                                SerDes.</p>
+                            <div class="admonition note">
+                                <p class="first admonition-title">Note</p>
+                                <p class="last"><strong>Grouping vs. 
Windowing:</strong>
+                                    A related operation is <a class="reference 
internal" href="#streams-developer-guide-dsl-windowing"><span class="std 
std-ref">windowing</span></a>, which lets you control how to
+                                    &#8220;sub-group&#8221; the grouped 
records <em>of the same key</em> into so-called <em>windows</em> for stateful 
operations such as
+                                    windowed <a class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregations</span></a> or
+                                    windowed <a class="reference internal" 
href="#streams-developer-guide-dsl-joins"><span class="std 
std-ref">joins</span></a>.</p>
+                            </div>
+                            <p><strong>Causes data re-partitioning if and only 
if the stream was marked for re-partitioning.</strong>
+                                <code class="docutils literal"><span 
class="pre">groupByKey</span></code> is preferable to <code class="docutils 
literal"><span class="pre">groupBy</span></code> because it re-partitions data 
only if the stream was already marked
+                                for re-partitioning. However, <code 
class="docutils literal"><span class="pre">groupByKey</span></code> does not 
allow you to modify the key or key type like <code class="docutils 
literal"><span class="pre">groupBy</span></code>
+                                does.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Group by the existing key, using the application&#39;s 
configured</span>
+<span class="c1">// default serdes for keys and values.</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">groupedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">groupByKey</span><span class="o">();</span>
+
+<span class="c1">// When the key and/or value types do not match the 
configured</span>
+<span class="c1">// default serdes, we must explicitly specify serdes.</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">groupedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">groupByKey</span><span class="o">(</span>
+    <span class="n">Serialized</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key 
*/</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span>     <span class="cm">/* 
value */</span>
+  <span class="o">);</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p 
class="first"><strong>GroupBy</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KGroupedStream</li>
+                            <li>KTable &rarr; KGroupedTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Groups the records by a 
<em>new</em> key, which may be of a different key type.
+                            When grouping a table, you may also specify a new 
value and value type.
+                            <code class="docutils literal"><span 
class="pre">groupBy</span></code> is a shorthand for <code class="docutils 
literal"><span class="pre">selectKey(...).groupByKey()</span></code>.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KStream
 details</a>,
+                            <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KTable
 details</a>)</p>
+                            <p>Grouping is a prerequisite for <a 
class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregating a stream or a table</span></a>
+                                and ensures that data is properly partitioned 
(&#8220;keyed&#8221;) for subsequent operations.</p>
+                            <p><strong>When to set explicit SerDes:</strong>
+                                Variants of <code class="docutils 
literal"><span class="pre">groupBy</span></code> exist to override the 
configured default SerDes of your application, which <strong>you must</strong>
+                                <strong>do</strong> if the key and/or value 
types of the resulting <code class="docutils literal"><span 
class="pre">KGroupedStream</span></code> or <code class="docutils 
literal"><span class="pre">KGroupedTable</span></code> do not match the
+                                configured default SerDes.</p>
+                            <div class="admonition note">
+                                <p class="first admonition-title">Note</p>
+                                <p class="last"><strong>Grouping vs. 
Windowing:</strong>
+                                    A related operation is <a class="reference 
internal" href="#streams-developer-guide-dsl-windowing"><span class="std 
std-ref">windowing</span></a>, which lets you control how to
+                                    &#8220;sub-group&#8221; the grouped 
records <em>of the same key</em> into so-called <em>windows</em> for stateful 
operations such as
+                                    windowed <a class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregations</span></a> or
+                                    windowed <a class="reference internal" 
href="#streams-developer-guide-dsl-joins"><span class="std 
std-ref">joins</span></a>.</p>
+                            </div>
+                            <p><strong>Always causes data 
re-partitioning:</strong>  <code class="docutils literal"><span 
class="pre">groupBy</span></code> always causes data re-partitioning.
+                                If possible use <code class="docutils 
literal"><span class="pre">groupByKey</span></code> instead, which will 
re-partition data only if required.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KTable</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> 
<span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ examples, using lambda expressions</span>
+
+<span class="c1">// Group the stream by a new key and key type</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">groupedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">value</span><span class="o">,</span>
+    <span class="n">Serialized</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span> <span class="cm">/* key 
(note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span>  <span class="cm">/* value 
*/</span>
+  <span class="o">);</span>
+
+<span class="c1">// Group the table by a new key and key type, and also modify 
the value and value type.</span>
+<span class="n">KGroupedTable</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">groupedTable</span> <span class="o">=</span> <span 
class="n">table</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">KeyValue</span><span 
class="o">.</span><span class="na">pair</span><span class="o">(</span><span 
class="n">value</span><span class="o">,</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">length</span><span class="o">()),</span>
+    <span class="n">Serialized</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span> <span class="cm">/* key 
(note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Integer</span><span class="o">())</span> <span class="cm">/* value 
(note: type was modified) */</span>
+  <span class="o">);</span>
+
+
+<span class="c1">// Java 7 examples</span>
+
+<span class="c1">// Group the stream by a new key and key type</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">groupedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span 
class="o">,</span> <span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span 
class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">},</span>
+    <span class="n">Serialized</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span> <span class="cm">/* key 
(note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span>  <span class="cm">/* value 
*/</span>
+  <span class="o">);</span>
+
+<span class="c1">// Group the table by a new key and key type, and also modify 
the value and value type.</span>
+<span class="n">KGroupedTable</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">groupedTable</span> <span class="o">=</span> <span 
class="n">table</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">,</span> <span 
class="n">KeyValue</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">KeyValue</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span 
class="o">,</span> <span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">KeyValue</span><span 
class="o">.</span><span class="na">pair</span><span class="o">(</span><span 
class="n">value</span><span class="o">,</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">length</span><span class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">},</span>
+    <span class="n">Serialized</span><span class="o">.</span><span 
class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span> <span class="cm">/* key 
(note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Integer</span><span class="o">())</span> <span class="cm">/* value 
(note: type was modified) */</span>
+  <span class="o">);</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p 
class="first"><strong>Map</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces one 
record.  You can modify the record key and value, including their types.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p><strong>Marks the stream for data 
re-partitioning:</strong>
+                                Applying a grouping or a join after <code 
class="docutils literal"><span class="pre">map</span></code> will result in 
re-partitioning of the records.
+                                If possible use <code class="docutils 
literal"><span class="pre">mapValues</span></code> instead, which will not 
cause data re-partitioning.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="c1">// Note how we change the key and the key type (similar to 
`selectKey`)</span>
+<span class="c1">// as well as the value and the value type.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">transformed</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span class="na">map</span><span 
class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">KeyValue</span><span 
class="o">.</span><span class="na">pair</span><span class="o">(</span><span 
class="n">value</span><span class="o">.</span><span 
class="na">toLowerCase</span><span class="o">(),</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">length</span><span class="o">()));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">transformed</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span class="na">map</span><span 
class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">,</span> <span 
class="n">KeyValue</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">KeyValue</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span 
class="o">,</span> <span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="n">KeyValue</span><span class="o">&lt;&gt;(</span><span 
class="n">value</span><span class="o">.</span><span 
class="na">toLowerCase</span><span class="o">(),</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">length</span><span class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Map 
(values only)</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces one 
record, while retaining the key of the original record.
+                            You can modify the record value and the value type.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KStream
 details</a>,
+                            <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KTable
 details</a>)</p>
+                            <p><code class="docutils literal"><span 
class="pre">mapValues</span></code> is preferable to <code class="docutils 
literal"><span class="pre">map</span></code> because it will not cause data 
re-partitioning.  However, it does not
+                                allow you to modify the key or key type like 
<code class="docutils literal"><span class="pre">map</span></code> does.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">uppercased</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">mapValues</span><span class="o">(</span><span class="n">value</span> 
<span class="o">-&gt;</span> <span class="n">value</span><span 
class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">uppercased</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">mapValues</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ValueMapper</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;()</span> 
<span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">apply</span><span class="o">(</span><span class="n">String</span> 
<span class="n">s</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">s</span><span 
class="o">.</span><span class="na">toUpperCase</span><span class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p 
class="first"><strong>Peek</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Performs a stateless action on 
each record, and returns an unchanged stream.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#peek-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p>
+                            <p>You would use <code class="docutils 
literal"><span class="pre">peek</span></code> to cause <em>side effects</em> 
based on the input data (similar to <code class="docutils literal"><span 
class="pre">foreach</span></code>) and <em>continue</em>
+                                <em>processing</em> the input data (unlike 
<code class="docutils literal"><span class="pre">foreach</span></code>, which 
is a terminal operation).  <code class="docutils literal"><span 
class="pre">peek</span></code> returns the input
+                                stream as-is;  if you need to modify the input 
stream, use <code class="docutils literal"><span class="pre">map</span></code> 
or <code class="docutils literal"><span class="pre">mapValues</span></code> 
instead.</p>
+                            <p><code class="docutils literal"><span 
class="pre">peek</span></code> is helpful for use cases such as logging or 
tracking metrics or for debugging and troubleshooting.</p>
+                            <p><strong>Note on processing guarantees:</strong> 
Any side effects of an action (such as writing to external systems) are not
+                                trackable by Kafka, which means they will 
typically not benefit from Kafka&#8217;s processing guarantees.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">unmodifiedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">peek</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> 
<span class="n">value</span><span class="o">)</span> <span 
class="o">-&gt;</span> <span class="n">System</span><span 
class="o">.</span><span class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span 
class="s">&quot;key=&quot;</span> <span class="o">+</span> <span 
class="n">key</span> <span class="o">+</span> <span class="s">&quot;, 
value=&quot;</span> <span class="o">+</span> <span class="n">value</span><span 
class="o">));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">unmodifiedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">peek</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ForeachAction</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span 
class="o">,</span> <span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">System</span><span class="o">.</span><span 
class="na">out</span><span class="o">.</span><span 
class="na">println</span><span class="o">(</span><span 
class="s">&quot;key=&quot;</span> <span class="o">+</span> <span 
class="n">key</span> <span class="o">+</span> <span class="s">&quot;, 
value=&quot;</span> <span class="o">+</span> <span class="n">value</span><span 
class="o">);</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p 
class="first"><strong>Print</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; void</li>
+                        </ul>
+                    </td>
+                        <td><p class="first"><strong>Terminal 
operation.</strong>  Prints the records to <code class="docutils literal"><span 
class="pre">System.out</span></code>.  See Javadocs for serde and <code 
class="docutils literal"><span class="pre">toString()</span></code>
+                            caveats.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#print--">details</a>)</p>
+                            <p>Calling <code class="docutils literal"><span 
class="pre">print()</span></code> is the same as calling <code class="docutils 
literal"><span class="pre">foreach((key,</span> <span class="pre">value)</span> 
<span class="pre">-&gt;</span> <span class="pre">System.out.println(key</span> 
<span class="pre">+</span> <span class="pre">&quot;,</span> <span 
class="pre">&quot;</span> <span class="pre">+</span> <span 
class="pre">value))</span></code></p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="c1">// print to sysout</span>
+<span class="n">stream</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span>
+
+<span class="c1">// print to file with a custom label</span>
+<span class="n">stream</span><span class="o">.</span><span 
class="na">print</span><span class="o">(</span><span 
class="n">Printed</span><span class="o">.</span><span 
class="na">toFile</span><span class="o">(</span><span 
class="s">&quot;streams.out&quot;</span><span class="o">).</span><span 
class="na">withLabel</span><span class="o">(</span><span 
class="s">&quot;streams&quot;</span><span class="o">));</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p 
class="first"><strong>SelectKey</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Assigns a new key &#8211; 
possibly of a new key type &#8211; to each record.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#selectKey-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p>Calling <code class="docutils literal"><span 
class="pre">selectKey(mapper)</span></code> is the same as calling <code 
class="docutils literal"><span class="pre">map((key,</span> <span 
class="pre">value)</span> <span class="pre">-&gt;</span> <span 
class="pre">mapper(key,</span> <span class="pre">value),</span> <span 
class="pre">value)</span></code>.</p>
+                            <p><strong>Marks the stream for data 
re-partitioning:</strong>
+                                Applying a grouping or a join after <code 
class="docutils literal"><span class="pre">selectKey</span></code> will result 
in re-partitioning of the records.</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Derive a new record key from the record&#39;s value.  Note 
how the key type changes, too.</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">rekeyed</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">selectKey</span><span class="o">((</span><span 
class="n">key</span><span class="o">,</span> <span class="n">value</span><span 
class="o">)</span> <span class="o">-&gt;</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="s">&quot; 
&quot;</span><span class="o">)[</span><span class="mi">0</span><span 
class="o">])</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">rekeyed</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">selectKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span 
class="o">,</span> <span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span 
class="o">.</span><span class="na">split</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">)[</span><span 
class="mi">0</span><span class="o">];</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Table to 
Stream</strong></p>
+                        <ul class="last simple">
+                            <li>KTable &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Get the changelog stream of this 
table.
+                            (<a class="reference external" 
href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#toStream--">details</a>)</p>
+                            <div class="last highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KTable</span><span 
class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Also, a variant of `toStream` exists that allows you</span>
+<span class="c1">// to select a new key for the resulting stream.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="kt">byte</span><span class="o">[],</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span 
class="n">table</span><span class="o">.</span><span 
class="na">toStream</span><span class="o">();</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    </tbody>
+                </table>
+            </div>
+            <div class="section" id="stateful-transformations">
+                <span 
id="streams-developer-guide-dsl-transformations-stateful"></span><h3><a 
class="toc-backref" href="#id11">Stateful transformations</a><a 
class="headerlink" href="#stateful-transformations" title="Permalink to this 
headline"></a></h3>
+                <p 
id="streams-developer-guide-dsl-transformations-stateful-overview">Stateful 
transformations depend on state for processing inputs and producing outputs and 
require a <a class="reference internal" 
href="../architecture.html#streams-architecture-state"><span class="std 
std-ref">state store</span></a> associated with the stream processor. For 
example, in aggregating operations, a windowing state store is used to collect 
the latest aggregation results per
+                    window. In join operations, a windowing state store is 
used to collect all of the records received so far within the
+                    defined window boundary.</p>
+                <p>Note, that state stores are fault-tolerant.
+                    In case of failure, Kafka Streams guarantees to fully 
restore all state stores prior to resuming the processing.
+                    See <a class="reference internal" 
href="../architecture.html#streams-architecture-fault-tolerance"><span 
class="std std-ref">Fault Tolerance</span></a> for further information.</p>
+                <p>Available stateful transformations in the DSL include:</p>
+                <ul class="simple">
+                    <li><a class="reference internal" 
href="#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">Aggregating</span></a></li>
+                    <li><a class="reference internal" 
href="#streams-developer-guide-dsl-joins"><span class="std 
std-ref">Joining</span></a></li>
+                    <li><a class="reference internal" 
href="#streams-developer-guide-dsl-windowing"><span class="std 
std-ref">Windowing</span></a> (as part of aggregations and joins)</li>
+                    <li><a class="reference internal" 
href="#streams-developer-guide-dsl-process"><span class="std std-ref">Applying 
custom processors and transformers</span></a>, which may be stateful, for
+                        Processor API integration</li>
+                </ul>
+                <p>The following diagram shows their relationships:</p>
+                <div class="figure align-center" id="id2">
+                    <a class="reference internal image-reference" 
href="../../../images/streams-stateful_operations.png"><img 
alt="../../../images/streams-stateful_operations.png" 
src="../../../images/streams-stateful_operations.png" style="width: 400pt;" 
/></a>
+                    <p class="caption"><span class="caption-text">Stateful 
transformations in the DSL.</span></p>
+                </div>
+                <p>Here is an example of a stateful application: the WordCount 
algorithm.</p>
+                <p>WordCount example in Java 8+, using lambda expressions:</p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Assume the record 
values represent lines of text.  For the sake of this example, you can 
ignore</span>
+<span class="c1">// whatever may be stored in the record keys.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">textLines</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="n">KStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">wordCounts</span> <span class="o">=</span> <span 
class="n">textLines</span>
+    <span class="c1">// Split each text line, by whitespace, into words.  The 
text lines are the record</span>
+    <span class="c1">// values, i.e. you can ignore whatever data is in the 
record keys and thus invoke</span>
+    <span class="c1">// `flatMapValues` instead of the more generic 
`flatMap`.</span>
+    <span class="o">.</span><span class="na">flatMapValues</span><span 
class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> 
<span class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="n">value</span><span class="o">.</span><span 
class="na">toLowerCase</span><span class="o">().</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
+    <span class="c1">// Group the stream by word to ensure the key of the 
record is the word.</span>
+    <span class="o">.</span><span class="na">groupBy</span><span 
class="o">((</span><span class="n">key</span><span class="o">,</span> <span 
class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> 
<span class="n">word</span><span class="o">)</span>
+    <span class="c1">// Count the occurrences of each word (record key).</span>
+    <span class="c1">//</span>
+    <span class="c1">// This will change the stream type from 
`KGroupedStream&lt;String, String&gt;` to</span>
+    <span class="c1">// `KTable&lt;String, Long&gt;` (word -&gt; count).</span>
+    <span class="o">.</span><span class="na">count</span><span 
class="o">()</span>
+    <span class="c1">// Convert the `KTable&lt;String, Long&gt;` into a 
`KStream&lt;String, Long&gt;`.</span>
+    <span class="o">.</span><span class="na">toStream</span><span 
class="o">();</span>
+</pre></div>
+                </div>
+                <p>WordCount example in

<TRUNCATED>

Reply via email to