This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0f48446 DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268) 0f48446 is described below commit 0f48446690e42b78a9a6b8c6a9bbab9f01d84cb1 Author: Jim Galasyn <jim.gala...@confluent.io> AuthorDate: Wed Mar 25 16:21:27 2020 -0700 DOCS-3625: Add section to config topic: parameters controlled by Kafka Streams (#8268) Reviewer: Matthias J. Sax <matth...@confluent.io> --- docs/streams/architecture.html | 5 + docs/streams/developer-guide/config-streams.html | 172 ++++++++++++++--------- 2 files changed, 111 insertions(+), 66 deletions(-) diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html index 7efd7ea..75d6d3d 100644 --- a/docs/streams/architecture.html +++ b/docs/streams/architecture.html @@ -80,6 +80,11 @@ tasks will be automatically restarted on other instances and continue to consume from the same stream partitions. </p> + <p><b>NOTE:</b> Topic partitions are assigned to tasks, and tasks are assigned to all threads over all instances, in a best-effort attempt + to trade off load-balancing and stickiness of stateful tasks. For this assignment, Kafka Streams uses the + <a href="https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java">StreamsPartitionAssignor</a> + class and doesn't let you change to a different assignor. If you try to use a different assignor, Kafka Streams ignores it. + <p> The following diagram shows two tasks each assigned with one partition of the input streams. </p> diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 479e8a1..dc0164a 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -269,6 +269,11 @@ <td colspan="2">The amount of time in milliseconds, before a request is retried. 
This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td> <td>100</td> </tr> + <tr class="row-even"><td>rocksdb.config.setter</td> + <td>Medium</td> + <td colspan="2">The RocksDB configuration.</td> + <td></td> + </tr> <tr class="row-odd"><td>state.cleanup.delay.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td> @@ -474,6 +479,57 @@ </dl> </div></blockquote> </div> + <div class="section" id="rocksdb-config-setter"> + <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4> + <blockquote> + <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default + configuration for RocksDB, you can implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p> + <p>Here is an example that adjusts the memory size consumed by RocksDB.</p> + <div class="highlight-java"><div class="highlight"><pre><span></span> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span> + <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span> + <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span 
class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span clas [...] + <span class="c1">// See #1 below.</span> + <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span> + <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span></span><span class="o">);</span> + <span class="c1">// See #2 below.</span> + <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span> + <span class="c1">// See #3 below.</span> + <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> + <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span> + <span class="c1">// 
See #4 below.</span> + <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// See #5 below.</span> + <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> + <span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> + </pre></div> + </div> + <dl class="docutils"> + <dt>Notes for example:</dt> + <dd><ol class="first last arabic simple"> + <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.</li>
+ <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory- [...] + <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li> + <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li> + <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. 
See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li> + </ol> + </dd> + </dl> + </div></blockquote> + </div> + </div> + </blockquote> + </div> <div class="section" id="state-dir"> <h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4> <blockquote> @@ -657,21 +713,9 @@ </thead> <tbody valign="top"> <tr class="row-even"><td>auto.offset.reset</td> - <td>Global Consumer</td> - <td>none (cannot be changed)</td> - </tr> - <tr class="row-even"><td>auto.offset.reset</td> - <td>Restore Consumer</td> - <td>none (cannot be changed)</td> - </tr> - <tr class="row-even"><td>auto.offset.reset</td> <td>Consumer</td> <td>earliest</td> </tr> - <tr class="row-odd"><td>enable.auto.commit</td> - <td>Consumer</td> - <td>false</td> - </tr> <tr class="row-even"><td>linger.ms</td> <td>Producer</td> <td>100</td> @@ -684,13 +728,59 @@ <td>Consumer</td> <td>1000</td> </tr> - <tr class="row-odd"><td>rocksdb.config.setter</td> - <td>Consumer</td> - <td> </td> - </tr> </tbody> </table> </div> + <div class="section" id="parameters-controlled-by-kafka-streams"> + <h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3> + <p>Kafka Streams assigns the following configuration parameters. If you try to change + <code class="docutils literal"><span class="pre">allow.auto.create.topics</span></code>, your value + is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters. + Kafka Streams sets them to different default values than a plain + <code class="docutils literal"><span class="pre">KafkaConsumer</span></code>. 
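The distinction drawn above — parameters that Kafka Streams hard-controls versus parameters it merely re-defaults — can be sketched in plain Java. This is an illustrative model, not the real `StreamsConfig` resolution logic: the class name `ControlledParams` and the method `effectiveConfig` are hypothetical, and the values come from the table in this section.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not the actual StreamsConfig implementation) of how
// Kafka Streams treats client configs: hard-controlled parameters ignore
// user-supplied values, while Streams-specific defaults may be overridden.
public class ControlledParams {
    // Hard-controlled: Kafka Streams ignores any user-supplied value.
    static final Map<String, String> CONTROLLED =
        Map.of("allow.auto.create.topics", "false");

    // Streams defaults that differ from a plain KafkaConsumer/KafkaProducer;
    // users may override these (values from the table in this section).
    static final Map<String, String> STREAMS_DEFAULTS = Map.of(
        "auto.offset.reset", "earliest",
        "linger.ms", "100",
        "max.poll.interval.ms", "300000",
        "max.poll.records", "1000",
        "retries", "10");

    static Map<String, String> effectiveConfig(Map<String, String> userConfig) {
        Map<String, String> effective = new HashMap<>(STREAMS_DEFAULTS);
        effective.putAll(userConfig); // user values override Streams defaults...
        effective.putAll(CONTROLLED); // ...but controlled parameters always win
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> user = Map.of(
            "allow.auto.create.topics", "true", // ignored: controlled by Streams
            "max.poll.records", "500");         // honored: only a default
        Map<String, String> cfg = effectiveConfig(user);
        System.out.println(cfg.get("allow.auto.create.topics")); // false
        System.out.println(cfg.get("max.poll.records"));         // 500
    }
}
```

In a real application you would set such overrides on the `Properties` passed to `KafkaStreams`; the map-merge above only models the precedence described in the surrounding text.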
+ <p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code> + parameter to compute derived client IDs for internal clients. If you don't set + <code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to + <code class="docutils literal"><span class="pre"><application.id>-<random-UUID></span></code>. + <table border="1" class="non-scrolling-table docutils"> + <colgroup> + <col width="50%"> + <col width="19%"> + <col width="31%"> + </colgroup> + <thead valign="bottom"> + <tr class="row-odd"><th class="head">Parameter Name</th> + <th class="head">Corresponding Client</th> + <th class="head">Streams Default</th> + </tr> + </thead> + <tbody valign="top"> + <tr class="row-odd"><td>allow.auto.create.topics</td> + <td>Consumer</td> + <td>false</td> + </tr> + <tr class="row-even"><td>auto.offset.reset</td> + <td>Consumer</td> + <td>earliest</td> + </tr> + <tr class="row-odd"><td>linger.ms</td> + <td>Producer</td> + <td>100</td> + </tr> + <tr class="row-even"><td>max.poll.interval.ms</td> + <td>Consumer</td> + <td>300000</td> + </tr> + <tr class="row-odd"><td>max.poll.records</td> + <td>Consumer</td> + <td>1000</td> + </tr> + <tr class="row-even"><td>retries</td> + <td>Producer</td> + <td>10</td> + </tr> + </tbody> + </table> <div class="section" id="enable-auto-commit"> <span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4> <blockquote> @@ -698,56 +788,6 @@ value to <code class="docutils literal"><span class="pre">false</span></code>. 
Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides to commit the current processing state.</div></blockquote> </div> - <div class="section" id="rocksdb-config-setter"> - <span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4> - <blockquote> - <div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default - configuration for RocksDB, implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p> - <p>Here is an example that adjusts the memory size consumed by RocksDB.</p> - <div class="highlight-java"><div class="highlight"><pre><span></span> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CustomRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span> - - <span class="c1">// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.</span> - <span class="kd">private</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> 
<span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span [...] - <span class="c1">// See #1 below.</span> - <span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span> - <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span></span><span class="o">);</span> - <span class="c1">// See #2 below.</span> - <span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">16</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span> - <span class="c1">// See #3 below.</span> - <span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> - <span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span> - <span class="c1">// See #4 below.</span> - <span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">2</span><span class="o">);</span> - <span class="o">}</span> - - <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span> - <span class="c1">// See #5 below.</span> - <span class="n">cache</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> - <span class="o">}</span> - <span class="o">}</span> - -<span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> -<span class="n">streamsConfig</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">ROCKSDB_CONFIG_SETTER_CLASS_CONFIG</span><span class="o">,</span> <span class="n">CustomRocksDBConfig</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> -</pre></div> - </div> - <dl class="docutils"> - <dt>Notes for example:</dt> - <dd><ol class="first last arabic simple"> - <li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization. 
- <li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage- [...] - <li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li> - <li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li> - <li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. 
See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li> - </ol> - </dd> - </dl> - </div></blockquote> - </div> - </div> <div class="section" id="recommended-configuration-parameters-for-resiliency"> <h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3> <p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
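As a minimal sketch of the kind of explicit configuration that paragraph refers to, the snippet below builds a `Properties` object using the commonly recommended resiliency values (a replication factor of 3 for internal topics and `acks=all` for the embedded producer, written here with the `producer.` prefix). Treat the chosen values as assumptions to be checked against the full list of recommendations for your Kafka version.

```java
import java.util.Properties;

// A sketch (using literal config names rather than StreamsConfig constants)
// of resiliency-oriented settings for a Kafka Streams application.
public class ResilientStreamsConfig {
    public static Properties resilientProps() {
        Properties props = new Properties();
        // Replicate Streams-internal changelog/repartition topics so state
        // survives a broker failure. Value 3 is a common recommendation.
        props.put("replication.factor", "3");
        // Make the embedded producer wait for all in-sync replicas to ack.
        props.put("producer.acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = resilientProps();
        System.out.println(props.getProperty("replication.factor")); // 3
        System.out.println(props.getProperty("producer.acks"));      // all
    }
}
```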