http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/memory-mgmt.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide/memory-mgmt.html 
b/docs/streams/developer-guide/memory-mgmt.html
new file mode 100644
index 0000000..4cf3afc
--- /dev/null
+++ b/docs/streams/developer-guide/memory-mgmt.html
@@ -0,0 +1,241 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div -->
+    </div>
+  </div>
+
+  <div class="section" id="memory-management">
+    <span id="streams-developer-guide-memory-management"></span><h1>Memory 
Management<a class="headerlink" href="#memory-management" title="Permalink to 
this headline"></a></h1>
+    <p>You can specify the total memory (RAM) size used for internal caching 
and compacting of records. This caching happens
+      before the records are written to state stores or forwarded downstream 
to other nodes.</p>
+    <p>The record caches are implemented slightly different in the DSL and 
Processor API.</p>
+    <div class="contents local topic" id="table-of-contents">
+      <p class="topic-title first">Table of Contents</p>
+      <ul class="simple">
+        <li><a class="reference internal" href="#record-caches-in-the-dsl" 
id="id1">Record caches in the DSL</a></li>
+        <li><a class="reference internal" 
href="#record-caches-in-the-processor-api" id="id2">Record caches in the 
Processor API</a></li>
+        <li><a class="reference internal" href="#other-memory-usage" 
id="id3">Other memory usage</a></li>
+      </ul>
+    </div>
+    <div class="section" id="record-caches-in-the-dsl">
+      <span 
id="streams-developer-guide-memory-management-record-cache"></span><h2><a 
class="toc-backref" href="#id1">Record caches in the DSL</a><a 
class="headerlink" href="#record-caches-in-the-dsl" title="Permalink to this 
headline"></a></h2>
+      <p>You can specify the total memory (RAM) size of the record cache for 
an instance of the processing topology. It is leveraged
+        by the following <code class="docutils literal"><span 
class="pre">KTable</span></code> instances:</p>
+      <ul class="simple">
+        <li>Source <code class="docutils literal"><span 
class="pre">KTable</span></code>: <code class="docutils literal"><span 
class="pre">KTable</span></code> instances that are created via <code 
class="docutils literal"><span class="pre">StreamsBuilder#table()</span></code> 
or <code class="docutils literal"><span 
class="pre">StreamsBuilder#globalTable()</span></code>.</li>
+        <li>Aggregation <code class="docutils literal"><span 
class="pre">KTable</span></code>: instances of <code class="docutils 
literal"><span class="pre">KTable</span></code> that are created as a result of 
<a class="reference internal" 
href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregations</span></a>.</li>
+      </ul>
+      <p>For such <code class="docutils literal"><span 
class="pre">KTable</span></code> instances, the record cache is used for:</p>
+      <ul class="simple">
+        <li>Internal caching and compacting of output records before they are 
written by the underlying stateful
+          <a class="reference internal" 
href="../concepts.html#streams-concepts-processor"><span class="std 
std-ref">processor node</span></a> to its internal state stores.</li>
+        <li>Internal caching and compacting of output records before they are 
forwarded from the underlying stateful
+          <a class="reference internal" 
href="../concepts.html#streams-concepts-processor"><span class="std 
std-ref">processor node</span></a> to any of its downstream processor 
nodes.</li>
+      </ul>
+      <p>Use the following example to understand the behaviors with and 
without record caching. In this example, the input is a
+        <code class="docutils literal"><span 
class="pre">KStream&lt;String,</span> <span 
class="pre">Integer&gt;</span></code> with the records <code class="docutils 
literal"><span class="pre">&lt;K,V&gt;:</span> <span class="pre">&lt;A,</span> 
<span class="pre">1&gt;,</span> <span class="pre">&lt;D,</span> <span 
class="pre">5&gt;,</span> <span class="pre">&lt;A,</span> <span 
class="pre">20&gt;,</span> <span class="pre">&lt;A,</span> <span 
class="pre">300&gt;</span></code>. The focus in this example is
+        on the records with key == <code class="docutils literal"><span 
class="pre">A</span></code>.</p>
+      <ul>
+        <li><p class="first">An <a class="reference internal" 
href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std 
std-ref">aggregation</span></a> computes the sum of record values, grouped by 
key, for
+          the input and returns a <code class="docutils literal"><span 
class="pre">KTable&lt;String,</span> <span 
class="pre">Integer&gt;</span></code>.</p>
+          <blockquote>
+            <div><ul class="simple">
+              <li><strong>Without caching</strong>: a sequence of output 
records is emitted for key <code class="docutils literal"><span 
class="pre">A</span></code> that represent changes in the
+                resulting aggregation table. The parentheses (<code 
class="docutils literal"><span class="pre">()</span></code>) denote changes, 
the left number is the new aggregate value
+                and the right number is the old aggregate value: <code 
class="docutils literal"><span class="pre">&lt;A,</span> <span 
class="pre">(1,</span> <span class="pre">null)&gt;,</span> <span 
class="pre">&lt;A,</span> <span class="pre">(21,</span> <span 
class="pre">1)&gt;,</span> <span class="pre">&lt;A,</span> <span 
class="pre">(321,</span> <span class="pre">21)&gt;</span></code>.</li>
+              <li><strong>With caching</strong>: a single output record is 
emitted for key <code class="docutils literal"><span 
class="pre">A</span></code> that would likely be compacted in the cache,
+                leading to a single output record of <code class="docutils 
literal"><span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span 
class="pre">null)&gt;</span></code>. This record is written to the 
aggregation&#8217;s internal state
+                store and forwarded to any downstream operations.</li>
+            </ul>
+            </div></blockquote>
+        </li>
+      </ul>
+      <p>The cache size is specified through the <code class="docutils 
literal"><span class="pre">cache.max.bytes.buffering</span></code> parameter, 
which is a global setting per
+        processing topology:</p>
+      <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Enable record cache of 
size 10 MB.</span>
+<span class="n">Properties</span> <span class="n">streamsConfiguration</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> 
<span class="mi">10</span> <span class="o">*</span> <span 
class="mi">1024</span> <span class="o">*</span> <span 
class="mi">1024L</span><span class="o">);</span>
+</pre></div>
+      </div>
+      <p>This parameter controls the number of bytes allocated for caching. 
Specifically, for a processor topology instance with
+        <code class="docutils literal"><span class="pre">T</span></code> 
threads and <code class="docutils literal"><span class="pre">C</span></code> 
bytes allocated for caching, each thread will have an even <code 
class="docutils literal"><span class="pre">C/T</span></code> bytes to construct 
its own
+        cache and use as it sees fit among its tasks. This means that there 
are as many caches as there are threads, but no sharing of
+        caches across threads happens.</p>
+      <p>The basic API for the cache is made of <code class="docutils 
literal"><span class="pre">put()</span></code> and <code class="docutils 
literal"><span class="pre">get()</span></code> calls.  Records are
+        evicted using a simple LRU scheme after the cache size is reached.  
The first time a keyed record <code class="docutils literal"><span 
class="pre">R1</span> <span class="pre">=</span> <span 
class="pre">&lt;K1,</span> <span class="pre">V1&gt;</span></code>
+        finishes processing at a node, it is marked as dirty in the cache.  
Any other keyed record <code class="docutils literal"><span 
class="pre">R2</span> <span class="pre">=</span> <span 
class="pre">&lt;K1,</span> <span class="pre">V2&gt;</span></code> with the
+        same key <code class="docutils literal"><span 
class="pre">K1</span></code> that is processed on that node during that time 
will overwrite <code class="docutils literal"><span class="pre">&lt;K1,</span> 
<span class="pre">V1&gt;</span></code>, this is referred to as
+        &#8220;being compacted&#8221;.  This has the same effect as
+        <a class="reference external" 
href="https://kafka.apache.org/documentation.html#compaction";>Kafka&#8217;s log 
compaction</a>, but happens earlier, while the
+        records are still in memory, and within your client-side application, 
rather than on the server-side (i.e. the Kafka
+        broker).  After flushing, <code class="docutils literal"><span 
class="pre">R2</span></code> is forwarded to the next processing node and then 
written to the local state store.</p>
+      <p>The semantics of caching is that data is flushed to the state store 
and forwarded to the next downstream processor node
+        whenever the earliest of <code class="docutils literal"><span 
class="pre">commit.interval.ms</span></code> or <code class="docutils 
literal"><span class="pre">cache.max.bytes.buffering</span></code> (cache 
pressure) hits.  Both
+        <code class="docutils literal"><span 
class="pre">commit.interval.ms</span></code> and <code class="docutils 
literal"><span class="pre">cache.max.bytes.buffering</span></code> are global 
parameters. As such, it is not possible to specify
+        different parameters for individual nodes.</p>
+      <p>Here are example settings for both parameters based on desired 
scenarios.</p>
+      <ul>
+        <li><p class="first">To turn off caching the cache size can be set to 
zero:</p>
+          <blockquote>
+            <div><div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Disable record 
cache</span>
+<span class="n">Properties</span> <span class="n">streamsConfiguration</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> 
<span class="mi">0</span><span class="o">);</span>
+</pre></div>
+            </div>
+              <p>Turning off caching might result in high write traffic for 
the underlying RocksDB store.
+                With default settings caching is enabled within Kafka Streams 
but RocksDB caching is disabled.
+                Thus, to avoid high write traffic it is recommended to enable 
RocksDB caching if Kafka Streams caching is turned off.</p>
+              <p>For example, the RocksDB Block Cache could be set to 100MB 
and Write Buffer size to 32 MB. For more information, see
+                the <a class="reference internal" 
href="config-streams.html#streams-developer-guide-rocksdb-config"><span 
class="std std-ref">RocksDB config</span></a>.</p>
+            </div></blockquote>
+        </li>
+        <li><p class="first">To enable caching but still have an upper bound 
on how long records will be cached, you can set the commit interval. In this 
example, it is set to 1000 milliseconds:</p>
+          <blockquote>
+            <div><div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsConfiguration</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// Enable record cache of size 10 MB.</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> 
<span class="mi">10</span> <span class="o">*</span> <span 
class="mi">1024</span> <span class="o">*</span> <span 
class="mi">1024L</span><span class="o">);</span>
+<span class="c1">// Set commit interval to 1 second.</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">COMMIT_INTERVAL_MS_CONFIG</span><span class="o">,</span> <span 
class="mi">1000</span><span class="o">);</span>
+</pre></div>
+            </div>
+            </div></blockquote>
+        </li>
+      </ul>
+      <p>The effect of these two configurations is described in the figure 
below. The records are shown using 4 keys: blue, red, yellow, and green. Assume 
the cache has space for only 3 keys.</p>
+      <ul>
+        <li><p class="first">When the cache is disabled (a), all of the input 
records will be output.</p>
+        </li>
+        <li><p class="first">When the cache is enabled (b):</p>
+          <blockquote>
+            <div><ul class="simple">
+              <li>Most records are output at the end of commit intervals 
(e.g., at <code class="docutils literal"><span class="pre">t1</span></code> a 
single blue record is output, which is the final over-write of the blue key up 
to that time).</li>
+              <li>Some records are output because of cache pressure (i.e. 
before the end of a commit interval). For example, see the red record before 
<code class="docutils literal"><span class="pre">t2</span></code>. With smaller 
cache sizes we expect cache pressure to be the primary factor that dictates 
when records are output. With large cache sizes, the commit interval will be 
the primary factor.</li>
+              <li>The total number of records output has been reduced from 15 
to 8.</li>
+            </ul>
+            </div></blockquote>
+        </li>
+      </ul>
+      <div class="figure align-center">
+        <a class="reference internal image-reference" 
href="../../../images/streams-cache-and-commit-interval.png"><img 
alt="../../../images/streams-cache-and-commit-interval.png" 
src="../../../images/streams-cache-and-commit-interval.png" style="width: 
500pt; height: 400pt;" /></a>
+      </div>
+    </div>
+    <div class="section" id="record-caches-in-the-processor-api">
+      <span 
id="streams-developer-guide-memory-management-state-store-cache"></span><h2><a 
class="toc-backref" href="#id2">Record caches in the Processor API</a><a 
class="headerlink" href="#record-caches-in-the-processor-api" title="Permalink 
to this headline"></a></h2>
+      <p>You can specify the total memory (RAM) size of the record cache for 
an instance of the processing topology. It is used
+        for internal caching and compacting of output records before they are 
written from a stateful processor node to its
+        state stores.</p>
+      <p>The record cache in the Processor API does not cache or compact any 
output records that are being forwarded downstream.
+        This means that all downstream processor nodes can see all records, 
whereas the state stores see a reduced number of records.
+        This does not impact correctness of the system, but is a performance 
optimization for the state stores. For example, with the
+        Processor API you can store a record in a state store while forwarding 
a different value downstream.</p>
+      <p>Following from the example first shown in section <a class="reference 
internal" href="processor-api.html#streams-developer-guide-state-store"><span 
class="std std-ref">State Stores</span></a>, to enable caching, you can
+        add the <code class="docutils literal"><span 
class="pre">withCachingEnabled</span></code> call (note that caches are 
disabled by default and there is no explicit <code class="docutils 
literal"><span class="pre">withDisableCaching</span></code>
+        call).</p>
+      <p><strong>Tip:</strong> Caches are disabled by default and there is no 
explicit <code class="docutils literal"><span 
class="pre">disableCaching</span></code> call).</p>
+      <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">StoreBuilder</span> <span 
class="n">countStoreBuilder</span> <span class="o">=</span>
+  <span class="n">Stores</span><span class="o">.</span><span 
class="na">keyValueStoreBuilder</span><span class="o">(</span>
+    <span class="n">Stores</span><span class="o">.</span><span 
class="na">persistentKeyValueStore</span><span class="o">(</span><span 
class="s">&quot;Counts&quot;</span><span class="o">),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span>
+  <span class="o">.</span><span class="na">withCachingEnabled</span><span 
class="o">()</span>
+</pre></div>
+      </div>
+    </div>
+    <div class="section" id="other-memory-usage">
+      <h2><a class="toc-backref" href="#id3">Other memory usage</a><a 
class="headerlink" href="#other-memory-usage" title="Permalink to this 
headline"></a></h2>
+      <p>There are other modules inside Apache Kafka that allocate memory 
during runtime. They include the following:</p>
+      <ul class="simple">
+        <li>Producer buffering, managed by the producer config <code 
class="docutils literal"><span class="pre">buffer.memory</span></code>.</li>
+        <li>Consumer buffering, currently not strictly managed, but can be 
indirectly controlled by fetch size, i.e.,
+          <code class="docutils literal"><span 
class="pre">fetch.max.bytes</span></code> and <code class="docutils 
literal"><span class="pre">fetch.max.wait.ms</span></code>.</li>
+        <li>Both producer and consumer also have separate TCP send / receive 
buffers that are not counted as the buffering memory.
+          These are controlled by the <code class="docutils literal"><span 
class="pre">send.buffer.bytes</span></code> / <code class="docutils 
literal"><span class="pre">receive.buffer.bytes</span></code> configs.</li>
+        <li>Deserialized objects buffering: after <code class="docutils 
literal"><span class="pre">consumer.poll()</span></code> returns records, they 
will be deserialized to extract
+          timestamp and buffered in the streams space. Currently this is only 
indirectly controlled by
+          <code class="docutils literal"><span 
class="pre">buffered.records.per.partition</span></code>.</li>
+        <li>RocksDB&#8217;s own memory usage, both on-heap and off-heap; 
critical configs (for RocksDB version 4.1.0) include
+          <code class="docutils literal"><span 
class="pre">block_cache_size</span></code>, <code class="docutils 
literal"><span class="pre">write_buffer_size</span></code> and <code 
class="docutils literal"><span 
class="pre">max_write_buffer_number</span></code>.  These can be specified 
through the
+          <code class="docutils literal"><span 
class="pre">rocksdb.config.setter</span></code> configuration.</li>
+      </ul>
+      <div class="admonition tip">
+        <p class="first admonition-title">Tip</p>
+        <p><strong>Iterators should be closed explicitly to release 
resources:</strong> Store iterators (e.g., <code class="docutils literal"><span 
class="pre">KeyValueIterator</span></code> and <code class="docutils 
literal"><span class="pre">WindowStoreIterator</span></code>) must be closed 
explicitly upon completeness to release resources such as open file handlers 
and in-memory read buffers, or use try-with-resources statement (available 
since JDK7) for this Closeable class.</p>
+        <p class="last">Otherwise, stream application&#8217;s memory usage 
keeps increasing when running until it hits an OOM.</p>
+</div>
+</div>
+
+
+               </div>
+              </div>
+  <div class="pagination">
+    <a 
href="/{{version}}/documentation/streams/developer-guide/interactive-queries" 
class="pagination__btn pagination__btn__prev">Previous</a>
+    <a href="/{{version}}/documentation/streams/developer-guide/running-app" 
class="pagination__btn pagination__btn__next">Next</a>
+  </div>
+</script>
+
+<!--#include virtual="../../../includes/_header.htm" -->
+<!--#include virtual="../../../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+  <!--#include virtual="../../../includes/_nav.htm" -->
+  <div class="right">
+    <!--#include virtual="../../../includes/_docs_banner.htm" -->
+    <ul class="breadcrumbs">
+      <li><a href="/documentation">Documentation</a></li>
+      <li><a href="/documentation/streams">Kafka Streams</a></li>
+      <li><a href="/documentation/streams/developer-guide/">Developer 
Guide</a></li>
+    </ul>
+    <div class="p-content"></div>
+  </div>
+</div>
+<!--#include virtual="../../../includes/_footer.htm" -->
+<script>
+    $(function() {
+        // Show selected style on nav item
+        $('.b-nav__streams').addClass('selected');
+
+        //sticky secondary nav
+        var $navbar = $(".sub-nav-sticky"),
+            y_pos = $navbar.offset().top,
+            height = $navbar.height();
+
+        $(window).scroll(function() {
+            var scrollTop = $(window).scrollTop();
+
+            if (scrollTop > y_pos - height) {
+                $navbar.addClass("navbar-fixed")
+            } else if (scrollTop <= y_pos) {
+                $navbar.removeClass("navbar-fixed")
+            }
+        });
+
+        // Display docs subnav items
+        
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+    });
+</script>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e2fe17c/docs/streams/developer-guide/processor-api.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide/processor-api.html 
b/docs/streams/developer-guide/processor-api.html
new file mode 100644
index 0000000..fd7673d
--- /dev/null
+++ b/docs/streams/developer-guide/processor-api.html
@@ -0,0 +1,437 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <!-- div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" 
href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo 
App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write 
App</a>
+      </div -->
+    </div>
+  </div>
+
+    <div class="section" id="processor-api">
+        <span id="streams-developer-guide-processor-api"></span><h1>Processor 
API<a class="headerlink" href="#processor-api" title="Permalink to this 
headline"></a></h1>
+        <p>The Processor API allows developers to define and connect custom 
processors and to interact with state stores. With the
+            Processor API, you can define arbitrary stream processors that 
process one received record at a time, and connect these
+            processors with their associated state stores to compose the 
processor topology that represents a customized processing
+            logic.</p>
+        <div class="contents local topic" id="table-of-contents">
+            <p class="topic-title first">Table of Contents</p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#overview" 
id="id1">Overview</a></li>
+                <li><a class="reference internal" 
href="#defining-a-stream-processor" id="id2">Defining a Stream 
Processor</a></li>
+                <li><a class="reference internal" href="#state-stores" 
id="id3">State Stores</a><ul>
+                    <li><a class="reference internal" 
href="#defining-and-creating-a-state-store" id="id4">Defining and creating a 
State Store</a></li>
+                    <li><a class="reference internal" 
href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State 
Stores</a></li>
+                    <li><a class="reference internal" 
href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" 
id="id6">Enable or Disable Fault Tolerance of State Stores (Store 
Changelogs)</a></li>
+                    <li><a class="reference internal" 
href="#implementing-custom-state-stores" id="id7">Implementing Custom State 
Stores</a></li>
+                </ul>
+                </li>
+                <li><a class="reference internal" 
href="#connecting-processors-and-state-stores" id="id8">Connecting Processors 
and State Stores</a></li>
+            </ul>
+        </div>
+        <div class="section" id="overview">
+            <h2><a class="toc-backref" href="#id1">Overview</a><a 
class="headerlink" href="#overview" title="Permalink to this headline"></a></h2>
+            <p>The Processor API can be used to implement both 
<strong>stateless</strong> as well as <strong>stateful</strong> operations, 
where the latter is
+                achieved through the use of <a class="reference internal" 
href="#streams-developer-guide-state-store"><span class="std std-ref">state 
stores</span></a>.</p>
+            <div class="admonition tip">
+                <p class="first admonition-title">Tip</p>
+                <p class="last"><strong>Combining the DSL and the Processor 
API:</strong>
+                    You can combine the convenience of the DSL with the power 
and flexibility of the Processor API as described in the
+                    section <a class="reference internal" 
href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std 
std-ref">Applying processors and transformers (Processor API 
integration)</span></a>.</p>
+            </div>
+            <p>For a complete list of available API functionality, see the <a 
class="reference internal" href="../javadocs.html#streams-javadocs"><span 
class="std std-ref">Kafka Streams API docs</span></a>.</p>
+        </div>
+        <div class="section" id="defining-a-stream-processor">
+            <span id="streams-developer-guide-stream-processor"></span><h2><a 
class="toc-backref" href="#id2">Defining a Stream Processor</a><a 
class="headerlink" href="#defining-a-stream-processor" title="Permalink to this 
headline"></a></h2>
+            <p>A <a class="reference internal" 
href="../concepts.html#streams-concepts"><span class="std std-ref">stream 
processor</span></a> is a node in the processor topology that represents a 
single processing step.
+                With the Processor API, you can define arbitrary stream 
processors that processes one received record at a time, and connect
+                these processors with their associated state stores to compose 
the processor topology.</p>
+            <p>You can define a customized stream processor by implementing 
the <code class="docutils literal"><span class="pre">Processor</span></code> 
interface, which provides the <code class="docutils literal"><span 
class="pre">process()</span></code> API method.
+                The <code class="docutils literal"><span 
class="pre">process()</span></code> method is called on each of the received 
records.</p>
+            <p>The <code class="docutils literal"><span 
class="pre">Processor</span></code> interface also has an <code class="docutils 
literal"><span class="pre">init()</span></code> method, which is called by the 
Kafka Streams library during task construction
+                phase. Processor instances should perform any required 
initialization in this method. The <code class="docutils literal"><span 
class="pre">init()</span></code> method passes in a <code class="docutils 
literal"><span class="pre">ProcessorContext</span></code>
+                instance, which provides access to the metadata of the 
currently processed record, including its source Kafka topic and partition,
+                its corresponding message offset, and further such 
information. You can also use this context instance to schedule a punctuation
+                function (via <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code>), to forward a new record 
as a key-value pair to the downstream processors (via <code class="docutils 
literal"><span class="pre">ProcessorContext#forward()</span></code>),
+                and to commit the current processing progress (via <code 
class="docutils literal"><span 
class="pre">ProcessorContext#commit()</span></code>).</p>
+            <p>Specifically, <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> accepts a user <code 
class="docutils literal"><span class="pre">Punctuator</span></code> callback 
interface, which triggers its <code class="docutils literal"><span 
class="pre">punctuate()</span></code>
+                API method periodically based on the <code class="docutils 
literal"><span class="pre">PunctuationType</span></code>. The <code 
class="docutils literal"><span class="pre">PunctuationType</span></code> 
determines what notion of time is used
+                for the punctuation scheduling: either <a class="reference 
internal" href="../concepts.html#streams-concepts-time"><span class="std 
std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
+                is configured to represent event-time via <code 
class="docutils literal"><span class="pre">TimestampExtractor</span></code>). 
When stream-time is used, <code class="docutils literal"><span 
class="pre">punctuate()</span></code> is triggered purely
+                by data because stream-time is determined (and advanced 
forward) by the timestamps derived from the input data. When there
+                is no new input data arriving, stream-time is not advanced and 
thus <code class="docutils literal"><span class="pre">punctuate()</span></code> 
is not called.</p>
+            <p>For example, if you schedule a <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function every 10 seconds 
based on <code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> and if you
+                process a stream of 60 records with consecutive timestamps 
from 1 (first record) to 60 seconds (last record),
+                then <code class="docutils literal"><span 
class="pre">punctuate()</span></code> would be called 6 times. This happens 
regardless of the time required to actually process those records. <code 
class="docutils literal"><span class="pre">punctuate()</span></code>
+                would be called 6 times regardless of whether processing these 
60 records takes a second, a minute, or an hour.</p>
+            <p>When wall-clock-time (i.e. <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code 
class="docutils literal"><span class="pre">punctuate()</span></code> is 
triggered purely by the wall-clock time.
+                Reusing the example above, if the <code class="docutils 
literal"><span class="pre">Punctuator</span></code> function is scheduled based 
on <code class="docutils literal"><span 
class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
+                60 records were processed within 20 seconds, <code 
class="docutils literal"><span class="pre">punctuate()</span></code> is called 
2 times (one time every 10 seconds). If these 60 records
+                were processed within 5 seconds, then no <code class="docutils 
literal"><span class="pre">punctuate()</span></code> is called at all. Note 
that you can schedule multiple <code class="docutils literal"><span 
class="pre">Punctuator</span></code>
+                callbacks with different <code class="docutils literal"><span 
class="pre">PunctuationType</span></code> types within the same processor by 
calling <code class="docutils literal"><span 
class="pre">ProcessorContext#schedule()</span></code> multiple
+                times inside <code class="docutils literal"><span 
class="pre">init()</span></code> method.</p>
+            <div class="admonition attention">
+                <p class="first admonition-title">Attention</p>
+                <p class="last">Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
+                    If at least one partition does not have any new data 
available, stream-time will not be advanced and thus <code class="docutils 
literal"><span class="pre">punctuate()</span></code> will not be triggered if 
<code class="docutils literal"><span 
class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
+                    This behavior is independent of the configured timestamp 
extractor, i.e., using <code class="docutils literal"><span 
class="pre">WallclockTimestampExtractor</span></code> does not enable 
wall-clock triggering of <code class="docutils literal"><span 
class="pre">punctuate()</span></code>.</p>
+            </div>
+            <p>The following example <code class="docutils literal"><span 
class="pre">Processor</span></code> defines a simple word-count algorithm and 
the following actions are performed:</p>
+            <ul class="simple">
+                <li>In the <code class="docutils literal"><span 
class="pre">init()</span></code> method, schedule the punctuation every 1000 
time units (the time unit is normally milliseconds, which in this example would 
translate to punctuation every 1 second) and retrieve the local state store by 
its name &#8220;Counts&#8221;.</li>
+                <li>In the <code class="docutils literal"><span 
class="pre">process()</span></code> method, upon each received record, split 
the value string into words, and update their counts into the state store (we 
will talk about this later in this section).</li>
+                <li>In the <code class="docutils literal"><span 
class="pre">punctuate()</span></code> method, iterate the local state store and 
send the aggregated counts to the downstream processor (we will talk about 
downstream processors later in this section), and commit the current stream 
state.</li>
+            </ul>
+            <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">WordCountProcessor</span> <span 
class="kd">implements</span> <span class="n">Processor</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="o">{</span>
+
+  <span class="kd">private</span> <span class="n">ProcessorContext</span> 
<span class="n">context</span><span class="o">;</span>
+  <span class="kd">private</span> <span class="n">KeyValueStore</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">kvStore</span><span class="o">;</span>
+
+  <span class="nd">@Override</span>
+  <span class="nd">@SuppressWarnings</span><span class="o">(</span><span 
class="s">&quot;unchecked&quot;</span><span class="o">)</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">init</span><span class="o">(</span><span 
class="n">ProcessorContext</span> <span class="n">context</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="c1">// keep the processor context locally because we need 
it in punctuate() and commit()</span>
+      <span class="k">this</span><span class="o">.</span><span 
class="na">context</span> <span class="o">=</span> <span 
class="n">context</span><span class="o">;</span>
+
+      <span class="c1">// retrieve the key-value store named 
&quot;Counts&quot;</span>
+      <span class="n">kvStore</span> <span class="o">=</span> <span 
class="o">(</span><span class="n">KeyValueStore</span><span class="o">)</span> 
<span class="n">context</span><span class="o">.</span><span 
class="na">getStateStore</span><span class="o">(</span><span 
class="s">&quot;Counts&quot;</span><span class="o">);</span>
+
+      <span class="c1">// schedule a punctuate() method every 1000 
milliseconds based on stream-time</span>
+      <span class="k">this</span><span class="o">.</span><span 
class="na">context</span><span class="o">.</span><span 
class="na">schedule</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">,</span> <span 
class="n">PunctuationType</span><span class="o">.</span><span 
class="na">STREAM_TIME</span><span class="o">,</span> <span 
class="o">(</span><span class="n">timestamp</span><span class="o">)</span> 
<span class="o">-&gt;</span> <span class="o">{</span>
+          <span class="n">KeyValueIterator</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">iter</span> <span class="o">=</span> <span class="k">this</span><span 
class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span 
class="na">all</span><span class="o">();</span>
+          <span class="k">while</span> <span class="o">(</span><span 
class="n">iter</span><span class="o">.</span><span 
class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+              <span class="n">KeyValue</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span class="n">entry</span> 
<span class="o">=</span> <span class="n">iter</span><span 
class="o">.</span><span class="na">next</span><span class="o">();</span>
+              <span class="n">context</span><span class="o">.</span><span 
class="na">forward</span><span class="o">(</span><span 
class="n">entry</span><span class="o">.</span><span class="na">key</span><span 
class="o">,</span> <span class="n">entry</span><span class="o">.</span><span 
class="na">value</span><span class="o">.</span><span 
class="na">toString</span><span class="o">());</span>
+          <span class="o">}</span>
+          <span class="n">iter</span><span class="o">.</span><span 
class="na">close</span><span class="o">();</span>
+
+          <span class="c1">// commit the current processing progress</span>
+          <span class="n">context</span><span class="o">.</span><span 
class="na">commit</span><span class="o">();</span>
+      <span class="o">});</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">punctuate</span><span class="o">(</span><span class="kt">long</span> 
<span class="n">timestamp</span><span class="o">)</span> <span 
class="o">{</span>
+      <span class="c1">// this method is deprecated and should not be used 
anymore</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">close</span><span class="o">()</span> <span class="o">{</span>
+      <span class="c1">// close the key-value store</span>
+      <span class="n">kvStore</span><span class="o">.</span><span 
class="na">close</span><span class="o">();</span>
+  <span class="o">}</span>
+
+<span class="o">}</span>
+</pre></div>
+            </div>
+            <div class="admonition note">
+                <p class="first admonition-title">Note</p>
+                <p class="last"><strong>Stateful processing with state 
stores:</strong>
+                    The <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> defined above can access the 
currently received record in its <code class="docutils literal"><span 
class="pre">process()</span></code> method, and it can
+                    leverage <a class="reference internal" 
href="#streams-developer-guide-state-store"><span class="std std-ref">state 
stores</span></a> to maintain processing states to, for example, remember 
recently
+                    arrived records for stateful processing needs like 
aggregations and joins. For more information, see the <a class="reference 
internal" href="#streams-developer-guide-state-store"><span class="std 
std-ref">state stores</span></a> documentation.</p>
+            </div>
+        </div>
+        <div class="section" id="state-stores">
+            <span id="streams-developer-guide-state-store"></span><h2><a 
class="toc-backref" href="#id3">State Stores</a><a class="headerlink" 
href="#state-stores" title="Permalink to this headline"></a></h2>
+            <p>To implement a <strong>stateful</strong> <code class="docutils 
literal"><span class="pre">Processor</span></code> or <code class="docutils 
literal"><span class="pre">Transformer</span></code>, you must provide one or 
more state stores to the processor
+                or transformer (<em>stateless</em> processors or transformers 
do not need state stores).  State stores can be used to remember
+                recently received input records, to track rolling aggregates, 
to de-duplicate input records, and more.
+                Another feature of state stores is that they can be
+                <a class="reference internal" 
href="interactive-queries.html#streams-developer-guide-interactive-queries"><span
 class="std std-ref">interactively queried</span></a> from other applications, 
such as a
+                NodeJS-based dashboard or a microservice implemented in Scala 
or Go.</p>
+            <p>The
+                <a class="reference internal" 
href="#streams-developer-guide-state-store-defining"><span class="std 
std-ref">available state store types</span></a> in Kafka Streams have
+                <a class="reference internal" 
href="#streams-developer-guide-state-store-fault-tolerance"><span class="std 
std-ref">fault tolerance</span></a> enabled by default.</p>
+            <div class="section" id="defining-and-creating-a-state-store">
+                <span 
id="streams-developer-guide-state-store-defining"></span><h3><a 
class="toc-backref" href="#id4">Defining and creating a State Store</a><a 
class="headerlink" href="#defining-and-creating-a-state-store" title="Permalink 
to this headline"></a></h3>
+                <p>You can either use one of the available store types or
+                    <a class="reference internal" 
href="#streams-developer-guide-state-store-custom"><span class="std 
std-ref">implement your own custom store type</span></a>.
+                    It&#8217;s common practice to leverage an existing store 
type via the <code class="docutils literal"><span 
class="pre">Stores</span></code> factory.</p>
+                <p>Note that, when using Kafka Streams, you normally 
don&#8217;t create or instantiate state stores directly in your code.
+                    Rather, you define state stores indirectly by creating a 
so-called <code class="docutils literal"><span 
class="pre">StoreBuilder</span></code>.  This buildeer is used by
+                    Kafka Streams as a factory to instantiate the actual state 
stores locally in application instances when and where
+                    needed.</p>
+                <p>The following store types are available out of the box.</p>
+                <table border="1" class="non-scrolling-table width-100-percent 
docutils">
+                    <colgroup>
+                        <col width="19%" />
+                        <col width="11%" />
+                        <col width="18%" />
+                        <col width="51%" />
+                    </colgroup>
+                    <thead valign="bottom">
+                    <tr class="row-odd"><th class="head">Store Type</th>
+                        <th class="head">Storage Engine</th>
+                        <th class="head">Fault-tolerant?</th>
+                        <th class="head">Description</th>
+                    </tr>
+                    </thead>
+                    <tbody valign="top">
+                    <tr class="row-even"><td>Persistent
+                        <code class="docutils literal"><span 
class="pre">KeyValueStore&lt;K,</span> <span 
class="pre">V&gt;</span></code></td>
+                        <td>RocksDB</td>
+                        <td>Yes (enabled by default)</td>
+                        <td><ul class="first simple">
+                            <li><strong>The recommended store type for most 
use cases.</strong></li>
+                            <li>Stores its data on local disk.</li>
+                            <li>Storage capacity:
+                                managed local state can be larger than the 
memory (heap space) of an
+                                application instance, but must fit into the 
available local disk
+                                space.</li>
+                            <li>RocksDB settings can be fine-tuned, see
+                                <a class="reference internal" 
href="config-streams.html#streams-developer-guide-rocksdb-config"><span 
class="std std-ref">RocksDB configuration</span></a>.</li>
+                            <li>Available <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">store
 variants</a>:
+                                time window key-value store, session window 
key-value store.</li>
+                        </ul>
+                            <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Creating a persistent 
key-value store:</span>
+<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` 
named &quot;persistent-counts&quot;.</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
+
+<span class="c1">// Note: The `Stores` factory returns a supplier for the 
state store,</span>
+<span class="c1">// because that&#39;s what you typically need to pass as API 
parameter.</span>
+<span class="n">StateStoreSupplier</span> <span 
class="n">countStoreSupplier</span> <span class="o">=</span>
+  <span class="n">Stores</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="s">&quot;persistent-counts&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">withKeys</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">withValues</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">persistent</span><span 
class="o">()</span>
+    <span class="o">.</span><span class="na">build</span><span 
class="o">();</span>
+</pre></div>
+                            </div>
+                            <p class="last">See
+                                <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">PersistentKeyValueFactory</a>
 for
+                                detailed factory options.</p>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td>In-memory
+                        <code class="docutils literal"><span 
class="pre">KeyValueStore&lt;K,</span> <span 
class="pre">V&gt;</span></code></td>
+                        <td>-</td>
+                        <td>Yes (enabled by default)</td>
+                        <td><ul class="first simple">
+                            <li>Stores its data in memory.</li>
+                            <li>Storage capacity:
+                                managed local state must fit into memory (heap 
space) of an
+                                application instance.</li>
+                            <li>Useful when application instances run in an 
environment where local
+                                disk space is either not available or local 
disk space is wiped
+                                in-between app instance restarts.</li>
+                        </ul>
+                            <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="c1">// Creating an in-memory 
key-value store:</span>
+<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` 
named &quot;inmemory-counts&quot;.</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
+
+<span class="c1">// Note: The `Stores` factory returns a supplier for the 
state store,</span>
+<span class="c1">// because that&#39;s what you typically need to pass as API 
parameter.</span>
+<span class="n">StateStoreSupplier</span> <span 
class="n">countStoreSupplier</span> <span class="o">=</span>
+  <span class="n">Stores</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="s">&quot;inmemory-counts&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">withKeys</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">withValues</span><span 
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">inMemory</span><span 
class="o">()</span>
+    <span class="o">.</span><span class="na">build</span><span 
class="o">();</span>
+</pre></div>
+                            </div>
+                            <p class="last">See
+                                <a class="reference external" 
href="../javadocs/org/apache/kafka/streams/state/Stores.InMemoryKeyValueFactory.html">InMemoryKeyValueFactory</a>
 for
+                                detailed factory options.</p>
+                        </td>
+                    </tr>
+                    </tbody>
+                </table>
+            </div>
+            <div class="section" id="fault-tolerant-state-stores">
+                <span 
id="streams-developer-guide-state-store-fault-tolerance"></span><h3><a 
class="toc-backref" href="#id5">Fault-tolerant State Stores</a><a 
class="headerlink" href="#fault-tolerant-state-stores" title="Permalink to this 
headline"></a></h3>
+                <p>To make state stores fault-tolerant and to allow for state 
store migration without data loss, a state store can be
+                    continuously backed up to a Kafka topic behind the scenes. 
For example, to migrate a stateful stream task from one
+                    machine to another when <a class="reference internal" 
href="running-app.html#streams-developer-guide-execution-scaling"><span 
class="std std-ref">elastically adding or removing capacity from your 
application</span></a>.
+                    This topic is sometimes referred to as the state 
store&#8217;s associated <em>changelog topic</em>, or its <em>changelog</em>.  
For example, if
+                    you experience machine failure, the state store and the 
application&#8217;s state can be fully restored from its changelog. You can
+                    <a class="reference internal" 
href="#streams-developer-guide-state-store-enable-disable-fault-tolerance"><span
 class="std std-ref">enable or disable this backup feature</span></a> for a
+                    state store.</p>
+                <p>By default, persistent key-value stores are fault-tolerant. 
 They are backed by a
+                    <a class="reference external" 
href="https://kafka.apache.org/documentation.html#compaction";>compacted</a> 
changelog topic.  The purpose of compacting this
+                    topic is to prevent the topic from growing indefinitely, 
to reduce the storage consumed in the associated Kafka cluster,
+                    and to minimize recovery time if a state store needs to be 
restored from its changelog topic.</p>
+                <p>Similarly, persistent window stores are fault-tolerant.  
They are backed by a topic that uses both compaction and
+                    deletion. Because of the structure of the message keys 
that are being sent to the changelog topics, this combination of
+                    deletion and compaction is required for the changelog 
topics of window stores. For window stores, the message keys are
+                    composite keys that include the &#8220;normal&#8221; key 
and window timestamps.  For these types of composite keys it would not
+                    be sufficient to only enable compaction to prevent a 
changelog topic from growing out of bounds.  With deletion
+                    enabled, old windows that have expired will be cleaned up 
by Kafka&#8217;s log cleaner as the log segments expire.  The
+                    default retention setting is <code class="docutils 
literal"><span class="pre">Windows#maintainMs()</span></code> + 1 day.  You can 
override this setting by specifying
+                    <code class="docutils literal"><span 
class="pre">StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</span></code>
 in the <code class="docutils literal"><span 
class="pre">StreamsConfig</span></code>.</p>
+                <p>When you open an <code class="docutils literal"><span 
class="pre">Iterator</span></code> from a state store you must call <code 
class="docutils literal"><span class="pre">close()</span></code> on the 
iterator when you are done working with
+                    it to reclaim resources; or you can use the iterator from 
within a try-with-resources statement. If you do not close an iterator,
+                    you may encounter an OOM error.</p>
+            </div>
+            <div class="section" 
id="enable-or-disable-fault-tolerance-of-state-stores-store-changelogs">
+                <span 
id="streams-developer-guide-state-store-enable-disable-fault-tolerance"></span><h3><a
 class="toc-backref" href="#id6">Enable or Disable Fault Tolerance of State 
Stores (Store Changelogs)</a><a class="headerlink" 
href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" 
title="Permalink to this headline"></a></h3>
+                <p>You can enable or disable fault tolerance for a state store 
by enabling or disabling the change logging
+                    of the store through <code class="docutils literal"><span 
class="pre">enableLogging()</span></code> and <code class="docutils 
literal"><span class="pre">disableLogging()</span></code>.
+                    You can also fine-tune the associated topic’s 
configuration if needed.</p>
+                <p>Example for disabling fault-tolerance:</p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
+
+<span class="n">StoreBuilder</span><span class="o">&lt;</span><span 
class="n">KeyValueStore</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="n">countStoreSupplier</span> <span class="o">=</span> <span 
class="n">Stores</span><span class="o">.</span><span 
class="na">keyValueStoreBuilder</span><span class="o">(</span>
+  <span class="n">Stores</span><span class="o">.</span><span 
class="na">persistentKeyValueStore</span><span class="o">(</span><span 
class="s">&quot;Counts&quot;</span><span class="o">),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span>
+  <span class="o">.</span><span class="na">withLoggingDisabled</span><span 
class="o">();</span> <span class="c1">// disable backing up the store to a 
changelog topic</span>
+</pre></div>
+                </div>
+                <div class="admonition attention">
+                    <p class="first admonition-title">Attention</p>
+                    <p class="last">If the changelog is disabled then the 
attached state store is no longer fault tolerant and it can&#8217;t have any <a 
class="reference internal" 
href="config-streams.html#streams-developer-guide-standby-replicas"><span 
class="std std-ref">standby replicas</span></a>.</p>
+                </div>
+                <p>Here is an example for enabling fault tolerance, with 
additional changelog-topic configuration:
+                    You can add any log config from <a class="reference 
external" 
href="https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L61";>kafka.log.LogConfig</a>.
+                    Unrecognized configs will be ignored.</p>
+                <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
+
+<span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">changelogConfig</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">HashMap</span><span class="o">();</span>
+<span class="c1">// override min.insync.replicas</span>
+<span class="n">changelogConfig</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;min.insyc.replicas&quot;</span><span class="o">,</span> <span 
class="s">&quot;1&quot;</span><span class="o">)</span>
+
+<span class="n">StoreBuilder</span><span class="o">&lt;</span><span 
class="n">KeyValueStore</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="n">countStoreSupplier</span> <span class="o">=</span> <span 
class="n">Stores</span><span class="o">.</span><span 
class="na">keyValueStoreBuilder</span><span class="o">(</span>
+  <span class="n">Stores</span><span class="o">.</span><span 
class="na">persistentKeyValueStore</span><span class="o">(</span><span 
class="s">&quot;Counts&quot;</span><span class="o">),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">String</span><span class="o">(),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span 
class="na">Long</span><span class="o">())</span>
+  <span class="o">.</span><span class="na">withLoggingEnabled</span><span 
class="o">(</span><span class="n">changlogConfig</span><span 
class="o">);</span> <span class="c1">// enable changelogging, with custom 
changelog settings</span>
+</pre></div>
+                </div>
+            </div>
+            <div class="section" id="implementing-custom-state-stores">
+                <span 
id="streams-developer-guide-state-store-custom"></span><h3><a 
class="toc-backref" href="#id7">Implementing Custom State Stores</a><a 
class="headerlink" href="#implementing-custom-state-stores" title="Permalink to 
this headline"></a></h3>
+                <p>You can use the <a class="reference internal" 
href="#streams-developer-guide-state-store-defining"><span class="std 
std-ref">built-in state store types</span></a> or  implement your own.
+                    The primary interface to implement for the store is
+                    <code class="docutils literal"><span 
class="pre">org.apache.kafka.streams.processor.StateStore</span></code>.  Kafka 
Streams also has a few extended interfaces such
+                    as <code class="docutils literal"><span 
class="pre">KeyValueStore</span></code>.</p>
+                <p>You also need to provide a &#8220;factory&#8221; for the 
store by implementing the
+                    <code class="docutils literal"><span 
class="pre">org.apache.kafka.streams.processor.StateStoreSupplier</span></code> 
interface, which Kafka Streams uses to create instances of
+                    your store.</p>
+            </div>
+        </div>
+        <div class="section" id="connecting-processors-and-state-stores">
+            <h2><a class="toc-backref" href="#id8">Connecting Processors and 
State Stores</a><a class="headerlink" 
href="#connecting-processors-and-state-stores" title="Permalink to this 
headline"></a></h2>
+            <p>Now that a <a class="reference internal" 
href="#streams-developer-guide-stream-processor"><span class="std 
std-ref">processor</span></a> (WordCountProcessor) and the
+                state stores have been defined, you can construct the 
processor topology by connecting these processors and state stores together by
+                using the <code class="docutils literal"><span 
class="pre">Topology</span></code> instance.  In addition, you can add source 
processors with the specified Kafka topics
+                to generate input data streams into the topology, and sink 
processors with the specified Kafka topics to generate
+                output data streams out of the topology.</p>
+            <p>Here is an example implementation:</p>
+            <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Topology</span> <span 
class="n">builder</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Topology</span><span class="o">();</span>
+
+<span class="c1">// add the source processor node that takes Kafka topic 
&quot;source-topic&quot; as input</span>
+<span class="n">builder</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span 
class="s">&quot;Source&quot;</span><span class="o">,</span> <span 
class="s">&quot;source-topic&quot;</span><span class="o">)</span>
+
+    <span class="c1">// add the WordCountProcessor node which takes the source 
processor as its upstream processor</span>
+    <span class="o">.</span><span class="na">addProcessor</span><span 
class="o">(</span><span class="s">&quot;Process&quot;</span><span 
class="o">,</span> <span class="o">()</span> <span class="o">-&gt;</span> <span 
class="k">new</span> <span class="n">WordCountProcessor</span><span 
class="o">(),</span> <span class="s">&quot;Source&quot;</span><span 
class="o">)</span>
+
+    <span class="c1">// add the count store associated with the 
WordCountProcessor processor</span>
+    <span class="o">.</span><span class="na">addStateStore</span><span 
class="o">(</span><span class="n">countStoreBuilder</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">)</span>
+
+    <span class="c1">// add the sink processor node that takes Kafka topic 
&quot;sink-topic&quot; as output</span>
+    <span class="c1">// and the WordCountProcessor node as its upstream 
processor</span>
+    <span class="o">.</span><span class="na">addSink</span><span 
class="o">(</span><span class="s">&quot;Sink&quot;</span><span 
class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span 
class="o">,</span> <span class="s">&quot;Process&quot;</span><span 
class="o">);</span>
+</pre></div>
+            </div>
+            <p>Here is a quick explanation of this example:</p>
+            <ul class="simple">
+                <li>A source processor node named <code class="docutils 
literal"><span class="pre">&quot;Source&quot;</span></code> is added to the 
topology using the <code class="docutils literal"><span 
class="pre">addSource</span></code> method, with one Kafka topic
+                    <code class="docutils literal"><span 
class="pre">&quot;source-topic&quot;</span></code> fed to it.</li>
+                <li>A processor node named <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> with the 
pre-defined <code class="docutils literal"><span 
class="pre">WordCountProcessor</span></code> logic is then added as the 
downstream
+                    processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node using the <code 
class="docutils literal"><span class="pre">addProcessor</span></code> 
method.</li>
+                <li>A predefined persistent key-value state store is created 
and associated with the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
+                    <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
+                <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
+                    as its upstream processor and writing to a separate <code 
class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> 
Kafka topic.</li>
+            </ul>
+            <p>In this topology, the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> stream processor node is 
considered a downstream processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node, and an
+                upstream processor of the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the 
<code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched 
record from
+                Kafka to its downstream <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils 
literal"><span class="pre">WordCountProcessor#process()</span></code> method is 
triggered to process the record and
+                update the associated state store. Whenever <code 
class="docutils literal"><span class="pre">context#forward()</span></code> is 
called in the
+                <code class="docutils literal"><span 
class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate 
key-value pair will be sent via the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> processor node to
+                the Kafka topic <code class="docutils literal"><span 
class="pre">&quot;sink-topic&quot;</span></code>.  Note that in the <code 
class="docutils literal"><span class="pre">WordCountProcessor</span></code> 
implementation, you must refer to the
+                same store name <code class="docutils literal"><span 
class="pre">&quot;Counts&quot;</span></code> when accessing the key-value 
store, otherwise an exception will be thrown at runtime,
+                indicating that the state store cannot be found. If the state 
store is not associated with the processor
+                in the <code class="docutils literal"><span 
class="pre">Topology</span></code> code, accessing it in the processor&#8217;s 
<code class="docutils literal"><span class="pre">init()</span></code> method 
will also throw an exception at
+                runtime, indicating the state store is not accessible from 
this processor.</p>
+            <p>Now that you have fully defined your processor topology in your 
application, you can proceed to
+                <a class="reference internal" 
href="running-app.html#streams-developer-guide-execution"><span class="std 
std-ref">running the Kafka Streams application</span></a>.</p>
+</div>
+</div>
+
+
+               </div>
+              </div>
+              <div class="pagination">
+                <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api" 
class="pagination__btn pagination__btn__prev">Previous</a>
+                <a 
href="/{{version}}/documentation/streams/developer-guide/datatypes" 
class="pagination__btn pagination__btn__next">Next</a>
+              </div>
+                </script>
+
+                <!--#include virtual="../../../includes/_header.htm" -->
+                <!--#include virtual="../../../includes/_top.htm" -->
+                    <div class="content documentation documentation--current">
+                    <!--#include virtual="../../../includes/_nav.htm" -->
+                    <div class="right">
+                    <!--#include virtual="../../../includes/_docs_banner.htm" 
-->
+                    <ul class="breadcrumbs">
+                    <li><a href="/documentation">Documentation</a></li>
+                    <li><a href="/documentation/streams">Kafka Streams</a></li>
+                    <li><a 
href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+                </ul>
+                <div class="p-content"></div>
+                    </div>
+                    </div>
+                    <!--#include virtual="../../../includes/_footer.htm" -->
+                    <script>
+                    $(function() {
+                        // Show selected style on nav item
+                        $('.b-nav__streams').addClass('selected');
+
+                        //sticky secondary nav
+                        var $navbar = $(".sub-nav-sticky"),
+                            y_pos = $navbar.offset().top,
+                            height = $navbar.height();
+
+                        $(window).scroll(function() {
+                            var scrollTop = $(window).scrollTop();
+
+                            if (scrollTop > y_pos - height) {
+                                $navbar.addClass("navbar-fixed")
+                            } else if (scrollTop <= y_pos) {
+                                $navbar.removeClass("navbar-fixed")
+                            }
+                        });
+
+                        // Display docs subnav items
+                        
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+                    });
+              </script>

Reply via email to