This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new e330635  MINOR: Re-apply 2.6 docs commit (#275) (4c19eab) (#301)
e330635 is described below

commit e3306354a25c70f53270552a45125f7a3b5f1a1c
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Sep 9 12:33:16 2020 -0700

    MINOR: Re-apply 2.6 docs commit (#275) (4c19eab) (#301)
    
    The PR to make several Streams-relevant changes for the
    2.6 release (4c19eab / #275) was accidentally reverted by
    35d3b804b86c249aace9a729c5abd9be4525ef3d
    
    This commit restores the desired docs changes.
    
    Reviewers: John Roesler <[email protected]>
---
 26/streams/architecture.html                   |   4 +-
 26/streams/developer-guide/config-streams.html | 722 +++++++++++++++----------
 26/streams/developer-guide/memory-mgmt.html    |   5 +-
 26/streams/developer-guide/running-app.html    |  12 +
 26/streams/upgrade-guide.html                  |  16 +-
 5 files changed, 460 insertions(+), 299 deletions(-)

diff --git a/26/streams/architecture.html b/26/streams/architecture.html
index 544108d..67594c2 100644
--- a/26/streams/architecture.html
+++ b/26/streams/architecture.html
@@ -151,8 +151,10 @@
     <p>
         Note that the cost of task (re)initialization typically depends 
primarily on the time for restoring the state by replaying the state stores' 
associated changelog topics.
         To minimize this restoration time, users can configure their 
applications to have <b>standby replicas</b> of local states (i.e. fully 
replicated copies of the state).
-        When a task migration happens, Kafka Streams then attempts to assign a 
task to an application instance where such a standby replica already exists in 
order to minimize
+        When a task migration happens, Kafka Streams will assign a task to an 
application instance where such a standby replica already exists in order to 
minimize
         the task (re)initialization cost. See 
<code>num.standby.replicas</code> in the <a 
href="/{{version}}/documentation/#streamsconfigs"><b>Kafka Streams 
Configs</b></a> section.
+        Starting in 2.6, Kafka Streams will guarantee that a task is only ever 
assigned to an instance with a fully caught-up local copy of the state, if such 
an instance
+        exists. Standby tasks will increase the likelihood that a caught-up 
instance exists in the case of a failure.
     </p>
 
     <div class="pagination">
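
The standby-replica behaviour described in the hunk above could be switched on with a configuration along the following lines. This is only a minimal sketch: it uses plain `java.util.Properties` with the literal key `num.standby.replicas` named in the docs. The class name `StandbyReplicaConfigSketch`, the method `withStandbys`, the application id, and the bootstrap address are assumptions made up for the example, and the negative-value check is the sketch's own sanity check.

```java
import java.util.Properties;

// Hypothetical helper; not part of Kafka Streams or of this commit.
public class StandbyReplicaConfigSketch {

    // Builds a Streams-style configuration requesting the given number of standby replicas.
    static Properties withStandbys(final int numStandbyReplicas) {
        if (numStandbyReplicas < 0) {
            // Sanity check of this sketch only.
            throw new IllegalArgumentException("number of standby replicas must not be negative");
        }
        final Properties props = new Properties();
        props.put("application.id", "standby-sketch-app");  // placeholder value
        props.put("bootstrap.servers", "localhost:9092");   // placeholder value
        props.put("num.standby.replicas", Integer.toString(numStandbyReplicas));
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = withStandbys(1);
        System.out.println(props.getProperty("num.standby.replicas"));
    }
}
```

With one standby per task, a second instance keeps a replicated copy of each state store, which is what makes a caught-up instance more likely to exist after a failure.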
diff --git a/26/streams/developer-guide/config-streams.html 
b/26/streams/developer-guide/config-streams.html
index 2ad5951..d6164f0 100644
--- a/26/streams/developer-guide/config-streams.html
+++ b/26/streams/developer-guide/config-streams.html
@@ -62,24 +62,31 @@
           </ul>
           </li>
           <li><a class="reference internal" 
href="#optional-configuration-parameters" id="id6">Optional configuration 
parameters</a><ul>
+            <li><a class="reference internal" href="#acceptable-recovery-lag" 
id="id27">acceptable.recovery.lag</a></li>
             <li><a class="reference internal" 
href="#default-deserialization-exception-handler" 
id="id7">default.deserialization.exception.handler</a></li>
-            <li><a class="reference internal" 
href="#default-production-exception-handler" 
id="id24">default.production.exception.handler</a></li>
             <li><a class="reference internal" href="#default-key-serde" 
id="id8">default.key.serde</a></li>
+            <li><a class="reference internal" 
href="#default-production-exception-handler" 
id="id24">default.production.exception.handler</a></li>
+            <li><a class="reference internal" href="#timestamp-extractor" 
id="id15">default.timestamp.extractor</a></li>
             <li><a class="reference internal" href="#default-value-serde" 
id="id9">default.value.serde</a></li>
+            <li><a class="reference internal" 
href="#default-windowed-key-serde-inner" 
id="id32">default.windowed.key.serde.inner</a></li>
+            <li><a class="reference internal" 
href="#default-windowed-value-serde-inner" 
id="id33">default.windowed.value.serde.inner</a></li>
+            <li><a class="reference internal" href="#max-task-idle-ms" 
id="id28">max.task.idle.ms</a></li>
+            <li><a class="reference internal" href="#max-warmup-replicas" 
id="id29">max.warmup.replicas</a></li>
             <li><a class="reference internal" href="#num-standby-replicas" 
id="id10">num.standby.replicas</a></li>
             <li><a class="reference internal" href="#num-stream-threads" 
id="id11">num.stream.threads</a></li>
             <li><a class="reference internal" href="#partition-grouper" 
id="id12">partition.grouper</a></li>
+            <li><a class="reference internal" 
href="#probing-rebalance-interval-ms" 
id="id30">probing.rebalance.interval.ms</a></li>
             <li><a class="reference internal" href="#processing-guarantee" 
id="id25">processing.guarantee</a></li>
             <li><a class="reference internal" href="#replication-factor" 
id="id13">replication.factor</a></li>
+            <li><a class="reference internal" href="#rocksdb-config-setter" 
id="id20">rocksdb.config.setter</a></li>
             <li><a class="reference internal" href="#state-dir" 
id="id14">state.dir</a></li>
-            <li><a class="reference internal" href="#timestamp-extractor" 
id="id15">timestamp.extractor</a></li>
+            <li><a class="reference internal" href="#topology-optimization" 
id="id31">topology.optimization</a></li>
           </ul>
           </li>
           <li><a class="reference internal" 
href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka 
consumers and producer configuration parameters</a><ul>
             <li><a class="reference internal" href="#naming" 
id="id17">Naming</a></li>
             <li><a class="reference internal" href="#default-values" 
id="id18">Default Values</a></li>
             <li><a class="reference internal" href="#enable-auto-commit" 
id="id19">enable.auto.commit</a></li>
-            <li><a class="reference internal" href="#rocksdb-config-setter" 
id="id20">rocksdb.config.setter</a></li>
           </ul>
           </li>
           <li><a class="reference internal" 
href="#recommended-configuration-parameters-for-resiliency" 
id="id21">Recommended configuration parameters for resiliency</a><ul>
@@ -165,6 +172,11 @@
           </tr>
           </thead>
           <tbody valign="top">
+          <tr class="row-odd"><td>acceptable.recovery.lag</td>
+            <td>Medium</td>
+            <td colspan="2">The maximum acceptable lag (number of offsets to 
catch up) for an instance to be considered caught-up and ready for the active 
task.</td>
+            <td>10000</td>
+          </tr>
           <tr class="row-even"><td>application.server</td>
             <td>Low</td>
             <td colspan="2">A host:port pair pointing to an embedded user 
defined endpoint that can be used for discovering the locations of
@@ -198,16 +210,47 @@
             <td colspan="2">Exception handling class that implements the <code 
class="docutils literal"><span 
class="pre">DeserializationExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span 
class="pre">LogAndContinueExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-even"><td>default.production.exception.handler</td>
+          <tr class="row-even"><td>default.key.serde</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer class for record 
keys, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also default.value.serde).</td>
+            <td><code class="docutils literal"><span 
class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
+          </tr>
+          <tr class="row-odd"><td>default.production.exception.handler</td>
             <td>Medium</td>
             <td colspan="2">Exception handling class that implements the <code 
class="docutils literal"><span 
class="pre">ProductionExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span 
class="pre">DefaultProductionExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-odd"><td>key.serde</td>
+          <tr class="row-even"><td>default.timestamp.extractor</td>
             <td>Medium</td>
-            <td colspan="2">Default serializer/deserializer class for record 
keys, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also value.serde).</td>
+            <td colspan="2">Timestamp extractor class that implements the 
<code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> interface.</td>
+            <td>See <a class="reference internal" 
href="#streams-developer-guide-timestamp-extractor"><span class="std 
std-ref">Timestamp Extractor</span></a></td>
+          </tr>
+          <tr class="row-odd"><td>default.value.serde</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer class for record 
values, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also default.key.serde).</td>
             <td><code class="docutils literal"><span 
class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
           </tr>
+          <tr class="row-even"><td>default.windowed.key.serde.inner</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer for the inner 
class of windowed keys, implementing the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface.</td>
+            <td>null</td>
+          </tr>
+          <tr class="row-odd"><td>default.windowed.value.serde.inner</td>
+            <td>Medium</td>
+            <td colspan="2">Default serializer/deserializer for the inner 
class of windowed values, implementing the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface.</td>
+            <td>null</td>
+          </tr>
+          <tr class="row-even"><td>max.task.idle.ms</td>
+            <td>Medium</td>
+            <td colspan="2">Maximum amount of time a stream task will stay 
idle while waiting for all partitions to contain data, in order to avoid potential 
out-of-order record
+              processing across multiple input streams.</td>
+            <td>0 milliseconds</td>
+          </tr>
+          <tr class="row-odd"><td>max.warmup.replicas</td>
+            <td>Medium</td>
+            <td colspan="2">The maximum number of warmup replicas (extra 
standbys beyond the configured num.standby.replicas) that can be assigned at once.</td>
+            <td>2</td>
+          </tr>
           <tr class="row-even"><td>metric.reporters</td>
             <td>Low</td>
             <td colspan="2">A list of classes to use as metrics reporters.</td>
@@ -243,10 +286,15 @@
             <td colspan="2">Partition grouper class that implements the <code 
class="docutils literal"><span class="pre">PartitionGrouper</span></code> 
interface.</td>
             <td>See <a class="reference internal" 
href="#streams-developer-guide-partition-grouper"><span class="std 
std-ref">Partition Grouper</span></a></td>
           </tr>
+          <tr class="row-odd"><td>probing.rebalance.interval.ms</td>
+            <td>Low</td>
+            <td colspan="2">The maximum time to wait before triggering a 
rebalance to probe for warmup replicas that have sufficiently caught up.</td>
+            <td>600000 milliseconds (10 minutes)</td>
+          </tr>
           <tr class="row-even"><td>processing.guarantee</td>
             <td>Medium</td>
             <td colspan="2">The processing mode. Can be either <code 
class="docutils literal"><span class="pre">"at_least_once"</span></code> 
(default),
-              <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code>, or <code class="docutils 
literal"><span class="pre">"exactly_once_beta"</span></code>.
+              <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code>, or <code class="docutils 
literal"><span class="pre">"exactly_once_beta"</span></code></td>.
             <td>See <a class="reference internal" 
href="#streams-developer-guide-processing-guarantedd"><span class="std 
std-ref">Processing Guarantee</span></a></td>
           </tr>
           <tr class="row-odd"><td>poll.ms</td>
@@ -259,46 +307,41 @@
             <td colspan="2">The replication factor for changelog topics and 
repartition topics created by the application.</td>
             <td>1</td>
           </tr>
-          <tr class="row-even"><td>retries</td>
-              <td>Medium</td>
-              <td colspan="2">The number of retries for broker requests that 
return a retryable error. </td>
-              <td>0</td>
+          <tr class="row-odd"><td>retries</td>
+            <td>Medium</td>
+            <td colspan="2">The number of retries for broker requests that 
return a retryable error. </td>
+            <td>0</td>
           </tr>
           <tr class="row-even"><td>retry.backoff.ms</td>
-              <td>Medium</td>
-              <td colspan="2">The amount of time in milliseconds, before a 
request is retried. This applies if the <code class="docutils literal"><span 
class="pre">retries</span></code> parameter is configured to be greater than 0. 
</td>
-              <td>100</td>
+            <td>Medium</td>
+            <td colspan="2">The amount of time in milliseconds, before a 
request is retried. This applies if the <code class="docutils literal"><span 
class="pre">retries</span></code> parameter is configured to be greater than 0. 
</td>
+            <td>100</td>
           </tr>
-          <tr class="row-even"><td>rocksdb.config.setter</td>
+          <tr class="row-odd"><td>rocksdb.config.setter</td>
             <td>Medium</td>
             <td colspan="2">The RocksDB configuration.</td>
             <td></td>
           </tr>
-          <tr class="row-odd"><td>state.cleanup.delay.ms</td>
+          <tr class="row-even"><td>state.cleanup.delay.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.</td>
             <td>600000 milliseconds</td>
           </tr>
-          <tr class="row-even"><td>state.dir</td>
+          <tr class="row-odd"><td>state.dir</td>
             <td>High</td>
             <td colspan="2">Directory location for state stores.</td>
             <td><code class="docutils literal"><span 
class="pre">/tmp/kafka-streams</span></code></td>
           </tr>
-          <tr class="row-odd"><td>timestamp.extractor</td>
+          <tr class="row-even"><td>topology.optimization</td>
             <td>Medium</td>
-            <td colspan="2">Timestamp extractor class that implements the 
<code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> interface.</td>
-            <td>See <a class="reference internal" 
href="#streams-developer-guide-timestamp-extractor"><span class="std 
std-ref">Timestamp Extractor</span></a></td>
+            <td colspan="2">A configuration telling Kafka Streams whether it should 
optimize the topology.</td>
+            <td>none</td>
           </tr>
-          <tr class="row-even"><td>upgrade.from</td>
+          <tr class="row-odd"><td>upgrade.from</td>
             <td>Medium</td>
             <td colspan="2">The version you are upgrading from during a 
rolling upgrade.</td>
             <td>See <a class="reference internal" 
href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade 
From</span></a></td>
           </tr>
-          <tr class="row-odd"><td>value.serde</td>
-            <td>Medium</td>
-            <td colspan="2">Default serializer/deserializer class for record 
values, implements the <code class="docutils literal"><span 
class="pre">Serde</span></code> interface (see also key.serde).</td>
-            <td><code class="docutils literal"><span 
class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
-          </tr>
           <tr 
class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
             <td>Low</td>
             <td colspan="2">Added to a window's maintainMs to ensure data is 
not deleted from the log prematurely. Allows for clock drift.</td>
@@ -306,6 +349,22 @@
           </tr>
           </tbody>
         </table>
+        <div class="section" id="acceptable-recovery-lag">
+          <h4><a class="toc-backref" 
href="#id27">acceptable.recovery.lag</a><a class="headerlink" 
href="#acceptable-recovery-lag" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              <p>
+                The maximum acceptable lag (total number of offsets to catch 
up from the changelog) for an instance to be considered caught-up and able to 
receive an active task. Streams will only assign
+                stateful active tasks to instances whose state stores are 
within the acceptable recovery lag, if any exist, and assign warmup replicas to 
restore state in the background for instances
+                that are not yet caught up. This value should correspond to a recovery 
time of well under a minute for a given workload, and must be at least 0.
+              </p>
+              <p>
+                Note: if you set this to <code>Long.MAX_VALUE</code> it 
effectively disables the warmup replicas and task high availability, allowing 
Streams to immediately produce a balanced
+                assignment and migrate tasks to a new instance without first 
warming them up.
+              </p>
+            </div>
+          </blockquote>
+        </div>
         <div class="section" id="default-deserialization-exception-handler">
           <span id="streams-developer-guide-deh"></span><h4><a 
class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a 
class="headerlink" href="#default-deserialization-exception-handler" 
title="Permalink to this headline"></a></h4>
           <blockquote>
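
The warmup-related settings introduced in the hunks above can be pictured with the sketch below. It is an illustration under stated assumptions, not the assignor's actual logic: the property keys and default values come from the table in this diff, while the class `WarmupConfigSketch` and the helpers `defaults` and `isCaughtUp` are hypothetical names invented for the example. The comparison is a simplified reading of "within the acceptable recovery lag".

```java
import java.util.Properties;

// Hypothetical illustration; the class and method names are not part of Kafka Streams.
public class WarmupConfigSketch {

    // Collects the three settings using the default values listed in the table in this diff.
    static Properties defaults() {
        final Properties props = new Properties();
        props.put("acceptable.recovery.lag", "10000");
        props.put("max.warmup.replicas", "2");
        props.put("probing.rebalance.interval.ms", "600000");
        return props;
    }

    // Simplified reading of the rule in the docs: an instance counts as caught up
    // when its total changelog lag does not exceed acceptable.recovery.lag.
    static boolean isCaughtUp(final long totalLag, final Properties props) {
        final long acceptableLag = Long.parseLong(props.getProperty("acceptable.recovery.lag"));
        return totalLag <= acceptableLag;
    }

    public static void main(final String[] args) {
        final Properties props = defaults();
        System.out.println(isCaughtUp(500L, props));     // lag below the threshold
        System.out.println(isCaughtUp(25_000L, props));  // lag above the threshold
    }
}
```

Under this reading, setting `acceptable.recovery.lag` to `Long.MAX_VALUE` makes every instance count as caught up, which is consistent with the note that this value effectively disables warmup replicas.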
@@ -365,8 +424,8 @@
               such as attempting to produce a record that is too large. By 
default, Kafka provides and uses the <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
               that always fails when these exceptions occur.</p>
 
-            <p>Each exception handler can return a <code>FAIL</code> or 
<code>CONTINUE</code> depending on the record and the exception thrown. 
Returning <code>FAIL</code> will signal that Streams should shut down and 
<code>CONTINUE</code> will signal that Streams
-            should ignore the issue and continue processing. If you want to 
provide an exception handler that always ignores records that are too large, 
you could implement something like the following:</p>
+              <p>Each exception handler can return a <code>FAIL</code> or 
<code>CONTINUE</code> depending on the record and the exception thrown. 
Returning <code>FAIL</code> will signal that Streams should shut down and 
<code>CONTINUE</code> will signal that Streams
+                should ignore the issue and continue processing. If you want 
to provide an exception handler that always ignores records that are too large, 
you could implement something like the following:</p>
 
             <pre class="line-numbers"><code class="language-java">
             import java.util.Properties;
@@ -396,18 +455,106 @@
                          IgnoreRecordTooLargeHandler.class);</code></pre></div>
           </blockquote>
         </div>
+        <div class="section" id="timestamp-extractor">
+          <span id="streams-developer-guide-timestamp-extractor"></span><h4><a 
class="toc-backref" href="#id15">default.timestamp.extractor</a><a 
class="headerlink" href="#timestamp-extractor" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>A timestamp extractor pulls a timestamp from an instance 
of <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
+              Timestamps are used to control the progress of streams.</p>
+              <p>The default extractor is
+                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
+                This extractor retrieves built-in timestamps that are 
automatically embedded into Kafka messages by the Kafka producer
+                client since
+                <a class="reference external" 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka
 version 0.10</a>.
+                Depending on the setting of Kafka&#8217;s server-side <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> broker and <code 
class="docutils literal"><span class="pre">message.timestamp.type</span></code> 
topic parameters,
+                this extractor provides you with:</p>
+              <ul class="simple">
+                <li><strong>event-time</strong> processing semantics if <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">CreateTime</span></code> aka 
&#8220;producer time&#8221;
+                  (which is the default).  This represents the time when a 
Kafka producer sent the original message.  If you use Kafka&#8217;s
+                  official producer client, the timestamp represents 
milliseconds since the epoch.</li>
+                <li><strong>ingestion-time</strong> processing semantics if 
<code class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">LogAppendTime</span></code> aka 
&#8220;broker
+                  time&#8221;.  This represents the time when the Kafka broker 
received the original message, in milliseconds since the epoch.</li>
+              </ul>
+              <p>The <code class="docutils literal"><span 
class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception 
if a record contains an invalid (i.e. negative) built-in
+                timestamp, because Kafka Streams would not process this record 
but silently drop it.  Invalid built-in timestamps can
+                occur for various reasons:  if, for example, you consume a 
topic that is written to by pre-0.10 Kafka producer clients
+                or by third-party producer clients that don&#8217;t support 
the new Kafka 0.10 message format yet;  another situation where
+                this may happen is after upgrading your Kafka cluster from 
<code class="docutils literal"><span class="pre">0.9</span></code> to <code 
class="docutils literal"><span class="pre">0.10</span></code>, where all the 
data that was generated
+                with <code class="docutils literal"><span 
class="pre">0.9</span></code> does not include the <code class="docutils 
literal"><span class="pre">0.10</span></code> message timestamps.</p>
+              <p>If you have data with invalid timestamps and want to process 
it, then there are two alternative extractors available.
+                Both work on built-in timestamps, but handle invalid 
timestamps differently.</p>
+              <ul class="simple">
+                <li><a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
+                  This extractor logs a warning message and returns the invalid 
timestamp to Kafka Streams, which will not process but
+                  silently drop the record.
+                  This log-and-skip strategy allows Kafka Streams to make 
progress instead of failing if there are records with an
+                  invalid built-in timestamp in your input data.</li>
+                <li><a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html">UsePartitionTimeOnInvalidTimestamp</a>:
+                  This extractor returns the record&#8217;s built-in timestamp 
if it is valid (i.e. not negative).  If the record does not
+                  have a valid built-in timestamp, the extractor returns the 
previously extracted valid timestamp from a record of the
+                  same topic partition as the current record as a timestamp 
estimate.  If no timestamp can be estimated, it
+                  throws an exception.</li>
+              </ul>
+              <p>Another built-in extractor is
+                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
+                This extractor does not actually &#8220;extract&#8221; a 
timestamp from the consumed record but rather returns the current time in
+                milliseconds from the system clock (think: <code 
class="docutils literal"><span 
class="pre">System.currentTimeMillis()</span></code>), which effectively means 
Streams will operate
+                on the basis of the so-called <strong>processing-time</strong> 
of events.</p>
+              <p>You can also provide your own timestamp extractors, for 
instance to retrieve timestamps embedded in the payload of
+                messages.  If you cannot extract a valid timestamp, you can 
either throw an exception, return a negative timestamp, or
+                estimate a timestamp.  Returning a negative timestamp will 
result in data loss &#8211; the corresponding record will not be
+                processed but silently dropped.  If you want to estimate a new 
timestamp, you can use the value provided via
+                <code class="docutils literal"><span 
class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp 
estimation).  Here is an example of a custom
+                <code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> implementation:</p>
+              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span 
class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.processor.TimestampExtractor</span><span 
class="o">;</span>
+
+<span class="c1">// Extracts the embedded timestamp of a record (giving you 
&quot;event-time&quot; semantics).</span>
+<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MyEventTimeExtractor</span> <span class="kd">implements</span> <span 
class="n">TimestampExtractor</span> <span class="o">{</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">long</span> <span 
class="nf">extract</span><span class="o">(</span><span class="kd">final</span> 
<span class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">Object</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">,</span> <span class="kd">final</span> 
<span class="kt">long</span> <span class="n">previousTimestamp</span><span 
class="o">) [...]
+    <span class="c1">// `Foo` is your own custom class, which we assume has a 
method that returns</span>
+    <span class="c1">// the embedded timestamp (milliseconds since midnight, 
January 1, 1970 UTC).</span>
+    <span class="kt">long</span> <span class="n">timestamp</span> <span 
class="o">=</span> <span class="o">-</span><span class="mi">1</span><span 
class="o">;</span>
+    <span class="kd">final</span> <span class="n">Foo</span> <span 
class="n">myPojo</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">Foo</span><span class="o">)</span> <span class="n">record</span><span 
class="o">.</span><span class="na">value</span><span class="o">();</span>
+    <span class="k">if</span> <span class="o">(</span><span 
class="n">myPojo</span> <span class="o">!=</span> <span 
class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">timestamp</span> <span class="o">=</span> <span 
class="n">myPojo</span><span class="o">.</span><span 
class="na">getTimestampInMillis</span><span class="o">();</span>
+    <span class="o">}</span>
+    <span class="k">if</span> <span class="o">(</span><span 
class="n">timestamp</span> <span class="o">&lt;</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+      <span class="c1">// Invalid timestamp!  Attempt to estimate a new 
timestamp,</span>
+      <span class="c1">// otherwise fall back to wall-clock time 
(processing-time).</span>
+      <span class="k">if</span> <span class="o">(</span><span 
class="n">previousTimestamp</span> <span class="o">&gt;=</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span 
class="n">previousTimestamp</span><span class="o">;</span>
+      <span class="o">}</span> <span class="k">else</span> <span 
class="o">{</span>
+        <span class="k">return</span> <span class="n">System</span><span 
class="o">.</span><span class="na">currentTimeMillis</span><span 
class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">timestamp</span><span class="o">;</span>
+  <span class="o">}</span>
+
+<span class="o">}</span>
+</pre></div>
+              </div>
+              <p>You would then define the custom timestamp extractor in your 
Streams configuration as follows:</p>
+              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">java.util.Properties</span><span class="o">;</span>
+<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
+
+<span class="n">Properties</span> <span class="n">streamsConfiguration</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
+<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</span><span 
class="o">,</span> <span class="n">MyEventTimeExtractor</span><span 
class="o">.</span><span class="na">class</span><span class="o">);</span>
+</pre></div>
+              </div>
+            </div></blockquote>
+        </div>
         <div class="section" id="default-key-serde">
           <h4><a class="toc-backref" href="#id8">default.key.serde</a><a 
class="headerlink" href="#default-key-serde" title="Permalink to this 
headline"></a></h4>
           <blockquote>
             <div><p>The default Serializer/Deserializer class for record keys. 
Serialization and deserialization in Kafka Streams happens
               whenever data needs to be materialized, for example:</p>
-              <blockquote>
                 <div><ul class="simple">
                   <li>Whenever data is read from or written to a <em>Kafka 
topic</em> (e.g., via the <code class="docutils literal"><span 
class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils 
literal"><span class="pre">KStream#to()</span></code> methods).</li>
                   <li>Whenever data is read from or written to a <em>state 
store</em>.</li>
                 </ul>
                   <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
-                </div></blockquote>
+                </div>
             </div></blockquote>
         </div>
         <div class="section" id="default-value-serde">
@@ -422,6 +569,51 @@
               <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
+        <div class="section" id="default-windowed-key-serde-inner">
+          <h4><a class="toc-backref" 
href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" 
href="#default-windowed-key-serde-inner" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for the inner 
class of windowed keys. Serialization and deserialization in Kafka Streams 
happens
+              whenever data needs to be materialized, for example:</p>
+                <div><ul class="simple">
+                  <li>Whenever data is read from or written to a <em>Kafka 
topic</em> (e.g., via the <code class="docutils literal"><span 
class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils 
literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                  <li>Whenever data is read from or written to a <em>state 
store</em>.</li>
+                </ul>
+                <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
+                </div>
+            </div></blockquote>
+        </div>
+        <div class="section" id="default-windowed-value-serde-inner">
+          <h4><a class="toc-backref" 
href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" 
href="#default-windowed-value-serde-inner" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div><p>The default Serializer/Deserializer class for the inner 
class of windowed values. Serialization and deserialization in Kafka Streams 
happens
+              whenever data needs to be materialized, for example:</p>
+              <ul class="simple">
+                <li>Whenever data is read from or written to a <em>Kafka 
topic</em> (e.g., via the <code class="docutils literal"><span 
class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils 
literal"><span class="pre">KStream#to()</span></code> methods).</li>
+                <li>Whenever data is read from or written to a <em>state 
store</em>.</li>
+              </ul>
+              <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
+            </div></blockquote>
+        </div>
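+        <p>As a rough illustration of the two windowed inner serde settings above, the sketch below sets both keys on a plain <code>Properties</code> object. The choice of the String and Long serdes and the class name <code>WindowedInnerSerdeSketch</code> are assumptions made for this example only; the serde classes are passed by name so that the snippet needs nothing beyond the JDK.</p>

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
public class WindowedInnerSerdeSketch {

    // Config keys as documented in the sections above.
    static final String KEY_INNER = "default.windowed.key.serde.inner";
    static final String VALUE_INNER = "default.windowed.value.serde.inner";

    static Properties build() {
        final Properties props = new Properties();
        // Assumption: the windowed keys wrap String keys and the values are Longs.
        props.put(KEY_INNER, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put(VALUE_INNER, "org.apache.kafka.common.serialization.Serdes$LongSerde");
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = build();
        System.out.println(KEY_INNER + "=" + props.get(KEY_INNER));
        System.out.println(VALUE_INNER + "=" + props.get(VALUE_INNER));
    }
}
```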
+        <div class="section" id="max-task-idle-ms">
+          <span id="streams-developer-guide-max-task-idle-ms"></span><h4><a 
class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" 
href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum amount of time a task will idle without processing 
data when waiting for all of its input partition buffers to contain records. 
This can help avoid potential out-of-order
+              processing when the task has multiple input streams, as in a 
join, for example. Setting this to a nonzero value may increase latency but 
will improve time synchronization.
+            </div>
+          </blockquote>
+        </div>
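+        <p>A minimal sketch of setting <code>max.task.idle.ms</code> to a nonzero value, for instance in an application that joins two input streams. The 500 ms figure and the class name <code>MaxTaskIdleSketch</code> are illustrative assumptions, not recommendations from this guide.</p>

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
public class MaxTaskIdleSketch {

    static Properties build(final long maxTaskIdleMs) {
        final Properties props = new Properties();
        // A nonzero value trades some latency for better time synchronization
        // across the task's input partitions, as described above.
        props.put("max.task.idle.ms", maxTaskIdleMs);
        return props;
    }

    public static void main(final String[] args) {
        // Assumption: half a second of idling is acceptable for this application.
        final Properties props = build(500L);
        System.out.println("max.task.idle.ms=" + props.get("max.task.idle.ms"));
    }
}
```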
+        <div class="section" id="max-warmup-replicas">
+          <span id="streams-developer-guide-max-warmup-replicas"></span><h4><a 
class="toc-backref" href="#id29">max.warmup.replicas</a><a class="headerlink" 
href="#max-warmup-replicas" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum number of warmup replicas (extra standbys beyond the 
configured num.standby.replicas) that can be assigned at once for the purpose of keeping
+              the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
+              traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, reducing 
the time
+              for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+            </div>
+          </blockquote>
+        </div>
         <div class="section" id="num-standby-replicas">
           <span id="streams-developer-guide-standby-replicas"></span><h4><a 
class="toc-backref" href="#id10">num.standby.replicas</a><a class="headerlink" 
href="#num-standby-replicas" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -430,13 +622,15 @@
               Standby replicas are used to minimize the latency of task 
failover.  A task that was previously running on a failed instance is
               preferred to restart on an instance that has standby replicas so 
that the local state store restoration process from its
               changelog can be minimized.  Details about how Kafka Streams 
makes use of the standby replicas to minimize the cost of
-              resuming tasks on failover can be found in the <a 
class="reference internal" 
href="../architecture.html#streams_architecture_state"><span class="std 
std-ref">State</span></a> section.</div></blockquote>
+              resuming tasks on failover can be found in the <a 
class="reference internal" 
href="../architecture.html#streams_architecture_state"><span class="std 
std-ref">State</span></a> section.
             </div>
-            <div class="admonition note">
-              <p class="first admonition-title">Note</p>
-              <p class="last">If you enable <cite>n</cite> standby tasks, you 
need to provision <cite>n+1</cite> <code class="docutils literal"><span 
class="pre">KafkaStreams</span></code>
-              instances.</p>
-              </div>
+          </blockquote>
+        </div>
+        <div class="admonition note">
+          <p class="first admonition-title">Note</p>
+          <p class="last">If you enable <cite>n</cite> standby tasks, you need 
to provision <cite>n+1</cite> <code class="docutils literal"><span 
class="pre">KafkaStreams</span></code>
+            instances.</p>
+        </div>
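+        <p>The sizing rule in the note can be written out as a small helper. This is only a sketch: <code>StandbyReplicasSketch</code> and <code>requiredInstances</code> are hypothetical names, and the check simply restates the <cite>n+1</cite> rule from the note.</p>

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
public class StandbyReplicasSketch {

    // Restates the note: n standby tasks call for n + 1 KafkaStreams instances.
    static int requiredInstances(final int numStandbyReplicas) {
        if (numStandbyReplicas < 0) {
            throw new IllegalArgumentException("num.standby.replicas must not be negative");
        }
        return numStandbyReplicas + 1;
    }

    static Properties build(final int numStandbyReplicas) {
        final Properties props = new Properties();
        props.put("num.standby.replicas", numStandbyReplicas);
        return props;
    }

    public static void main(final String[] args) {
        final int standbys = 2; // assumption for illustration
        final Properties props = build(standbys);
        System.out.println("num.standby.replicas=" + props.get("num.standby.replicas"));
        System.out.println("instances to provision: " + requiredInstances(standbys));
    }
}
```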
         <div class="section" id="num-stream-threads">
           <h4><a class="toc-backref" href="#id11">num.stream.threads</a><a 
class="headerlink" href="#num-stream-threads" title="Permalink to this 
headline"></a></h4>
           <blockquote>
@@ -446,31 +640,43 @@
         <div class="section" id="partition-grouper">
           <span id="streams-developer-guide-partition-grouper"></span><h4><a 
class="toc-backref" href="#id12">partition.grouper</a><a class="headerlink" 
href="#partition-grouper" title="Permalink to this headline"></a></h4>
           <blockquote>
-            <div>A partition grouper creates a list of stream tasks from the 
partitions of source topics, where each created task is assigned with a group 
of source topic partitions.
+            <div>
+              <b>[DEPRECATED]</b> A partition grouper creates a list of stream 
tasks from the partitions of source topics, where each created task is assigned 
a group of source topic partitions.
               The default implementation provided by Kafka Streams is <a 
class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/DefaultPartitionGrouper.html">DefaultPartitionGrouper</a>.
               It assigns each task one partition from each of the source 
topics. The generated number of tasks equals the largest
-              number of partitions among the input topics. Usually an 
application does not need to customize the partition grouper.</div></blockquote>
+              number of partitions among the input topics. Usually an 
application does not need to customize the partition grouper.
+            </div>
+          </blockquote>
+        </div>
+        <div class="section" id="probing-rebalance-interval-ms">
+          <h4><a class="toc-backref" 
href="#id30">probing.rebalance.interval.ms</a><a class="headerlink" 
href="#probing-rebalance-interval-ms" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div>
+              The maximum time to wait before triggering a rebalance to probe 
for warmup replicas that have restored enough to be considered caught up. 
Streams will only assign stateful active tasks to
+              instances that are caught up and within the <a class="reference 
internal" href="#acceptable-recovery-lag"><span class="std 
std-ref">acceptable.recovery.lag</span></a>, if any exist. Probing rebalances 
are used to query the latest total lag of warmup replicas and transition
+              them to active tasks if ready. They will continue to be 
triggered as long as there are warmup tasks, and until the assignment is 
balanced. Must be at least 1 minute.
+            </div></blockquote>
         </div>
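+        <p>The warmup-related settings can be combined as in the following sketch. It checks the two lower bounds stated in this guide (<code>max.warmup.replicas</code> at least 1, <code>probing.rebalance.interval.ms</code> at least 1 minute) before building the properties. The class name <code>WarmupTuningSketch</code> and the example values are assumptions for illustration; Kafka Streams performs its own validation independently of this code.</p>

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka Streams.
public class WarmupTuningSketch {

    static final long MIN_PROBING_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);

    static Properties build(final int maxWarmupReplicas, final long probingIntervalMs) {
        // Both bounds are taken from the parameter descriptions above.
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException("max.warmup.replicas must be at least 1");
        }
        if (probingIntervalMs < MIN_PROBING_INTERVAL_MS) {
            throw new IllegalArgumentException("probing.rebalance.interval.ms must be at least 1 minute");
        }
        final Properties props = new Properties();
        props.put("max.warmup.replicas", maxWarmupReplicas);
        props.put("probing.rebalance.interval.ms", probingIntervalMs);
        return props;
    }

    public static void main(final String[] args) {
        // Assumption: warm up four tasks at once and probe every five minutes.
        final Properties props = build(4, TimeUnit.MINUTES.toMillis(5));
        System.out.println(props);
    }
}
```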
         <div class="section" id="processing-guarantee">
           <span 
id="streams-developer-guide-processing-guarantee"></span><h4><a 
class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" 
href="#processing-guarantee" title="Permalink to this headline"></a></h4>
           <blockquote>
             <div>The processing guarantee that should be used.
-                 Possible values are <code class="docutils literal"><span 
class="pre">"at_least_once"</span></code> (default),
-                 <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code>,
-                 and <code class="docutils literal"><span 
class="pre">"exactly_once_beta"</span></code>.
-                 Using <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code> requires broker
-                 version 0.11.0 or newer, while using <code class="docutils 
literal"><span class="pre">"exactly_once_beta"</span></code>
-                 requires broker version 2.5 or newer.
-                 Note that if exactly-once processing is enabled, the default 
for parameter
-                 <code class="docutils literal"><span 
class="pre">commit.interval.ms</span></code> changes to 100ms.
-                 Additionally, consumers are configured with <code 
class="docutils literal"><span 
class="pre">isolation.level="read_committed"</span></code>
-                 and producers are configured with <code class="docutils 
literal"><span class="pre">enable.idempotence=true</span></code> per default.
-                 Note that by default exactly-once processing requires a 
cluster of at least three brokers what is the recommended setting for 
production.
-                 For development, you can change this configuration by 
adjusting broker setting
-                 <code class="docutils literal"><span 
class="pre">transaction.state.log.replication.factor</span></code>
-                 and <code class="docutils literal"><span 
class="pre">transaction.state.log.min.isr</span></code>
-                 to the number of brokers you want to use.
-                 For more details see <a 
href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
+              Possible values are <code class="docutils literal"><span 
class="pre">"at_least_once"</span></code> (default),
+              <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code>,
+              and <code class="docutils literal"><span 
class="pre">"exactly_once_beta"</span></code>.
+              Using <code class="docutils literal"><span 
class="pre">"exactly_once"</span></code> requires broker
+              version 0.11.0 or newer, while using <code class="docutils 
literal"><span class="pre">"exactly_once_beta"</span></code>
+              requires broker version 2.5 or newer.
+              Note that if exactly-once processing is enabled, the default for 
parameter
+              <code class="docutils literal"><span 
class="pre">commit.interval.ms</span></code> changes to 100ms.
+              Additionally, consumers are configured with <code 
class="docutils literal"><span 
class="pre">isolation.level="read_committed"</span></code>
+              and producers are configured with <code class="docutils 
literal"><span class="pre">enable.idempotence=true</span></code> by default.
+              Note that by default exactly-once processing requires a cluster 
of at least three brokers, which is the recommended setting for production.
+              For development, you can change this configuration by adjusting 
the broker settings
+              <code class="docutils literal"><span 
class="pre">transaction.state.log.replication.factor</span></code>
+              and <code class="docutils literal"><span 
class="pre">transaction.state.log.min.isr</span></code>
+              to the number of brokers you want to use.
+              For more details see <a 
href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
             </div></blockquote>
         </div>
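+        <p>A minimal sketch of selecting a processing guarantee. It only accepts the three values listed above and leaves <code>commit.interval.ms</code>, <code>isolation.level</code> and <code>enable.idempotence</code> untouched, since the text says Streams adjusts those itself when exactly-once processing is enabled. The class name <code>ProcessingGuaranteeSketch</code> is an assumption for this example.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
public class ProcessingGuaranteeSketch {

    // The three values named in the processing.guarantee description above.
    static final List<String> ALLOWED =
        Arrays.asList("at_least_once", "exactly_once", "exactly_once_beta");

    static Properties build(final String guarantee) {
        if (!ALLOWED.contains(guarantee)) {
            throw new IllegalArgumentException("Unsupported processing.guarantee: " + guarantee);
        }
        final Properties props = new Properties();
        props.put("processing.guarantee", guarantee);
        return props;
    }

    public static void main(final String[] args) {
        // "exactly_once_beta" requires broker version 2.5 or newer, per the text above.
        final Properties props = build("exactly_once_beta");
        System.out.println("processing.guarantee=" + props.get("processing.guarantee"));
    }
}
```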
         <div class="section" id="replication-factor">
@@ -520,153 +726,79 @@
                     <span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
                     <span class="n">streamsSettings</span><span 
class="o">.</span><span class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> 
<span class="n">CustomRocksDBConfig</span><span class="o">.</span><span 
class="na">class</span><span class="o">);</span>
                     </pre></div>
-                    </div>
-                    <dl class="docutils">
-                      <dt>Notes for example:</dt>
-                      <dd><ol class="first last arabic simple">
-                        <li><code class="docutils literal"><span 
class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) 
options.tableFormatConfig();</span></code> Get a reference to the existing 
table config rather than create a new one, so you don't accidentally overwrite 
defaults such as the <code class="docutils literal"><span 
class="pre">BloomFilter</span></code>, which is an important optimization.
-                        <li><code class="docutils literal"><span 
class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span 
class="pre">1024L);</span></code> Modify the default <a class="reference 
external" 
href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block
 size</a> per these instructions from the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Memory- [...]
-                        <li><code class="docutils literal"><span 
class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do 
not let the index and filter blocks grow unbounded. For more information, see 
the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB
 GitHub</a>.</li>
-                        <li><code class="docutils literal"><span 
class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced 
options in the <a class="reference external" 
href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB
 GitHub</a>.</li>
-                        <li><code class="docutils literal"><span 
class="pre">cache.close();</span></code> To avoid memory leaks, you must close 
any objects you constructed that extend org.rocksdb.RocksObject. See  <a 
class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava
 docs</a> for more details.</li>
-                      </ol>
-                      </dd>
-                    </dl>
-                  </div></blockquote>
-              </div>
-            </div>
-          </blockquote>
-        </div>
-        <div class="section" id="state-dir">
-          <h4><a class="toc-backref" href="#id14">state.dir</a><a 
class="headerlink" href="#state-dir" title="Permalink to this 
headline"></a></h4>
-          <blockquote>
-            <div>The state directory. Kafka Streams persists local states 
under the state directory. Each application has a subdirectory on its hosting
-              machine that is located under the state directory. The name of 
the subdirectory is the application ID. The state stores associated
-              with the application are created under this subdirectory. When 
running multiple instances of the same application on a single machine,
-              this path must be unique for each such instance.</div>
-          </blockquote>
-        </div>
-        <div class="section" id="timestamp-extractor">
-          <span id="streams-developer-guide-timestamp-extractor"></span><h4><a 
class="toc-backref" href="#id15">timestamp.extractor</a><a class="headerlink" 
href="#timestamp-extractor" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div><p>A timestamp extractor pulls a timestamp from an instance 
of <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
-              Timestamps are used to control the progress of streams.</p>
-              <p>The default extractor is
-                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
-                This extractor retrieves built-in timestamps that are 
automatically embedded into Kafka messages by the Kafka producer
-                client since
-                <a class="reference external" 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message";>Kafka
 version 0.10</a>.
-                Depending on the setting of Kafka&#8217;s server-side <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> broker and <code 
class="docutils literal"><span class="pre">message.timestamp.type</span></code> 
topic parameters,
-                this extractor provides you with:</p>
-              <ul class="simple">
-                <li><strong>event-time</strong> processing semantics if <code 
class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">CreateTime</span></code> aka 
&#8220;producer time&#8221;
-                  (which is the default).  This represents the time when a 
Kafka producer sent the original message.  If you use Kafka&#8217;s
-                  official producer client, the timestamp represents 
milliseconds since the epoch.</li>
-                <li><strong>ingestion-time</strong> processing semantics if 
<code class="docutils literal"><span 
class="pre">log.message.timestamp.type</span></code> is set to <code 
class="docutils literal"><span class="pre">LogAppendTime</span></code> aka 
&#8220;broker
-                  time&#8221;.  This represents the time when the Kafka broker 
received the original message, in milliseconds since the epoch.</li>
-              </ul>
-              <p>The <code class="docutils literal"><span 
class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception 
if a record contains an invalid (i.e. negative) built-in
-                timestamp, because Kafka Streams would not process this record 
but silently drop it.  Invalid built-in timestamps can
-                occur for various reasons:  if for example, you consume a 
topic that is written to by pre-0.10 Kafka producer clients
-                or by third-party producer clients that don&#8217;t support 
the new Kafka 0.10 message format yet;  another situation where
-                this may happen is after upgrading your Kafka cluster from 
<code class="docutils literal"><span class="pre">0.9</span></code> to <code 
class="docutils literal"><span class="pre">0.10</span></code>, where all the 
data that was generated
-                with <code class="docutils literal"><span 
class="pre">0.9</span></code> does not include the <code class="docutils 
literal"><span class="pre">0.10</span></code> message timestamps.</p>
-              <p>If you have data with invalid timestamps and want to process 
it, then there are two alternative extractors available.
-                Both work on built-in timestamps, but handle invalid 
timestamps differently.</p>
-              <ul class="simple">
-                <li><a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
-                  This extractor logs a warn message and returns the invalid 
timestamp to Kafka Streams, which will not process but
-                  silently drop the record.
-                  This log-and-skip strategy allows Kafka Streams to make 
progress instead of failing if there are records with an
-                  invalid built-in timestamp in your input data.</li>
-                <li><a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html">UsePartitionTimeOnInvalidTimestamp</a>.
-                  This extractor returns the record&#8217;s built-in timestamp 
if it is valid (i.e. not negative).  If the record does not
-                  have a valid built-in timestamps, the extractor returns the 
previously extracted valid timestamp from a record of the
-                  same topic partition as the current record as a timestamp 
estimation.  In case that no timestamp can be estimated, it
-                  throws an exception.</li>
-              </ul>
-              <p>Another built-in extractor is
-                <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
-                This extractor does not actually &#8220;extract&#8221; a 
timestamp from the consumed record but rather returns the current time in
-                milliseconds from the system clock (think: <code 
class="docutils literal"><span 
class="pre">System.currentTimeMillis()</span></code>), which effectively means 
Streams will operate
-                on the basis of the so-called <strong>processing-time</strong> 
of events.</p>
-              <p>You can also provide your own timestamp extractors, for 
instance to retrieve timestamps embedded in the payload of
-                messages.  If you cannot extract a valid timestamp, you can 
either throw an exception, return a negative timestamp, or
-                estimate a timestamp.  Returning a negative timestamp will 
result in data loss &#8211; the corresponding record will not be
-                processed but silently dropped.  If you want to estimate a new 
timestamp, you can use the value provided via
-                <code class="docutils literal"><span 
class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp 
estimation).  Here is an example of a custom
-                <code class="docutils literal"><span 
class="pre">TimestampExtractor</span></code> implementation:</p>
-              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span 
class="o">;</span>
-<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.processor.TimestampExtractor</span><span 
class="o">;</span>
-
-<span class="c1">// Extracts the embedded timestamp of a record (giving you 
&quot;event-time&quot; semantics).</span>
-<span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">MyEventTimeExtractor</span> <span class="kd">implements</span> <span 
class="n">TimestampExtractor</span> <span class="o">{</span>
-
-  <span class="nd">@Override</span>
-  <span class="kd">public</span> <span class="kt">long</span> <span 
class="nf">extract</span><span class="o">(</span><span class="kd">final</span> 
<span class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">Object</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">,</span> <span class="kd">final</span> 
<span class="kt">long</span> <span class="n">previousTimestamp</span><span 
class="o">) [...]
-    <span class="c1">// `Foo` is your own custom class, which we assume has a 
method that returns</span>
-    <span class="c1">// the embedded timestamp (milliseconds since midnight, 
January 1, 1970 UTC).</span>
-    <span class="kt">long</span> <span class="n">timestamp</span> <span 
class="o">=</span> <span class="o">-</span><span class="mi">1</span><span 
class="o">;</span>
-    <span class="kd">final</span> <span class="n">Foo</span> <span 
class="n">myPojo</span> <span class="o">=</span> <span class="o">(</span><span 
class="n">Foo</span><span class="o">)</span> <span class="n">record</span><span 
class="o">.</span><span class="na">value</span><span class="o">();</span>
-    <span class="k">if</span> <span class="o">(</span><span 
class="n">myPojo</span> <span class="o">!=</span> <span 
class="kc">null</span><span class="o">)</span> <span class="o">{</span>
-      <span class="n">timestamp</span> <span class="o">=</span> <span 
class="n">myPojo</span><span class="o">.</span><span 
class="na">getTimestampInMillis</span><span class="o">();</span>
-    <span class="o">}</span>
-    <span class="k">if</span> <span class="o">(</span><span 
class="n">timestamp</span> <span class="o">&lt;</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
-      <span class="c1">// Invalid timestamp!  Attempt to estimate a new 
timestamp,</span>
-      <span class="c1">// otherwise fall back to wall-clock time 
(processing-time).</span>
-      <span class="k">if</span> <span class="o">(</span><span 
class="n">previousTimestamp</span> <span class="o">&gt;=</span> <span 
class="mi">0</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">return</span> <span 
class="n">previousTimestamp</span><span class="o">;</span>
-      <span class="o">}</span> <span class="k">else</span> <span 
class="o">{</span>
-        <span class="k">return</span> <span class="n">System</span><span 
class="o">.</span><span class="na">currentTimeMillis</span><span 
class="o">();</span>
-      <span class="o">}</span>
-    <span class="o">}</span>
-  <span class="o">}</span>
-
-<span class="o">}</span>
-</pre></div>
-              </div>
-              <p>You would then define the custom timestamp extractor in your 
Streams configuration as follows:</p>
-              <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="kn">import</span> <span 
class="nn">java.util.Properties</span><span class="o">;</span>
-<span class="kn">import</span> <span 
class="nn">org.apache.kafka.streams.StreamsConfig</span><span class="o">;</span>
-
-<span class="n">Properties</span> <span class="n">streamsConfiguration</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
-<span class="n">streamsConfiguration</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</span><span 
class="o">,</span> <span class="n">MyEventTimeExtractor</span><span 
class="o">.</span><span class="na">class</span><span class="o">);</span>
-</pre></div>
               </div>
+              <dl class="docutils">
+                <dt>Notes for example:</dt>
+                <dd><ol class="first last arabic simple">
+                  <li><code class="docutils literal"><span 
class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) 
options.tableFormatConfig();</span></code> Get a reference to the existing 
table config rather than create a new one, so you don't accidentally overwrite 
defaults such as the <code class="docutils literal"><span 
class="pre">BloomFilter</span></code>, which is an important optimization.</li>
+                  <li><code class="docutils literal"><span 
class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span 
class="pre">1024L);</span></code> Modify the default <a class="reference 
external" 
href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block
 size</a> per these instructions from the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Memory-usage- [...]
+                  <li><code class="docutils literal"><span 
class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do 
not let the index and filter blocks grow unbounded. For more information, see 
the <a class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB
 GitHub</a>.</li>
+                  <li><code class="docutils literal"><span 
class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced 
options in the <a class="reference external" 
href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB
 GitHub</a>.</li>
+                  <li><code class="docutils literal"><span 
class="pre">cache.close();</span></code> To avoid memory leaks, you must close 
any objects you constructed that extend org.rocksdb.RocksObject. See  <a 
class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava
 docs</a> for more details.</li>
+                </ol>
+                </dd>
+              </dl>
             </div></blockquote>
         </div>
-        <div class="section" id="upgrade-from">
-          <h4><a class="toc-backref" href="#id14">upgrade.from</a><a 
class="headerlink" href="#upgrade-from" title="Permalink to this 
headline"></a></h4>
-          <blockquote>
-            <div>
-              The version you are upgrading from. It is important to set this 
config when performing a rolling upgrade to certain versions, as described in 
the upgrade guide.
-              You should set this config to the appropriate version before 
bouncing your instances and upgrading them to the newer version. Once everyone 
is on the
-              newer version, you should remove this config and do a second 
rolling bounce. It is only necessary to set this config and follow the 
two-bounce upgrade path
-              when upgrading from below version 2.0, or when upgrading to 2.4+ 
from any version lower than 2.4.
-            </div>
-          </blockquote>
-        </div>
       </div>
-      <div class="section" 
id="kafka-consumers-and-producer-configuration-parameters">
-        <h3><a class="toc-backref" href="#id16">Kafka consumers, producer and 
admin client configuration parameters</a><a class="headerlink" 
href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink 
to this headline"></a></h3>
-        <p>You can specify parameters for the Kafka <a class="reference 
external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>,
 <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
-            and <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html">admin
 client</a> that are used internally.
-            The consumer, producer and admin client settings are defined by 
specifying parameters in a <code class="docutils literal"><span 
class="pre">StreamsConfig</span></code> instance.</p>
-        <p>In this example, the Kafka <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer
 session timeout</a> is configured to be 60000 milliseconds in the Streams 
settings:</p>
-        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+      </blockquote>
+    </div>
+    <div class="section" id="state-dir">
+      <h4><a class="toc-backref" href="#id14">state.dir</a><a 
class="headerlink" href="#state-dir" title="Permalink to this 
headline"></a></h4>
+      <blockquote>
+        <div>The state directory. Kafka Streams persists local states under 
the state directory. Each application has a subdirectory on its hosting
+          machine that is located under the state directory. The name of the 
subdirectory is the application ID. The state stores associated
+          with the application are created under this subdirectory. When 
running multiple instances of the same application on a single machine,
+          this path must be unique for each such instance.</div>
+      </blockquote>
+    </div>
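As a rough illustration of the last point, the sketch below derives a distinct state directory for each instance that runs on the same machine. It uses only java.util.Properties and the literal config keys application.id and state.dir. The helper name settingsForInstance, the base path /var/lib/kafka-streams, and the instance-N naming scheme are assumptions made for this example, not Kafka Streams conventions.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

final class StateDirSketch {

    // Hypothetical helper: builds settings whose state.dir is unique per local instance.
    static Properties settingsForInstance(String applicationId, Path baseDir, int instanceIndex) {
        Properties settings = new Properties();
        settings.setProperty("application.id", applicationId);
        // Each instance on this machine gets its own directory below the shared base path.
        settings.setProperty("state.dir", baseDir.resolve("instance-" + instanceIndex).toString());
        return settings;
    }

    public static void main(String[] args) {
        Path base = Paths.get("/var/lib/kafka-streams"); // assumed base path
        for (int i = 0; i < 2; i++) {
            System.out.println(settingsForInstance("my-app", base, i).getProperty("state.dir"));
        }
    }
}
```

Both instances share the same application.id, so only the state.dir value separates their local state on disk.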
+    <div class="section" id="topology-optimization">
+      <h4><a class="toc-backref" href="#id31">topology.optimization</a><a 
class="headerlink" href="#topology-optimization" title="Permalink to this 
headline"></a></h4>
+      <blockquote>
+        <div>
+          <p>
+            You can tell Streams to apply topology optimizations by setting 
this config. The optimizations are currently all or none and disabled by 
default.
+            These optimizations include moving/reducing repartition topics and 
reusing the source topic as the changelog for source KTables. It is recommended 
to enable this.
+          </p>
+          <p>
+            Note that as of 2.3, you need to do two things to enable 
optimizations. In addition to setting this config to 
<code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
+            configuration properties when building your topology by using the 
overloaded <code>StreamsBuilder.build(Properties)</code> method.
+            For example <code>KafkaStreams myStream = new 
KafkaStreams(streamsBuilder.build(properties), properties)</code>.
+          </p>
+        </div></blockquote>
+    </div>
+    <div class="section" id="upgrade-from">
+      <h4><a class="toc-backref" href="#id14">upgrade.from</a><a 
class="headerlink" href="#upgrade-from" title="Permalink to this 
headline"></a></h4>
+      <blockquote>
+        <div>
+          The version you are upgrading from. It is important to set this 
config when performing a rolling upgrade to certain versions, as described in 
the upgrade guide.
+          You should set this config to the appropriate version before 
bouncing your instances and upgrading them to the newer version. Once everyone 
is on the
+          newer version, you should remove this config and do a second rolling 
bounce. It is only necessary to set this config and follow the two-bounce 
upgrade path
+          when upgrading from below version 2.0, or when upgrading to 2.4+ 
from any version lower than 2.4.
+        </div>
+      </blockquote>
+    </div>
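The version rule in the last sentence can be written down as a small predicate. This is only a sketch of the rule as worded above, not code from Kafka Streams: the class and method names are hypothetical, and versions are assumed to be plain "major.minor" strings (any further components are ignored). The upgrade guide remains the authoritative source for a given version pair.

```java
final class UpgradeFromSketch {

    // Parses "major.minor[.patch]" into {major, minor}; later components are ignored.
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    // True if version v is strictly lower than major.minor.
    static boolean isBelow(int[] v, int major, int minor) {
        return v[0] < major || (v[0] == major && v[1] < minor);
    }

    // Two bounces are needed when upgrading from below 2.0,
    // or when upgrading to 2.4+ from any version lower than 2.4.
    static boolean needsTwoBounceUpgrade(String from, String to) {
        int[] f = parse(from);
        int[] t = parse(to);
        boolean fromBelow20 = isBelow(f, 2, 0);
        boolean crosses24 = isBelow(f, 2, 4) && !isBelow(t, 2, 4);
        return fromBelow20 || crosses24;
    }

    public static void main(String[] args) {
        System.out.println(needsTwoBounceUpgrade("2.3", "2.6"));
        System.out.println(needsTwoBounceUpgrade("2.4", "2.6"));
    }
}
```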
+  </div>
+  <div class="section" 
id="kafka-consumers-and-producer-configuration-parameters">
+    <h3><a class="toc-backref" href="#id16">Kafka consumers, producer and 
admin client configuration parameters</a><a class="headerlink" 
href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink 
to this headline"></a></h3>
+    <p>You can specify parameters for the Kafka <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>,
 <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
+      and <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/admin/package-summary.html">admin
 client</a> that are used internally.
+      The consumer, producer and admin client settings are defined by 
specifying parameters in a <code class="docutils literal"><span 
class="pre">StreamsConfig</span></code> instance.</p>
+    <p>In this example, the Kafka <a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer
 session timeout</a> is configured to be 60000 milliseconds in the Streams 
settings:</p>
+    <div class="highlight-java"><div class="highlight"><pre><span></span><span 
class="n">Properties</span> <span class="n">streamsSettings</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">Properties</span><span class="o">();</span>
 <span class="c1">// Example of a &quot;normal&quot; setting for Kafka 
Streams</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">BOOTSTRAP_SERVERS_CONFIG</span><span class="o">,</span> <span 
class="s">&quot;kafka-broker-01:9092&quot;</span><span class="o">);</span>
 <span class="c1">// Customize the Kafka consumer settings of your Streams 
application</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">ConsumerConfig</span><span class="o">.</span><span 
class="na">SESSION_TIMEOUT_MS_CONFIG</span><span class="o">,</span> <span 
class="mi">60000</span><span class="o">);</span>
 </pre></div>
-        </div>
-        <div class="section" id="naming">
-          <h4><a class="toc-backref" href="#id17">Naming</a><a 
class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
-          <p>Some consumer, producer and admin client configuration parameters 
use the same parameter name, and Kafka Streams library itself also uses some 
parameters that share the same name with its embedded client. For example, 
<code class="docutils literal"><span 
class="pre">send.buffer.bytes</span></code> and
-              <code class="docutils literal"><span 
class="pre">receive.buffer.bytes</span></code> are used to configure TCP 
buffers; <code class="docutils literal"><span 
class="pre">request.timeout.ms</span></code> and <code class="docutils 
literal"><span class="pre">retry.backoff.ms</span></code> control retries for 
client request;
-              <code class="docutils literal"><span 
class="pre">retries</span></code> are used to configure how many retries are 
allowed when handling retriable errors from broker request responses.
-              You can avoid duplicate names by prefix parameter names with 
<code class="docutils literal"><span class="pre">consumer.</span></code>, <code 
class="docutils literal"><span class="pre">producer.</span></code>, or <code 
class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code 
class="docutils literal"><span 
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils 
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
-          <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+    </div>
+    <div class="section" id="naming">
+      <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" 
href="#naming" title="Permalink to this headline"></a></h4>
+      <p>Some consumer, producer and admin client configuration parameters use 
the same parameter name, and the Kafka Streams library itself also uses some 
parameters that share the same name with its embedded clients. For example, 
<code class="docutils literal"><span 
class="pre">send.buffer.bytes</span></code> and
+        <code class="docutils literal"><span 
class="pre">receive.buffer.bytes</span></code> are used to configure TCP 
buffers; <code class="docutils literal"><span 
class="pre">request.timeout.ms</span></code> and <code class="docutils 
literal"><span class="pre">retry.backoff.ms</span></code> control retries for 
client requests;
+        <code class="docutils literal"><span class="pre">retries</span></code> 
is used to configure how many retries are allowed when handling retriable 
errors from broker request responses.
+        You can avoid duplicate names by prefixing parameter names with <code 
class="docutils literal"><span class="pre">consumer.</span></code>, <code 
class="docutils literal"><span class="pre">producer.</span></code>, or <code 
class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code 
class="docutils literal"><span 
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils 
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+      <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same value for consumer, producer, and admin client</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span 
class="s">&quot;value&quot;</span><span class="o">);</span>
 <span class="c1">// different values for consumer and producer</span>
@@ -678,14 +810,14 @@
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">producerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;producer-value&quot;</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">adminClientPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;admin-value&quot;</span><span class="o">);</span>
 </pre></div>
-          <p>You could further separate consumer configuration by adding 
different prefixes:</p>
-          <ul class="simple">
-            <li><code class="docutils literal"><span 
class="pre">main.consumer.</span></code> for main consumer which is the default 
consumer of stream source.</li>
-            <li><code class="docutils literal"><span 
class="pre">restore.consumer.</span></code> for restore consumer which is in 
charge of state store recovery.</li>
-            <li><code class="docutils literal"><span 
class="pre">global.consumer.</span></code> for global consumer which is used in 
global KTable construction.</li>
-          </ul>
-          <p>For example, if you only want to set restore consumer config 
without touching other consumers' settings, you could simply use <code 
class="docutils literal"><span class="pre">restore.consumer.</span></code> to 
set the config.</p>
-          <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+        <p>You could further separate consumer configuration by adding 
different prefixes:</p>
+        <ul class="simple">
+          <li><code class="docutils literal"><span 
class="pre">main.consumer.</span></code> for main consumer which is the default 
consumer of stream source.</li>
+          <li><code class="docutils literal"><span 
class="pre">restore.consumer.</span></code> for restore consumer which is in 
charge of state store recovery.</li>
+          <li><code class="docutils literal"><span 
class="pre">global.consumer.</span></code> for global consumer which is used in 
global KTable construction.</li>
+        </ul>
+        <p>For example, if you only want to set restore consumer config 
without touching other consumers' settings, you could simply use <code 
class="docutils literal"><span class="pre">restore.consumer.</span></code> to 
set the config.</p>
+        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same config value for all consumer types</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> 
<span class="s">&quot;general-consumer-value&quot;</span><span 
class="o">);</span>
 <span class="c1">// set a different restore consumer config. This would make 
restore consumer take restore-consumer-value,</span>
@@ -694,103 +826,103 @@
 <span class="c1">// alternatively, you can use</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">restoreConsumerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
 </pre></div>
-          </div>
-          <p> Same applied to <code class="docutils literal"><span 
class="pre">main.consumer.</span></code> and <code class="docutils 
literal"><span class="pre">main.consumer.</span></code>, if you only want to 
specify one consumer type config.</p>
-          <p> Additionally, to configure the internal repartition/changelog 
topics, you could use the <code class="docutils literal"><span 
class="pre">topic.</span></code> prefix, followed by any of the standard topic 
configs.</p>
-            <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+        </div>
+        <p> The same applies to <code class="docutils literal"><span 
class="pre">main.consumer.</span></code> and <code class="docutils 
literal"><span class="pre">global.consumer.</span></code>, if you only want to 
specify the config for one consumer type.</p>
+        <p> Additionally, to configure the internal repartition/changelog 
topics, you could use the <code class="docutils literal"><span 
class="pre">topic.</span></code> prefix, followed by any of the standard topic 
configs.</p>
+        <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// Override default for both changelog and repartition 
topics</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;topic.PARAMETER_NAME&quot;</span><span class="o">,</span> <span 
class="s">&quot;topic-value&quot;</span><span class="o">);</span>
 <span class="c1">// alternatively, you can use</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">topicPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;topic-value&quot;</span><span class="o">);</span>
 </pre></div>
-            </div>
-          </div>
-        </div>
-        <div class="section" id="default-values">
-          <h4><a class="toc-backref" href="#id18">Default Values</a><a 
class="headerlink" href="#default-values" title="Permalink to this 
headline"></a></h4>
-          <p>Kafka Streams uses different default values for some of the 
underlying client configs, which are summarized below. For detailed descriptions
-            of these configs, see <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#producerconfigs">Producer 
Configs</a>
-            and <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#newconsumerconfigs">Consumer
 Configs</a>.</p>
-          <table border="1" class="non-scrolling-table docutils">
-            <thead valign="bottom">
-            <tr class="row-odd"><th class="head">Parameter Name</th>
-              <th class="head">Corresponding Client</th>
-              <th class="head">Streams Default</th>
-            </tr>
-            </thead>
-            <tbody valign="top">
-            <tr class="row-even"><td>auto.offset.reset</td>
-              <td>Consumer</td>
-              <td>earliest</td>
-            </tr>
-            <tr class="row-even"><td>linger.ms</td>
-              <td>Producer</td>
-              <td>100</td>
-            </tr>
-            <tr class="row-odd"><td>max.poll.interval.ms</td>
-              <td>Consumer</td>
-              <td>Integer.MAX_VALUE</td>
-            </tr>
-            <tr class="row-even"><td>max.poll.records</td>
-              <td>Consumer</td>
-              <td>1000</td>
-            </tr>
-            </tbody>
-          </table>
-        </div>
-        <div class="section" id="parameters-controlled-by-kafka-streams">
-          <h3><a class="toc-backref" href="#id26">Parameters controlled by 
Kafka Streams</a><a class="headerlink" 
href="#parameters-controlled-by-kafka-streams" title="Permalink to this 
headline"></a></h3>
-          <p>Kafka Streams assigns the following configuration parameters. If 
you try to change
-            <code class="docutils literal"><span 
class="pre">allow.auto.create.topics</span></code>, your value
-            is ignored and setting it has no effect in a Kafka Streams 
application. You can set the other parameters.
-            Kafka Streams sets them to different default values than a plain
-            <code class="docutils literal"><span 
class="pre">KafkaConsumer</span></code>.
-          <p>Kafka Streams uses the <code class="docutils literal"><span 
class="pre">client.id</span></code>
-            parameter to compute derived client IDs for internal clients. If 
you don't set
-            <code class="docutils literal"><span 
class="pre">client.id</span></code>, Kafka Streams sets it to
-            <code class="docutils literal"><span 
class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.
-            <table border="1" class="non-scrolling-table docutils">
-              <colgroup>
-              <col width="50%">
-              <col width="19%">
-              <col width="31%">
-              </colgroup>
-              <thead valign="bottom">
-              <tr class="row-odd"><th class="head">Parameter Name</th>
-              <th class="head">Corresponding Client</th>
-              <th class="head">Streams Default</th>
-              </tr>
-              </thead>
-              <tbody valign="top">
-              <tr class="row-odd"><td>allow.auto.create.topics</td>
-              <td>Consumer</td>
-              <td>false</td>
-              </tr>
-              <tr class="row-even"><td>auto.offset.reset</td>
-              <td>Consumer</td>
-              <td>earliest</td>
-              </tr>
-              <tr class="row-odd"><td>linger.ms</td>
-              <td>Producer</td>
-              <td>100</td>
-              </tr>
-              <tr class="row-even"><td>max.poll.interval.ms</td>
-              <td>Consumer</td>
-              <td>300000</td>
-              </tr>
-              <tr class="row-odd"><td>max.poll.records</td>
-              <td>Consumer</td>
-              <td>1000</td>
-              </tr>
-              </tbody>
-              </table>
-        <div class="section" id="enable-auto-commit">
-          <span 
id="streams-developer-guide-consumer-auto-commit"></span><h4><a 
class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" 
href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
-          <blockquote>
-            <div>The consumer auto commit. To guarantee at-least-once 
processing semantics and turn off auto commits, Kafka Streams overrides this 
consumer config
-              value to <code class="docutils literal"><span 
class="pre">false</span></code>.  Consumers will only commit explicitly via 
<em>commitSync</em> calls when the Kafka Streams library or a user decides
-              to commit the current processing state.</div></blockquote>
         </div>
+      </div>
+    </div>
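The consumer prefixes described in the Naming section form a precedence order: a type-specific key such as restore.consumer.PARAMETER_NAME overrides consumer.PARAMETER_NAME, which in turn overrides the unprefixed PARAMETER_NAME. The sketch below models that lookup with plain java.util.Properties. It is a simplified model for illustration and not how Kafka Streams resolves configs internally; the class and the resolve helper are hypothetical.

```java
import java.util.Properties;

final class ConsumerPrefixSketch {

    // Hypothetical lookup: the most specific prefix wins.
    // consumerType is "main", "restore" or "global".
    static String resolve(Properties settings, String consumerType, String name) {
        String specific = settings.getProperty(consumerType + ".consumer." + name);
        if (specific != null) {
            return specific;
        }
        String general = settings.getProperty("consumer." + name);
        if (general != null) {
            return general;
        }
        // Falls back to the value shared by consumer, producer and admin client (may be null).
        return settings.getProperty(name);
    }

    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.setProperty("consumer.PARAMETER_NAME", "general-consumer-value");
        settings.setProperty("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
        System.out.println(resolve(settings, "restore", "PARAMETER_NAME"));
        System.out.println(resolve(settings, "main", "PARAMETER_NAME"));
    }
}
```

With these settings, only the restore consumer sees restore-consumer-value; the main and global consumers fall back to general-consumer-value.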
+    <div class="section" id="default-values">
+      <h4><a class="toc-backref" href="#id18">Default Values</a><a 
class="headerlink" href="#default-values" title="Permalink to this 
headline"></a></h4>
+      <p>Kafka Streams uses different default values for some of the 
underlying client configs, which are summarized below. For detailed descriptions
+        of these configs, see <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#producerconfigs">Producer 
Configs</a>
+        and <a class="reference external" 
href="http://kafka.apache.org/0100/documentation.html#newconsumerconfigs">Consumer
 Configs</a>.</p>
+      <table border="1" class="non-scrolling-table docutils">
+        <thead valign="bottom">
+        <tr class="row-odd"><th class="head">Parameter Name</th>
+          <th class="head">Corresponding Client</th>
+          <th class="head">Streams Default</th>
+        </tr>
+        </thead>
+        <tbody valign="top">
+        <tr class="row-even"><td>auto.offset.reset</td>
+          <td>Consumer</td>
+          <td>earliest</td>
+        </tr>
+        <tr class="row-even"><td>linger.ms</td>
+          <td>Producer</td>
+          <td>100</td>
+        </tr>
+        <tr class="row-odd"><td>max.poll.interval.ms</td>
+          <td>Consumer</td>
+          <td>Integer.MAX_VALUE</td>
+        </tr>
+        <tr class="row-even"><td>max.poll.records</td>
+          <td>Consumer</td>
+          <td>1000</td>
+        </tr>
+        </tbody>
+      </table>
+    </div>
+    <div class="section" id="parameters-controlled-by-kafka-streams">
+      <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka 
Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" 
title="Permalink to this headline"></a></h3>
+      <p>Kafka Streams assigns the following configuration parameters. If you 
try to change
+        <code class="docutils literal"><span 
class="pre">allow.auto.create.topics</span></code>, your value
+        is ignored and setting it has no effect in a Kafka Streams 
application. You can set the other parameters.
+        Kafka Streams sets them to different default values than a plain
+        <code class="docutils literal"><span 
class="pre">KafkaConsumer</span></code>.
+      <p>Kafka Streams uses the <code class="docutils literal"><span 
class="pre">client.id</span></code>
+        parameter to compute derived client IDs for internal clients. If you 
don't set
+        <code class="docutils literal"><span 
class="pre">client.id</span></code>, Kafka Streams sets it to
+        <code class="docutils literal"><span 
class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.
+      <table border="1" class="non-scrolling-table docutils">
+        <colgroup>
+          <col width="50%">
+          <col width="19%">
+          <col width="31%">
+        </colgroup>
+        <thead valign="bottom">
+        <tr class="row-odd"><th class="head">Parameter Name</th>
+          <th class="head">Corresponding Client</th>
+          <th class="head">Streams Default</th>
+        </tr>
+        </thead>
+        <tbody valign="top">
+        <tr class="row-odd"><td>allow.auto.create.topics</td>
+          <td>Consumer</td>
+          <td>false</td>
+        </tr>
+        <tr class="row-even"><td>auto.offset.reset</td>
+          <td>Consumer</td>
+          <td>earliest</td>
+        </tr>
+        <tr class="row-odd"><td>linger.ms</td>
+          <td>Producer</td>
+          <td>100</td>
+        </tr>
+        <tr class="row-even"><td>max.poll.interval.ms</td>
+          <td>Consumer</td>
+          <td>300000</td>
+        </tr>
+        <tr class="row-odd"><td>max.poll.records</td>
+          <td>Consumer</td>
+          <td>1000</td>
+        </tr>
+        </tbody>
+      </table>
+      <div class="section" id="enable-auto-commit">
+        <span id="streams-developer-guide-consumer-auto-commit"></span><h4><a 
class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" 
href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
+        <blockquote>
+          <div>The consumer auto commit. To guarantee at-least-once processing 
semantics and turn off auto commits, Kafka Streams overrides this consumer 
config
+            value to <code class="docutils literal"><span 
class="pre">false</span></code>.  Consumers will only commit explicitly via 
<em>commitSync</em> calls when the Kafka Streams library or a user decides
+            to commit the current processing state.</div></blockquote>
+      </div>
       <div class="section" 
id="recommended-configuration-parameters-for-resiliency">
         <h3><a class="toc-backref" href="#id21">Recommended configuration 
parameters for resiliency</a><a class="headerlink" 
href="#recommended-configuration-parameters-for-resiliency" title="Permalink to 
this headline"></a></h3>
         <p>There are several Kafka and Kafka Streams configuration options 
that need to be configured explicitly for resiliency in face of broker 
failures:</p>
@@ -845,10 +977,10 @@
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">topicPrefix</span><span class="o">(</span><span 
class="n">TopicConfig</span><span class="o">.</span><span 
class="na">MIN_IN_SYNC_REPLICAS_CONFIG</span><span class="o">),</span> <span 
class="mi">2</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">producerPrefix</span><span class="o">(</span><span 
class="n">ProducerConfig</span><span class="o">.</span><span 
class="na">ACKS_CONFIG</span><span class="o">),</span> <span 
class="s">&quot;all&quot;</span><span class="o">);</span></code></pre></div>
           </div>
-</div>
-</div>
-</div>
-</div>
+        </div>
+      </div>
+    </div>
+  </div>
 
 
                </div>
diff --git a/26/streams/developer-guide/memory-mgmt.html 
b/26/streams/developer-guide/memory-mgmt.html
index 8b02485..91a53c4 100644
--- a/26/streams/developer-guide/memory-mgmt.html
+++ b/26/streams/developer-guide/memory-mgmt.html
@@ -202,7 +202,10 @@
        <span class="o">}</span>
     <span class="o">}</span>
       </div>
-        <sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a 
fraction of the block cache to set aside for "high priority" (aka index and 
filter) blocks, preventing them from being evicted by data blocks. See the full 
signature of the <a class="reference external" 
href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache
 constructor</a>. </sup>
+        <sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a 
fraction of the block cache to set aside for "high priority" (aka index and 
filter) blocks, preventing them from being evicted by data blocks. See the full 
signature of the <a class="reference external" 
href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache
 constructor</a>.
+          NOTE: the boolean parameter in the cache constructor lets you 
control whether the cache should enforce a strict memory limit by failing the 
read or iteration in the rare cases where it might go larger than its capacity. 
Due to a
+          <a class="reference external" 
href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this 
option cannot be used
+          if the write buffer memory is also counted against the cache. If you 
set this to true, you should NOT pass the cache in to the 
<code>WriteBufferManager</code> and just control the write buffer and cache 
memory separately.</sup>
         <br>
         <sup id="fn2">2. This must be set in order for 
INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the <a 
class="reference external" 
href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB
 docs</a></sup>
         <br>
diff --git a/26/streams/developer-guide/running-app.html 
b/26/streams/developer-guide/running-app.html
index 8f5341f..6a9128a 100644
--- a/26/streams/developer-guide/running-app.html
+++ b/26/streams/developer-guide/running-app.html
@@ -109,6 +109,18 @@ $ java -cp path-to-app-fatjar.jar 
com.example.MyStreamsApp</code></pre></div>
                       <li>If a local state store exists, the changelog is 
replayed from the previously checkpointed offset. The changes are applied and 
the state is restored to the most recent snapshot. This method takes less time 
because it is applying a smaller portion of the changelog.</li>
                   </ul>
                   <p>For more information, see <a class="reference internal" 
href="config-streams.html#num-standby-replicas"><span class="std 
std-ref">Standby Replicas</span></a>.</p>
+                  <p>
+                      As of version 2.6, Streams will now do most of a task's 
restoration in the background through warmup replicas. These will be assigned 
to instances that need to restore a lot of state for a task.
+                      A stateful active task will only be assigned to an 
instance once its state is within the configured
+                      <a class="reference internal" 
href="config-streams.html#acceptable-recovery-lag"><span class="std 
std-ref"><code>acceptable.recovery.lag</code></span></a>, if one exists. This 
means that
+                      most of the time, a task migration will <b>not</b> 
result in downtime for that task. It will remain active on the instance that's 
already caught up, while the instance that it's being
+                      migrated to works on restoring the state. Streams will 
<a class="reference internal" 
href="config-streams.html#probing-rebalance-interval-ms"><span class="std 
std-ref">regularly probe</span></a> for warmup tasks that have finished 
restoring and transition them to active tasks when ready.
+                  </p>
+                  <p>
+                      Note, the one exception to this task availability is if 
none of the instances have a caught-up version of that task. In that case, we 
have no choice but to assign the active
+                      task to an instance that is not caught up and will have 
to block further processing on restoration of the task's state from the 
changelog. If high availability is important
+                      for your application, we highly recommend that you 
enable standbys.
+                  </p>
               </div>
               <div class="section" 
id="determining-how-many-application-instances-to-run">
                   <h3><a class="toc-backref" href="#id8">Determining how many 
application instances to run</a><a class="headerlink" 
href="#determining-how-many-application-instances-to-run" title="Permalink to 
this headline"></a></h3>
diff --git a/26/streams/upgrade-guide.html b/26/streams/upgrade-guide.html
index 4d59818..b308b4b 100644
--- a/26/streams/upgrade-guide.html
+++ b/26/streams/upgrade-guide.html
@@ -42,7 +42,7 @@
     <ul>
         <li> prepare your application instances for a rolling bounce and make 
sure that config <code>upgrade.from</code> is set to the version from which it 
is being upgraded.</li>
         <li> bounce each instance of your application once </li>
-        <li> prepare your newly deployed {{fullDotVersion}} application 
instances for a second round of rolling bounces; make sure to remove the value 
for config <code>upgrade.mode</code> </li>
+        <li> prepare your newly deployed {{fullDotVersion}} application 
instances for a second round of rolling bounces; make sure to remove the value 
for config <code>upgrade.from</code> </li>
         <li> bounce each instance of your application once more to complete 
the upgrade </li>
     </ul>
     <p> As an alternative, an offline upgrade is also possible. Upgrading from 
any version as old as 0.10.0.x to {{fullDotVersion}} in offline mode requires 
the following steps: </p>
@@ -95,7 +95,19 @@
         Note that you need brokers with version 2.5 or newer to use this 
feature.
     </p>
     <p>
-        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in 
favor of the new <code>KStream.repartition()</code> operator
+        For more highly available stateful applications, we've modified the 
task assignment algorithm to delay the movement of stateful active tasks to 
instances
+        that aren't yet caught up with that task's state. Instead, to migrate 
a task from one instance to another (e.g. when scaling out),
+        Streams will assign a warmup replica to the target instance so it can 
begin restoring the state while the active task stays available on an instance
+        that already had the task. The instances warming up tasks will 
communicate their progress to the group so that, once ready, Streams can move 
active
+        tasks to their new owners in the background. Check out <a 
href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
+        for full details, including several new configs for control over this 
new feature.
+    </p>
+    <p>
+        New end-to-end latency metrics have been added. These task-level 
metrics will be logged at the INFO level and report the min and max end-to-end 
latency of a record at the beginning/source node(s)
+        and end/terminal node(s) of a task. See <a 
href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more 
information.
+    </p>
+    <p>
+        As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in 
favor of the new <code>KStream.repartition()</code> operator
         (as per <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
         <code>KStream.repartition()</code> is similar to 
<code>KStream.through()</code>, however Kafka Streams will manage the topic for 
you.
         If you need to write into and read back from a topic that you manage, 
you can fall back to using <code>KStream.to()</code> in combination with 
<code>StreamsBuilder#stream()</code>.
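
For readers following the warmup-replica text in this patch, the related settings could be gathered roughly as in the sketch below. It is illustrative only and not part of the commit. It uses plain string keys so that it needs no Kafka dependency. The keys acceptable.recovery.lag, probing.rebalance.interval.ms and num.standby.replicas are the ones the patched docs link to; max.warmup.replicas is assumed here to be one of the "several new configs" from KIP-441. The class name, application id, bootstrap address and all values are hypothetical placeholders, not recommendations.

```java
import java.util.Properties;

public class WarmupConfigSketch {

    // Collects the task-availability settings under their string keys.
    // All values are illustrative assumptions; tune them for your application.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");     // hypothetical name
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical address

        // Keep one standby copy of each state store on another instance.
        props.setProperty("num.standby.replicas", "1");
        // Changelog lag (in offsets) below which an instance counts as caught up.
        props.setProperty("acceptable.recovery.lag", "10000");
        // Assumed KIP-441 config: upper bound on extra warmup replicas.
        props.setProperty("max.warmup.replicas", "2");
        // How often Streams probes for warmup tasks that have finished restoring.
        props.setProperty("probing.rebalance.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; a real application would pass them to KafkaStreams.
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application these properties would be handed to the KafkaStreams constructor together with the topology.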
