http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/dsl-api.html ---------------------------------------------------------------------- diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html new file mode 100644 index 0000000..9cfb4bc --- /dev/null +++ b/docs/streams/developer-guide/dsl-api.html @@ -0,0 +1,3208 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<script><!--#include virtual="../../js/templateData.js" --></script> + +<script id="content-template" type="text/x-handlebars-template"> + <!-- h1>Developer Guide for Kafka Streams</h1 --> + <!-- div class="sub-nav-sticky"> + <div class="sticky-top"> + <div style="height:35px"> + <a href="/{{version}}/documentation/streams/">Introduction</a> + <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a> + <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a> + <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a> + <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a> + </div> + </div --> + </div> + + + <div class="section" id="streams-dsl"> + <span id="streams-developer-guide-dsl"></span><h1>Streams DSL<a class="headerlink" href="#streams-dsl" title="Permalink to this headline"></a></h1> + <p>The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for + most users, especially beginners. 
Most data processing operations can be expressed in just a few lines of DSL code.</p> + <div class="contents local topic" id="table-of-contents"> + <p class="topic-title first">Table of Contents</p> + <ul class="simple"> + <li><a class="reference internal" href="#overview" id="id7">Overview</a></li> + <li><a class="reference internal" href="#creating-source-streams-from-kafka" id="id8">Creating source streams from Kafka</a></li> + <li><a class="reference internal" href="#transform-a-stream" id="id9">Transform a stream</a><ul> + <li><a class="reference internal" href="#stateless-transformations" id="id10">Stateless transformations</a></li> + <li><a class="reference internal" href="#stateful-transformations" id="id11">Stateful transformations</a><ul> + <li><a class="reference internal" href="#aggregating" id="id12">Aggregating</a></li> + <li><a class="reference internal" href="#joining" id="id13">Joining</a><ul> + <li><a class="reference internal" href="#join-co-partitioning-requirements" id="id14">Join co-partitioning requirements</a></li> + <li><a class="reference internal" href="#kstream-kstream-join" id="id15">KStream-KStream Join</a></li> + <li><a class="reference internal" href="#ktable-ktable-join" id="id16">KTable-KTable Join</a></li> + <li><a class="reference internal" href="#kstream-ktable-join" id="id17">KStream-KTable Join</a></li> + <li><a class="reference internal" href="#kstream-globalktable-join" id="id18">KStream-GlobalKTable Join</a></li> + </ul> + </li> + <li><a class="reference internal" href="#windowing" id="id19">Windowing</a><ul> + <li><a class="reference internal" href="#tumbling-time-windows" id="id20">Tumbling time windows</a></li> + <li><a class="reference internal" href="#hopping-time-windows" id="id21">Hopping time windows</a></li> + <li><a class="reference internal" href="#sliding-time-windows" id="id22">Sliding time windows</a></li> + <li><a class="reference internal" href="#session-windows" id="id23">Session Windows</a></li> + </ul> + 
</li> + </ul> + </li> + <li><a class="reference internal" href="#applying-processors-and-transformers-processor-api-integration" id="id24">Applying processors and transformers (Processor API integration)</a></li> + </ul> + </li> + <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li> + </ul> + </div> + <div class="section" id="overview"> + <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2> + <p>In comparison to the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a>, only the DSL supports:</p> + <ul class="simple"> + <li>Built-in abstractions for <a class="reference internal" href="../concepts.html#streams-concepts-duality"><span class="std std-ref">streams and tables</span></a> in the form of + <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">KStream</span></a>, <a class="reference internal" href="../concepts.html#streams-concepts-ktable"><span class="std std-ref">KTable</span></a>, and + <a class="reference internal" href="../concepts.html#streams-concepts-globalktable"><span class="std std-ref">GlobalKTable</span></a>. Having first-class support for streams and tables is crucial + because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. 
+ For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your + application will be doing is transforming many input <em>streams</em> of customer-related events into an output <em>table</em> + that contains a continuously updated 360-degree view of your customers.</li> + <li>Declarative, functional programming style with + <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless transformations</span></a> (e.g. <code class="docutils literal"><span class="pre">map</span></code> and <code class="docutils literal"><span class="pre">filter</span></code>) + as well as <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">stateful transformations</span></a> + such as <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> (e.g. <code class="docutils literal"><span class="pre">count</span></code> and <code class="docutils literal"><span class="pre">reduce</span></code>), + <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a> (e.g. <code class="docutils literal"><span class="pre">leftJoin</span></code>), and + <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a> (e.g. <a class="reference internal" href="#windowing-session"><span class="std std-ref">session windows</span></a>).</li> + </ul> + <p>With the DSL, you can define <a class="reference internal" href="../concepts.html#streams-concepts-processor-topology"><span class="std std-ref">processor topologies</span></a> (i.e., the logical + processing plan) in your application. 
The steps to accomplish this are:</p> + <ol class="arabic simple"> + <li>Specify <a class="reference internal" href="#streams-developer-guide-dsl-sources"><span class="std std-ref">one or more input streams that are read from Kafka topics</span></a>.</li> + <li>Compose <a class="reference internal" href="#streams-developer-guide-dsl-transformations"><span class="std std-ref">transformations</span></a> on these streams.</li> + <li>Write the <a class="reference internal" href="#streams-developer-guide-dsl-destinations"><span class="std std-ref">resulting output streams back to Kafka topics</span></a>, or expose the processing results of your application directly to other applications through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> (e.g., via a REST API).</li> + </ol> + <p>After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into + action). A step-by-step guide for writing a stream processing application using the DSL is provided below.</p> + <p>For a complete list of available API functionality, see also the <a class="reference internal" href="../javadocs.html#streams-javadocs"><span class="std std-ref">Kafka Streams Javadocs</span></a>.</p> + </div> + <div class="section" id="creating-source-streams-from-kafka"> + <span id="streams-developer-guide-dsl-sources"></span><h2><a class="toc-backref" href="#id8">Creating source streams from Kafka</a><a class="headerlink" href="#creating-source-streams-from-kafka" title="Permalink to this headline"></a></h2> + <p>You can easily read data from Kafka topics into your application. 
The following operations are supported.</p> + <table border="1" class="non-scrolling-table width-100-percent docutils"> + <colgroup> + <col width="22%" /> + <col width="78%" /> + </colgroup> + <thead valign="bottom"> + <tr class="row-odd"><th class="head">Reading from Kafka</th> + <th class="head">Description</th> + </tr> + </thead> + <tbody valign="top"> + <tr class="row-even"><td><p class="first"><strong>Stream</strong></p> + <ul class="last simple"> + <li><em>input topics</em> → KStream</li> + </ul> + </td> + <td><p class="first">Creates a <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">KStream</span></a> from the specified Kafka input topics and interprets the data + as a <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">record stream</span></a>. + A <code class="docutils literal"><span class="pre">KStream</span></code> represents a <em>partitioned</em> record stream. + <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#stream(java.lang.String)">(details)</a></p> + <p>In the case of a KStream, the local KStream instance of every application instance will + be populated with data from only <strong>a subset</strong> of the partitions of the input topic. 
Collectively, across + all application instances, all input topic partitions are read and processed.</p> + <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span> + +<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsBuilder</span><span class="o">();</span> + +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">stream</span><span class="o">(</span> + <span class="s">"word-counts-input-topic"</span><span class="o">,</span> <span class="cm">/* input topic */</span> + <span class="n">Consumed</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key serde */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()</span> <span class="cm">/* value serde */</span> + <span class="o">));</span> +</pre></div> + </div> + <p>If you do not specify SerDes explicitly, the default SerDes from the + <a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p> + <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input + 
topics do not match the configured default SerDes. For information about configuring default SerDes, available + SerDes, and implementing your own custom SerDes, see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p> + <p class="last">Several variants of <code class="docutils literal"><span class="pre">stream</span></code> exist, for example to specify a regex pattern for the input topics to read from.</p> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Table</strong></p> + <ul class="last simple"> + <li><em>input topic</em> → KTable</li> + </ul> + </td> + <td><p class="first">Reads the specified Kafka input topic into a <a class="reference internal" href="../concepts.html#streams-concepts-ktable"><span class="std std-ref">KTable</span></a>. The topic is + interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE + (when the record value is not <code class="docutils literal"><span class="pre">null</span></code>) or as DELETE (when the value is <code class="docutils literal"><span class="pre">null</span></code>) for that key. + <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String(java.lang.String)">(details)</a></p> + <p>In the case of a KTable, the local KTable instance of every application instance will + be populated with data from only <strong>a subset</strong> of the partitions of the input topic. Collectively, across + all application instances, all input topic partitions are read and processed.</p> + <p>You must provide a name for the table (more precisely, for the internal + <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> that backs the table). 
This is required for + supporting <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> against the table. When a + name is not provided, the table will not be queryable and an internal name will be provided for the state store.</p> + <p>If you do not specify SerDes explicitly, the default SerDes from the + <a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p> + <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input + topics do not match the configured default SerDes. For information about configuring default SerDes, available + SerDes, and implementing your own custom SerDes, see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p> + <p class="last">Several variants of <code class="docutils literal"><span class="pre">table</span></code> exist, for example to specify the <code class="docutils literal"><span class="pre">auto.offset.reset</span></code> policy to be used when + reading from the input topic.</p> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>Global Table</strong></p> + <ul class="last simple"> + <li><em>input topic</em> → GlobalKTable</li> + </ul> + </td> + <td><p class="first">Reads the specified Kafka input topic into a <a class="reference internal" href="../concepts.html#streams-concepts-globalktable"><span class="std std-ref">GlobalKTable</span></a>. 
The topic is + interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE + (when the record value is not <code class="docutils literal"><span class="pre">null</span></code>) or as DELETE (when the value is <code class="docutils literal"><span class="pre">null</span></code>) for that key. + <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String(java.lang.String)">(details)</a></p> + <p>In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will + be populated with data from <strong>all</strong> partitions of the input topic, so every + application instance holds a full copy of the table.</p> + <p>You must provide a name for the table (more precisely, for the internal + <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> that backs the table). This is required for + supporting <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> against the table. 
When a + name is not provided, the table will not be queryable and an internal name will be provided for the state store.</p> + <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.common.utils.Bytes</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.GlobalKTable</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Materialized</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span> + +<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsBuilder</span><span class="o">();</span> + +<span class="n">GlobalKTable</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">globalTable</span><span class="o">(</span> + <span class="s">"word-counts-input-topic"</span><span class="o">,</span> + <span class="n">Materialized</span><span class="o">.<</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o"><</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]>></span><span class="n">as</span><span class="o">(</span> + <span class="s">"word-counts-global-store"</span> <span class="cm">/* table/store name */</span><span class="o">)</span> + <span class="o">.</span><span class="na">withKeySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key serde */</span> + <span class="o">.</span><span class="na">withValueSerde</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> <span class="cm">/* value serde */</span> + <span class="o">);</span> +</pre></div> + </div> + <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input + topics do not match the configured default SerDes. For information about configuring default SerDes, available + SerDes, and implementing your own custom SerDes, see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p> + <p class="last">Several variants of <code class="docutils literal"><span class="pre">globalTable</span></code> exist, for example to specify explicit SerDes.</p> + </td> + </tr> + </tbody> + </table> + </div> + <div class="section" id="transform-a-stream"> + <span id="streams-developer-guide-dsl-transformations"></span><h2><a class="toc-backref" href="#id9">Transform a stream</a><a class="headerlink" href="#transform-a-stream" title="Permalink to this headline"></a></h2> + <p>The KStream and KTable interfaces support a variety of transformation operations. + Each of these operations can be translated into one or more connected processors in the underlying processor topology. 
+ Since KStream and KTable are strongly typed, all of these transformation operations are defined as + generic functions where users can specify the input and output data types.</p> + <p>Some KStream transformations may generate one or more KStream objects, for example:</p> + <ul class="simple"> + <li><code class="docutils literal"><span class="pre">filter</span></code> and <code class="docutils literal"><span class="pre">map</span></code> on a KStream will generate another KStream</li> + <li><code class="docutils literal"><span class="pre">branch</span></code> on a KStream can generate multiple KStreams</li> + </ul> + <p>Some others may generate a KTable object; for example, an aggregation of a KStream yields a KTable. This allows Kafka Streams to continuously update the computed value upon arrival of <a class="reference internal" href="../concepts.html#streams-concepts-aggregations"><span class="std std-ref">late records</span></a> after the value + has already been produced to the downstream transformation operators.</p> + <p>All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function + that converts a KTable representation into a KStream. 
All of these transformation methods can be chained together to compose + a complex processor topology.</p> + <p>These transformation operations are described in the following subsections:</p> + <ul class="simple"> + <li><a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">Stateless transformations</span></a></li> + <li><a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">Stateful transformations</span></a></li> + </ul> + <div class="section" id="stateless-transformations"> + <span id="streams-developer-guide-dsl-transformations-stateless"></span><h3><a class="toc-backref" href="#id10">Stateless transformations</a><a class="headerlink" href="#stateless-transformations" title="Permalink to this headline"></a></h3> + <p>Stateless transformations do not require state for processing and they do not require a state store associated with + the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless <code class="docutils literal"><span class="pre">KTable</span></code> transformation. This allows the result to be queried through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a>. 
To materialize a <code class="docutils literal"><span class="pre">KTable</span></code>, each of the stateless operations below <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries-local-key-value-stores"><span class="std std-ref">can be augmented</span></a> with an optional <code class="docutils literal"><span class="pre">queryableStoreName</span></code> argument.</p> + <table border="1" class="non-scrolling-table width-100-percent docutils"> + <colgroup> + <col width="22%" /> + <col width="78%" /> + </colgroup> + <thead valign="bottom"> + <tr class="row-odd"><th class="head">Transformation</th> + <th class="head">Description</th> + </tr> + </thead> + <tbody valign="top"> + <tr class="row-even"><td><p class="first"><strong>Branch</strong></p> + <ul class="last simple"> + <li>KStream → KStream[]</li> + </ul> + </td> + <td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...-">details</a>)</p> + <p>Predicates are evaluated in order. A record is placed in one and only one output stream, on the first match: + if the n-th predicate evaluates to true, the record is placed in the n-th stream. If no predicate matches, 
If no predicate matches, the + the record is dropped.</p> + <p>Branching is useful, for example, to route records to different downstream topics.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>[]</span> <span class="n">branches</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">branch</span><span class="o">(</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">"A"</span><span class="o">),</span> <span class="cm">/* first predicate */</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">"B"</span><span class="o">),</span> <span class="cm">/* second predicate */</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="kc">true</span> <span class="cm">/* third predicate */</span> + <span class="o">);</span> + +<span class="c1">// KStream branches[0] contains all records whose keys start with "A"</span> +<span class="c1">// KStream branches[1] contains all records whose keys start with "B"</span> +<span class="c1">// 
KStream branches[2] contains all other records</span> + +<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Filter</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + <li>KTable → KTable</li> + </ul> + </td> + <td><p class="first">Evaluates a boolean function for each element and retains those for which the function returns true. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate-">KStream details</a>, + <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filter-org.apache.kafka.streams.kstream.Predicate-">KTable details</a>)</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// A filter that selects (keeps) only positive numbers</span> +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">value</span> <span class="o">></span> <span class="mi">0</span><span class="o">);</span> + +<span class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span 
class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span> + <span class="k">new</span> <span class="n">Predicate</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">value</span> <span class="o">></span> <span class="mi">0</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>Inverse Filter</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + <li>KTable → KTable</li> + </ul> + </td> + <td><p class="first">Evaluates a boolean function for each element and drops those for which the function returns true. 
+ (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KStream details</a>, + <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KTable details</a>)</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// An inverse filter that discards any negative numbers or zero</span> +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filterNot</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">value</span> <span class="o"><=</span> <span class="mi">0</span><span class="o">);</span> + +<span class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filterNot</span><span class="o">(</span> + <span class="k">new</span> <span class="n">Predicate</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>()</span> <span class="o">{</span> + 
<span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">value</span> <span class="o"><=</span> <span class="mi">0</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>FlatMap</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + </ul> + </td> + <td><p class="first">Takes one record and produces zero, one, or more records. You can modify the record keys and values, including + their types. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMap-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p> + <p><strong>Marks the stream for data re-partitioning:</strong> + Applying a grouping or a join after <code class="docutils literal"><span class="pre">flatMap</span></code> will result in re-partitioning of the records. 
+ If possible, use <code class="docutils literal"><span class="pre">flatMapValues</span></code> instead, which will not cause data re-partitioning.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> + <span class="c1">// Here, we generate two output records for each input record.</span> + <span class="c1">// We also change the key and value types.</span> + <span class="c1">// Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> + <span class="n">List</span><span class="o"><</span><span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">result</span> <span class="o">=</span> <span class="k">new</span> <span class="n">LinkedList</span><span class="o"><>();</span> + <span class="n">result</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">(),</span> <span class="mi">1000</span><span class="o">));</span> + <span
class="n">result</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="mi">9000</span><span class="o">));</span> + <span class="k">return</span> <span class="n">result</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">);</span> + +<span class="c1">// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>FlatMap (values only)</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + </ul> + </td> + <td><p class="first">Takes one record and produces zero, one, or more records, while retaining the key of the original record. + You can modify the record values and the value type. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMapValues-org.apache.kafka.streams.kstream.ValueMapper-">details</a>)</p> + <p><code class="docutils literal"><span class="pre">flatMapValues</span></code> is preferable to <code class="docutils literal"><span class="pre">flatMap</span></code> because it will not cause data re-partitioning. 
However, you + cannot modify the key or key type like <code class="docutils literal"><span class="pre">flatMap</span></code> does.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Split a sentence into words.</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...;</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-></span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\s+"</span><span class="o">)));</span> + +<span class="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Foreach</strong></p> + <ul class="last simple"> + <li>KStream → void</li> + <li>KTable → void</li> + </ul> + </td> + <td><p class="first"><strong>Terminal operation.</strong> Performs a stateless action on each record. 
+ (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p> + <p>You would use <code class="docutils literal"><span class="pre">foreach</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">peek</span></code>) and then <em>stop</em> + <em>further processing</em> of the input data (unlike <code class="docutils literal"><span class="pre">peek</span></code>, which is not a terminal operation).</p> + <p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not + trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Print the contents of the KStream to the local console.</span> +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">stream</span><span class="o">.</span><span class="na">foreach</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">key</span> <span class="o">+</span> <span class="s">" => "</span> <span class="o">+</span> <span class="n">value</span><span class="o">));</span> + +<span class="c1">// Java 7 example</span> +<span class="n">stream</span><span class="o">.</span><span class="na">foreach</span><span 
class="o">(</span> + <span class="k">new</span> <span class="n">ForeachAction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">key</span> <span class="o">+</span> <span class="s">" => "</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>GroupByKey</strong></p> + <ul class="last simple"> + <li>KStream → KGroupedStream</li> + </ul> + </td> + <td><p class="first">Groups the records by the existing key. 
+ (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupByKey--">details</a>)</p> + <p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a> + and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p> + <p><strong>When to set explicit SerDes:</strong> + Variants of <code class="docutils literal"><span class="pre">groupByKey</span></code> exist to override the configured default SerDes of your application, which <strong>you</strong> + <strong>must do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> do not match the configured default + SerDes.</p> + <div class="admonition note"> + <p class="first admonition-title">Note</p> + <p class="last"><strong>Grouping vs. Windowing:</strong> + A related operation is <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a>, which lets you control how to + “sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as + windowed <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> or + windowed <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a>.</p> + </div> + <p><strong>Causes data re-partitioning if and only if the stream was marked for re-partitioning.</strong> + <code class="docutils literal"><span class="pre">groupByKey</span></code> is preferable to <code class="docutils literal"><span class="pre">groupBy</span></code> because it re-partitions data only if the stream was already marked + for re-partitioning. 
However, <code class="docutils literal"><span class="pre">groupByKey</span></code> does not allow you to modify the key or key type like <code class="docutils literal"><span class="pre">groupBy</span></code> + does.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Group by the existing key, using the application's configured</span> +<span class="c1">// default serdes for keys and values.</span> +<span class="n">KGroupedStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">();</span> + +<span class="c1">// When the key and/or value types do not match the configured</span> +<span class="c1">// default serdes, we must explicitly specify serdes.</span> +<span class="n">KGroupedStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span> + <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> + <span class="o">);</span> 
+</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>GroupBy</strong></p> + <ul class="last simple"> + <li>KStream → KGroupedStream</li> + <li>KTable → KGroupedTable</li> + </ul> + </td> + <td><p class="first">Groups the records by a <em>new</em> key, which may be of a different key type. + When grouping a table, you may also specify a new value and value type. + <code class="docutils literal"><span class="pre">groupBy</span></code> is a shorthand for <code class="docutils literal"><span class="pre">selectKey(...).groupByKey()</span></code>. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KStream details</a>, + <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KTable details</a>)</p> + <p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a> + and ensures that data is properly partitioned (“keyed”) for subsequent operations.</p> + <p><strong>When to set explicit SerDes:</strong> + Variants of <code class="docutils literal"><span class="pre">groupBy</span></code> exist to override the configured default SerDes of your application, which <strong>you must</strong> + <strong>do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> or <code class="docutils literal"><span class="pre">KGroupedTable</span></code> do not match the + configured default SerDes.</p> + <div class="admonition note"> + <p class="first admonition-title">Note</p> + <p class="last"><strong>Grouping vs. 
Windowing:</strong> + A related operation is <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a>, which lets you control how to + “sub-group” the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as + windowed <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> or + windowed <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a>.</p> + </div> + <p><strong>Always causes data re-partitioning:</strong> <code class="docutils literal"><span class="pre">groupBy</span></code> always causes data re-partitioning. + If possible, use <code class="docutils literal"><span class="pre">groupByKey</span></code> instead, which will re-partition data only if required.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> +<span class="n">KTable</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Java 8+ examples, using lambda expressions</span> + +<span class="c1">// Group the stream by a new key and key type</span> +<span class="n">KGroupedStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> + <span
class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">value</span><span class="o">,</span> + <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> + <span class="o">);</span> + +<span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span> +<span class="n">KGroupedTable</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()),</span> + <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span> 
+ <span class="o">);</span> + + +<span class="c1">// Java 7 examples</span> + +<span class="c1">// Group the stream by a new key and key type</span> +<span class="n">KGroupedStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> + <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">value</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">},</span> + <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* value */</span> + <span class="o">);</span> + +<span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span> +<span class="n">KGroupedTable</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">></span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span> + <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span> + <span class="o">}</span> + <span class="o">},</span> + <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span> + <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span> + <span class="o">);</span> +</pre></div> + </div> + </td> + </tr> + <tr 
class="row-even"><td><p class="first"><strong>Map</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + </ul> + </td> + <td><p class="first">Takes one record and produces one record. You can modify the record key and value, including their types. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p> + <p><strong>Marks the stream for data re-partitioning:</strong> + Applying a grouping or a join after <code class="docutils literal"><span class="pre">map</span></code> will result in re-partitioning of the records. + If possible, use <code class="docutils literal"><span class="pre">mapValues</span></code> instead, which will not cause data re-partitioning.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="c1">// Note how we change the key and the key type (similar to `selectKey`)</span> +<span class="c1">// as well as the value and the value type.</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span
class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()));</span> + +<span class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> + <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">KeyValue</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">KeyValue</span><span class="o"><>(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Map (values only)</strong></p> + <ul class="last simple"> 
+ <li>KStream → KStream</li> + <li>KTable → KTable</li> + </ul> + </td> + <td><p class="first">Takes one record and produces one record, while retaining the key of the original record. + You can modify the record value and the value type. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KStream details</a>, + <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KTable details</a>)</p> + <p><code class="docutils literal"><span class="pre">mapValues</span></code> is preferable to <code class="docutils literal"><span class="pre">map</span></code> because it will not cause data re-partitioning. However, it does not + allow you to modify the key or key type like <code class="docutils literal"><span class="pre">map</span></code> does.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">uppercased</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">mapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-></span> <span class="n">value</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span> + +<span class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> 
<span class="n">String</span><span class="o">></span> <span class="n">uppercased</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">mapValues</span><span class="o">(</span> + <span class="k">new</span> <span class="n">ValueMapper</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">();</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>Peek</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + </ul> + </td> + <td><p class="first">Performs a stateless action on each record, and returns an unchanged stream. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#peek-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p> + <p>You would use <code class="docutils literal"><span class="pre">peek</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">foreach</span></code>) and <em>continue</em> + <em>processing</em> the input data (unlike <code class="docutils literal"><span class="pre">foreach</span></code>, which is a terminal operation). 
<code class="docutils literal"><span class="pre">peek</span></code> returns the input + stream as-is; if you need to modify the input stream, use <code class="docutils literal"><span class="pre">map</span></code> or <code class="docutils literal"><span class="pre">mapValues</span></code> instead.</p> + <p><code class="docutils literal"><span class="pre">peek</span></code> is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.</p> + <p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not + trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">unmodifiedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">peek</span><span class="o">(</span> + <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"key="</span> <span class="o">+</span> <span class="n">key</span> <span class="o">+</span> <span class="s">", value="</span> <span class="o">+</span> <span class="n">value</span><span class="o">));</span> + +<span 
class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">unmodifiedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">peek</span><span class="o">(</span> + <span class="k">new</span> <span class="n">ForeachAction</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"key="</span> <span class="o">+</span> <span class="n">key</span> <span class="o">+</span> <span class="s">", value="</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Print</strong></p> + <ul class="last simple"> + <li>KStream → void</li> + </ul> + </td> + <td><p class="first"><strong>Terminal operation.</strong> Prints the records to <code class="docutils literal"><span class="pre">System.out</span></code>. See Javadocs for serde and <code class="docutils literal"><span class="pre">toString()</span></code> + caveats. 
+ (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#print--">details</a>)</p> + <p>Calling <code class="docutils literal"><span class="pre">print()</span></code> is the same as calling <code class="docutils literal"><span class="pre">foreach((key,</span> <span class="pre">value)</span> <span class="pre">-></span> <span class="pre">System.out.println(key</span> <span class="pre">+</span> <span class="pre">",</span> <span class="pre">"</span> <span class="pre">+</span> <span class="pre">value))</span></code>.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> +<span class="c1">// print to sysout</span> +<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> + +<span class="c1">// print to file with a custom label</span> +<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">(</span><span class="n">Printed</span><span class="o">.</span><span class="na">toFile</span><span class="o">(</span><span class="s">"streams.out"</span><span class="o">).</span><span class="na">withLabel</span><span class="o">(</span><span class="s">"streams"</span><span class="o">));</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-even"><td><p class="first"><strong>SelectKey</strong></p> + <ul class="last simple"> + <li>KStream → KStream</li> + </ul> + </td> + <td><p class="first">Assigns a new key – possibly of a new key type – to each record. 
+ (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#selectKey-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p> + <p>Calling <code class="docutils literal"><span class="pre">selectKey(mapper)</span></code> is the same as calling <code class="docutils literal"><span class="pre">map((key,</span> <span class="pre">value)</span> <span class="pre">-></span> <span class="pre">KeyValue.pair(mapper.apply(key,</span> <span class="pre">value),</span> <span class="pre">value))</span></code>.</p> + <p><strong>Marks the stream for data re-partitioning:</strong> + Applying a grouping or a join after <code class="docutils literal"><span class="pre">selectKey</span></code> will result in re-partitioning of the records.</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Derive a new record key from the record's value. 
Note how the key type changes, too.</span> +<span class="c1">// Java 8+ example, using lambda expressions</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">rekeyed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">selectKey</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-></span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)[</span><span class="mi">0</span><span class="o">]);</span> + +<span class="c1">// Java 7 example</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">rekeyed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">selectKey</span><span class="o">(</span> + <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)[</span><span 
class="mi">0</span><span class="o">];</span> + <span class="o">}</span> + <span class="o">});</span> +</pre></div> + </div> + </td> + </tr> + <tr class="row-odd"><td><p class="first"><strong>Table to Stream</strong></p> + <ul class="last simple"> + <li>KTable → KStream</li> + </ul> + </td> + <td><p class="first">Get the changelog stream of this table. + (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#toStream--">details</a>)</p> + <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KTable</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="c1">// Also, a variant of `toStream` exists that allows you</span> +<span class="c1">// to select a new key for the resulting stream.</span> +<span class="n">KStream</span><span class="o"><</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">toStream</span><span class="o">();</span> +</pre></div> + </div> + </td> + </tr> + </tbody> + </table> + </div> + <div class="section" id="stateful-transformations"> + <span id="streams-developer-guide-dsl-transformations-stateful"></span><h3><a class="toc-backref" href="#id11">Stateful transformations</a><a class="headerlink" href="#stateful-transformations" title="Permalink to this headline"></a></h3> + <p id="streams-developer-guide-dsl-transformations-stateful-overview">Stateful transformations depend on state for processing inputs and producing outputs and require a <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> associated with the stream processor. 
For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per + window. In join operations, a windowing state store is used to collect all of the records received so far within the + defined window boundary.</p> + <p>Note that state stores are fault-tolerant. + In case of failure, Kafka Streams guarantees to fully restore all state stores prior to resuming the processing. + See <a class="reference internal" href="../architecture.html#streams-architecture-fault-tolerance"><span class="std std-ref">Fault Tolerance</span></a> for further information.</p> + <p>Available stateful transformations in the DSL include:</p> + <ul class="simple"> + <li><a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">Aggregating</span></a></li> + <li><a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">Joining</span></a></li> + <li><a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">Windowing</span></a> (as part of aggregations and joins)</li> + <li><a class="reference internal" href="#streams-developer-guide-dsl-process"><span class="std std-ref">Applying custom processors and transformers</span></a>, which may be stateful, for + Processor API integration</li> + </ul> + <p>The following diagram shows their relationships:</p> + <div class="figure align-center" id="id2"> + <a class="reference internal image-reference" href="../../../images/streams-stateful_operations.png"><img alt="../../../images/streams-stateful_operations.png" src="../../../images/streams-stateful_operations.png" style="width: 400pt;" /></a> + <p class="caption"><span class="caption-text">Stateful transformations in the DSL.</span></p> + </div> + <p>Here is an example of a stateful application: the WordCount algorithm.</p> + <p>WordCount example in Java 8+, using lambda expressions:</p> + <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Assume the record values represent lines of text. For the sake of this example, you can ignore</span> +<span class="c1">// whatever may be stored in the record keys.</span> +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">textLines</span> <span class="o">=</span> <span class="o">...;</span> + +<span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">textLines</span> + <span class="c1">// Split each text line, by whitespace, into words. The text lines are the record</span> + <span class="c1">// values, i.e. you can ignore whatever data is in the record keys and thus invoke</span> + <span class="c1">// `flatMapValues` instead of the more generic `flatMap`.</span> + <span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-></span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">"\\W+"</span><span class="o">)))</span> + <span class="c1">// Group the stream by word to ensure the key of the record is the word.</span> + <span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-></span> <span class="n">word</span><span class="o">)</span> + <span class="c1">// Count the occurrences of each word (record key).</span> + <span class="c1">//</span> + <span class="c1">// 
This will change the stream type from `KGroupedStream<String, String>` to</span> + <span class="c1">// `KTable<String, Long>` (word -> count).</span> + <span class="o">.</span><span class="na">count</span><span class="o">()</span> + <span class="c1">// Convert the `KTable<String, Long>` into a `KStream<String, Long>`.</span> + <span class="o">.</span><span class="na">toStream</span><span class="o">();</span> +</pre></div> + </div> + <p>WordCount example in
<TRUNCATED>
