http://git-wip-us.apache.org/repos/asf/kafka-site/blob/97a78e3b/0110/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/0110/streams/developer-guide.html 
b/0110/streams/developer-guide.html
index 15298a7..c74ca6e 100644
--- a/0110/streams/developer-guide.html
+++ b/0110/streams/developer-guide.html
@@ -18,8 +18,67 @@
 <script><!--#include virtual="../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-    <h1>Developer Manual</h1>
+    <h1>Developer Guide for Kafka Streams API</h1>
+    
+    <p>
+        This developer guide describes how to write, configure, and execute a Kafka Streams application. There is a <a href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program written with the Kafka Streams library.
+    </p>
+
+    <p>
+        The computational logic of a Kafka Streams application is defined as a <a href="/{{version}}/documentation/streams/core-concepts#streams_topology">processor topology</a>. Kafka Streams provides two APIs to define the processor topology: the low-level Processor API and the high-level Streams DSL.
+    </p>
 
+    <ul class="toc">
+        <li><a href="#streams_processor">1. Low-level Processor API</a>
+            <ul>
+                <li><a href="#streams_processor_process">1.1 Processor</a>
+                <li><a href="#streams_processor_topology">1.2 Processor 
Topology</a>
+                <li><a href="#streams_processor_statestore">1.3 State 
Stores</a>
+                <li><a href="#restoration_progress">1.4 Monitoring the Restoration Progress of Fault-tolerant State Stores</a>
+                <li><a href="#disable-changelogs">1.5 Enable / Disable Fault 
Tolerance of State Stores (Store Changelogs)</a>
+                <li><a href="#implementing-custom-state-stores">1.6 
Implementing Custom State Stores</a>
+                <li><a href="#connecting-processors-and-state-stores">1.7 
Connecting Processors and State Stores</a>
+                <li><a href="#streams_processor_describe">1.8 Describe a Topology</a>
+            </ul>
+        </li>
+        <li><a href="#streams_dsl">2. High-Level Streams DSL</a>
+            <ul>
+                <li><a href="#streams_duality">2.1 Duality of Streams and 
Tables</a>
+                <li><a href="#streams_dsl_source">2.2 Creating Source Streams 
from Kafka</a>
+                <li><a href="#streams_dsl_transform">2.3 Transform a stream</a>
+                <li><a href="#streams_dsl_sink">2.4 Write streams back to 
Kafka</a>
+                <li><a href="#streams_dsl_build">2.5 Generate the processor 
topology</a>
+            </ul>
+        </li>
+        <li><a href="#streams_interactive_queries">3. Interactive Queries</a>
+            <ul>
+                <li><a 
href="#streams_developer-guide_interactive-queries_your_app">3.1 Your 
application and interactive queries</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_local-stores">3.2 Querying 
local state stores (for an application instance)</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_local-key-value-stores">3.3 
Querying local key-value stores</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_local-window-stores">3.4 
Querying local window stores</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_custom-stores">3.5 Querying 
local custom state stores</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_discovery">3.6 Querying 
remote state stores (for the entire application)</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_rpc-layer">3.7 Adding an RPC 
layer to your application</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_expose-rpc">3.8 Exposing the 
RPC endpoints of your application</a>
+                <li><a 
href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">3.9
 Discovering and accessing application instances and their respective local 
state stores</a>
+            </ul>
+        </li>
+        <li><a href="#streams_developer-guide_memory-management">4. Memory 
Management</a>
+            <ul>
+                <li><a 
href="#streams_developer-guide_memory-management_record-cache">4.1 Record 
caches in the DSL</a>
+                <li><a 
href="#streams_developer-guide_memory-management_state-store-cache">4.2 State 
store caches in the Processor API</a>
+                <li><a 
href="#streams_developer-guide_memory-management_other_memory_usage">4.3 Other 
memory usage</a>
+            </ul>
+        </li>
+        <li><a href="#streams_configure_execute">5. Application Configuration 
and Execution</a>
+            <ul>
+                <li><a href="#streams_client_config">5.1 Producer and Consumer 
Configuration</a>
+                <li><a href="#streams_broker_config">5.2 Broker 
Configuration</a>
+                <li><a href="#streams_topic_config">5.3 Internal Topic 
Configuration</a>
+                <li><a href="#streams_execute">5.4 Executing Your Kafka 
Streams Application</a>
+            </ul>
+        </li>
+    </ul>
     <p>
         There is a <a 
href="/{{version}}/documentation/#quickstart_kafkastreams">quickstart</a> 
example that provides how to run a stream processing program coded in the Kafka 
Streams library.
         This section focuses on how to write, configure, and execute a Kafka 
Streams application.
@@ -35,19 +94,17 @@
     <h4><a id="streams_processor_process" 
href="#streams_processor_process">Processor</a></h4>
 
     <p>
-        As mentioned in the <a 
href="/{{version}}/documentation/streams/core-concepts"><b>Core 
Concepts</b></a> section, a stream processor is a node in the processor 
topology that represents a single processing step.
-        With the <code>Processor</code> API developers can define arbitrary 
stream processors that process one received record at a time, and connect these 
processors with
+        A <a href="/{{version}}/documentation/streams/core-concepts"><b>stream 
processor</b></a> is a node in the processor topology that represents a single 
processing step.
+        With the <code>Processor</code> API, you can define arbitrary stream 
processors that process one received record at a time, and connect these 
processors with
         their associated state stores to compose the processor topology that 
represents their customized processing logic.
     </p>
 
     <p>
-        The <code>Processor</code> interface provides two main API methods:
-        <code>process</code> and <code>punctuate</code>. The 
<code>process</code> method is performed on each
-        of the received record; and the <code>punctuate</code> method is 
performed periodically based on elapsed time.
-        In addition, the processor can maintain the current 
<code>ProcessorContext</code> instance variable initialized in the
-        <code>init</code> method, and use the context to schedule the 
punctuation period (<code>context().schedule</code>), to
-        forward the modified / new key-value pair to downstream processors 
(<code>context().forward</code>), to commit the current
-        processing progress (<code>context().commit</code>), etc.
+        The <code>Processor</code> interface provides the <code>process</code> 
method API, which is performed on each record that is received.
+        The processor can maintain the current <code>ProcessorContext</code> 
instance variable initialized in the <code>init</code> method
+        and use the context to schedule a periodically called punctuation 
function (<code>context().schedule</code>),
+        to forward the new or modified key-value pair to downstream processors 
(<code>context().forward</code>),
+        to commit the current processing progress 
(<code>context().commit</code>), and so on.
     </p>
 
     <p>
@@ -65,8 +122,26 @@
     // keep the processor context locally because we need it in punctuate() 
and commit()
     this.context = context;
 
-    // call this processor's punctuate() method every 1000 milliseconds.
-    this.context.schedule(1000);
+    // schedule a punctuation method every 1000 milliseconds.
+    this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new 
Punctuator() {
+        @Override
+        public void punctuate(long timestamp) {
+            KeyValueIterator&lt;String, Long&gt; iter = kvStore.all();
+
+            while (iter.hasNext()) {
+                KeyValue&lt;String, Long&gt; entry = iter.next();
+                context.forward(entry.key, entry.value.toString());
+            }
+
+            // it is the caller's responsibility to close the iterator on the state store;
+            // otherwise it may lead to memory and file handle leaks, depending on the
+            // underlying state store implementation.
+            iter.close();
+
+            // commit the current processing progress
+            context.commit();
+        }
+        });
 
     // retrieve the key-value store named "Counts"
     this.kvStore = (KeyValueStore&lt;String, Long&gt;) 
context.getStateStore("Counts");
@@ -96,7 +171,7 @@
         context.forward(entry.key, entry.value.toString());
     }
 
-    iter.close();
+    iter.close(); // avoid OOM
     // commit the current processing progress
     context.commit();
     }
@@ -111,27 +186,28 @@
     </pre>
 
     <p>
-        In the above implementation, the following actions are performed:
+        In the previous example, the following actions are performed:
     </p>
 
     <ul>
         <li>In the <code>init</code> method, schedule the punctuation every 1 
second and retrieve the local state store by its name "Counts".</li>
         <li>In the <code>process</code> method, upon each received record, 
split the value string into words, and update their counts into the state store 
(we will talk about this feature later in the section).</li>
-        <li>In the <code>punctuate</code> method, iterate the local state 
store and send the aggregated counts to the downstream processor, and commit 
the current stream state.</li>
+        <li>In the scheduled <code>punctuate</code> method, iterate the local 
state store and send the aggregated counts to the downstream processor, and 
commit the current stream state.</li>
+        <li>When you are done with the <code>KeyValueIterator&lt;String, Long&gt;</code>, you <em>must</em> close the iterator, either explicitly as shown above or by using a try-with-resources statement.</li>
     </ul>
 
 
     <h4><a id="streams_processor_topology" 
href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-        With the customized processors defined in the Processor API, 
developers can use the <code>TopologyBuilder</code> to build a processor 
topology
+        With the customized processors defined in the Processor API, you can 
use <code>Topology</code> to build a processor topology
         by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
     // add "PROCESS1" node which takes the source processor "SOURCE" as its 
upstream processor
     .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
@@ -154,63 +230,257 @@
     .addSink("SINK3", "sink-topic3", "PROCESS3");
     </pre>
 
-    There are several steps in the above code to build the topology, and here 
is a quick walk through:
+    Here is a quick walk-through of the previous code, which builds the topology:
 
     <ul>
-        <li>First of all a source node named "SOURCE" is added to the topology 
using the <code>addSource</code> method, with one Kafka topic "src-topic" fed 
to it.</li>
-        <li>Three processor nodes are then added using the 
<code>addProcessor</code> method; here the first processor is a child of the 
"SOURCE" node, but is the parent of the other two processors.</li>
-        <li>Finally three sink nodes are added to complete the topology using 
the <code>addSink</code> method, each piping from a different parent processor 
node and writing to a separate topic.</li>
+        <li>A source node (<code>"SOURCE"</code>) is added to the topology 
using the <code>addSource</code> method, with one Kafka topic 
(<code>"src-topic"</code>) fed to it.</li>
+        <li>Three processor nodes are then added using the 
<code>addProcessor</code> method; here the first processor is a child of the 
source node, but is the parent of the other two processors.</li>
+        <li>Three sink nodes are added to complete the topology using the 
<code>addSink</code> method, each piping from a different parent processor node 
and writing to a separate topic.</li>
     </ul>
 
-    <h4><a id="streams_processor_statestore" 
href="#streams_processor_statestore">State Stores</a></h4>
+<h4><a id="streams_processor_statestore" 
href="#streams_processor_statestore">State Stores</a></h4>
+
+<p>
+To make state stores fault-tolerant (e.g., to recover from machine crashes) as 
well as to allow for state store migration without data loss (e.g., to migrate 
a stateful stream task from one machine to another when elastically adding or 
removing capacity from your application), a state store can be 
<strong>continuously backed up</strong> to a Kafka topic behind the scenes. 
+We sometimes refer to this topic as the state store's associated <em>changelog 
topic</em> or simply its <em>changelog</em>. 
+In the case of a machine failure, for example, the state store and thus the 
application's state can be fully restored from its changelog. 
+You can enable or disable this backup feature for a state store, and thus its 
fault tolerance.
+</p>
+
+<p>
+By default, persistent <strong>key-value stores</strong> are fault-tolerant. 
+They are backed by a <a href="https://kafka.apache.org/documentation.html#compaction">compacted</a> changelog topic.
+The purpose of compacting this topic is to prevent the topic from growing 
indefinitely, to reduce the storage consumed in the associated Kafka cluster, 
and to minimize recovery time if a state store needs to be restored from its 
changelog topic.
+</p>
+
+<p>
+Similarly, persistent <strong>window stores</strong> are fault-tolerant. 
+They are backed by a topic that uses both <em>compaction</em> and 
<em>deletion</em>. 
+Using deletion in addition to compaction is required for the changelog topics 
of window stores because of the structure of the message keys that are being 
sent to the changelog topics: for window stores, the message keys are composite 
keys that include not only the &quot;normal&quot; key but also window 
timestamps. 
+For such composite keys it would not be sufficient to enable just compaction 
in order to prevent a changelog topic from growing out of bounds. 
+With deletion enabled, old windows that have expired will be cleaned up by 
Kafka's log cleaner as the log segments expire. 
+The default retention setting is <code>Windows#maintainMs()</code> + 1 day. This setting can be overridden by specifying <code>StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG</code> in the <code>StreamsConfig</code>.
+</p>
+
+<p>
+One additional note regarding the use of state stores: any time you open an <code>Iterator</code> from a state store, you <em>must</em> call <code>close()</code> on the iterator
+when you are done working with it to reclaim resources. Alternatively, you can use the iterator from within a try-with-resources statement.
+    If you do not close an iterator, you may encounter an OOM error.
+</p>
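+<p>
+As a minimal sketch of the try-with-resources option, the following stand-alone example uses a hypothetical <code>TrackedIterator</code> class in place of a real store iterator (it is not part of Kafka Streams). It assumes only that the iterator implements <code>AutoCloseable</code>, and shows that the iterator is closed once the block exits:
+</p>

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a state store iterator (NOT a Kafka Streams class):
// it is both an Iterator and AutoCloseable, and records whether close() ran.
class TrackedIterator implements Iterator<String>, AutoCloseable {
    private final Iterator<String> delegate;
    boolean closed = false;

    TrackedIterator(List<String> entries) {
        this.delegate = entries.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public String next() {
        return delegate.next();
    }

    @Override
    public void close() {
        // a real store iterator would release its underlying resources here
        closed = true;
    }
}

class IteratorCloseSketch {
    // Counts the entries; try-with-resources closes the iterator
    // even if the loop body throws.
    static int countAndClose(TrackedIterator iter) {
        int count = 0;
        try (TrackedIterator it = iter) {
            while (it.hasNext()) {
                it.next();
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        TrackedIterator iter = new TrackedIterator(Arrays.asList("a", "b", "c"));
        int count = countAndClose(iter);
        System.out.println(count + " entries, closed=" + iter.closed);
    }
}
```

+<p>
+The same pattern should apply to any iterator type that is closeable; only the stand-in class above is an assumption made for illustration.
+</p>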
 
-    <p>
-        Note that the <code>Processor</code> API is not limited to only 
accessing the current records as they arrive in the <code>process()</code> 
method, but can also maintain processing states
-        that keep recently arrived records to use in stateful processing 
operations such as windowed joins or aggregation.
-        To take advantage of these states, users can define a state store by 
implementing the <code>StateStore</code> interface (the Kafka Streams library 
also has a few extended interfaces such as <code>KeyValueStore</code>);
-        in practice, though, users usually do not need to customize such a 
state store from scratch but can simply use the <code>Stores</code> factory to 
define a state store by specifying whether it should be persistent, log-backed, 
etc.
-        In the following example, a persistent key-value store named "Counts" 
with key type <code>String</code> and value type <code>Long</code> is created.
-    </p>
 
-    <pre class="brush: java;">
-    StateStoreSupplier countStore = Stores.create("Counts")
-    .withKeys(Serdes.String())
-    .withValues(Serdes.Long())
-    .persistent()
-    .build();
-    </pre>
+<h4><a id="restoration_progress" href="#restoration_progress">Monitoring the 
Restoration Progress of Fault-tolerant State Stores</a></h4>
+
+<p>
+When you start your application, fault-tolerant state stores usually don't need a restoration process, because the persisted state is read from local disk.
+However, a full restore from the backing changelog topic may be required in some situations (e.g., a failure wiped out the local state, or your application runs in a stateless environment and persisted data is lost on restarts).
+</p>
+
+<p>
+If you have a significant amount of data in the changelog topic, the 
restoration process could take a non-negligible amount of time. 
+Given that processing of new data won't start until the restoration process is 
completed, having a window into the progress of restoration is useful.
+</p>
+
+<p>
+To observe the restoration of all state stores, provide your application with an instance of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface.
+You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> by calling the <code>KafkaStreams#setGlobalStateRestoreListener</code> method.
+</p>
 
-    <p>
-        To take advantage of these state stores, developers can use the 
<code>TopologyBuilder.addStateStore</code> method when building the
-        processor topology to create the local state and associate it with the 
processor nodes that needs to access it; or they can connect a created
-        state store with the existing processor nodes through 
<code>TopologyBuilder.connectProcessorAndStateStores</code>.
-    </p>
+<p>
+ A basic implementation example that prints restoration status to the console:
+</p>
 
-    <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+<pre class="brush: java;">
+  import org.apache.kafka.common.TopicPartition;
+  import org.apache.kafka.streams.processor.StateRestoreListener;
 
-    builder.addSource("SOURCE", "src-topic")
+  public class ConsoleGlobalRestoreListener implements StateRestoreListener {
 
-    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
-    // add the created state store "COUNTS" associated with processor 
"PROCESS1"
-    .addStateStore(countStore, "PROCESS1")
-    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that 
can generate MyProcessor3 */, "PROCESS1")
-    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that 
can generate MyProcessor3 */, "PROCESS1")
+      @Override
+      public void onRestoreStart(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long startingOffset,
+                                 final long endingOffset) {
 
-    // connect the state store "COUNTS" with processor "PROCESS2"
-    .connectProcessorAndStateStores("PROCESS2", "COUNTS");
+          System.out.print("Started restoration of " + storeName + " partition 
" + topicPartition.partition());
+          System.out.println(" total records to be restored " + (endingOffset 
- startingOffset));
+      }
+
+      @Override
+      public void onBatchRestored(final TopicPartition topicPartition,
+                                  final String storeName,
+                                  final long batchEndOffset,
+                                  final long numRestored) {
 
-    .addSink("SINK1", "sink-topic1", "PROCESS1")
-    .addSink("SINK2", "sink-topic2", "PROCESS2")
-    .addSink("SINK3", "sink-topic3", "PROCESS3");
-    </pre>
+          System.out.println("Restored batch " + numRestored + " for " + 
storeName + " partition " + topicPartition.partition());
+
+      }
+
+      @Override
+      public void onRestoreEnd(final TopicPartition topicPartition,
+                               final String storeName,
+                               final long totalRestored) {
+
+          System.out.println("Restoration complete for " + storeName + " 
partition " + topicPartition.partition());
+      }
+  }
+</pre>
+
+<blockquote>
+<p>
+  The <code>StateRestoreListener</code> instance is shared across all 
<code>org.apache.kafka.streams.processor.internals.StreamThread</code> 
instances and it is assumed all methods are stateless. 
+  If any stateful operations are desired, then the user will need to provide 
synchronization internally.
+</p>
+</blockquote>
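+<p>
+If a listener does need to keep state, one possible approach is sketched below. The <code>RestoreProgressTracker</code> and <code>RestoreProgressDemo</code> classes are hypothetical helpers written for this illustration (they do not implement the Kafka interface, so that the example stays self-contained); a listener's <code>onBatchRestored</code> method could delegate to such a tracker. Thread safety is obtained here from <code>ConcurrentHashMap</code> and <code>AtomicLong</code> rather than from explicit locks:
+</p>

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Kafka Streams): accumulates the number of
// restored records per store and is safe to call from several threads.
class RestoreProgressTracker {
    private final ConcurrentMap<String, AtomicLong> restoredByStore = new ConcurrentHashMap<>();

    // Intended to be called from a listener's onBatchRestored callback.
    void onBatchRestored(String storeName, long numRestored) {
        restoredByStore
            .computeIfAbsent(storeName, name -> new AtomicLong())
            .addAndGet(numRestored);
    }

    long totalRestored(String storeName) {
        AtomicLong total = restoredByStore.get(storeName);
        return total == null ? 0L : total.get();
    }
}

class RestoreProgressDemo {
    // Simulates several stream threads reporting batches for the same store
    // and returns the total that the tracker recorded.
    static long runConcurrently(int threads, int batchesPerThread, long batchSize) {
        RestoreProgressTracker tracker = new RestoreProgressTracker();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int b = 0; b < batchesPerThread; b++) {
                    tracker.onBatchRestored("Counts", batchSize);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return tracker.totalRestored("Counts");
    }

    public static void main(String[] args) {
        System.out.println("total restored: " + runConcurrently(4, 1000, 5));
    }
}
```

+<p>
+Whether atomic counters are sufficient depends on what the listener tracks; more involved state may still call for explicit synchronization.
+</p>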
+
+<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable 
Fault Tolerance of State Stores (Store Changelogs)</a></h4>
+
+<p>
+    You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>StateStoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
+    and <code>StateStoreBuilder#withLoggingDisabled()</code>.
+    You can also fine-tune the associated topic’s configuration if needed.
+</p>
+
+<p>Example for disabling fault-tolerance:</p>
+
+<pre class="brush: java;">
+
+  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
+  import org.apache.kafka.streams.state.Stores;
+
+  KeyValueBytesStoreSupplier countStoreSupplier = 
Stores.inMemoryKeyValueStore("Counts");
+  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+                                                          Serdes.String(),
+                                                          Serdes.Long())
+                                    .withLoggingDisabled(); // disable backing 
up the store to a changelog topic
+
+</pre>
+
+<blockquote>
+<p>If the changelog is disabled, the attached state store is no longer fault tolerant and cannot have any standby replicas.</p>
+</blockquote>
+
+<p>
+   Example for enabling fault tolerance, with additional changelog-topic configuration: you can add any log config
+   from <code>kafka.log.LogConfig</code> (<code>core/src/main/scala/kafka/log/LogConfig.scala#L61</code>). Unrecognized configs will be ignored.
+</p>
+
+<pre class="brush: java;">
+
+  import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+  import org.apache.kafka.streams.processor.state.StateStoreBuilder;
+  import org.apache.kafka.streams.state.Stores;
+
+  Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
+  // override min.insync.replicas
+  changelogConfig.put("min.insync.replicas", "1");
+
+  KeyValueBytesStoreSupplier countStoreSupplier = 
Stores.inMemoryKeyValueStore("Counts");
+  StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+                                                          Serdes.String(),
+                                                          Serdes.Long())
+                                    .withLoggingEnabled(changelogConfig); // 
enable changelogging, with custom changelog settings
+
+
+</pre>
+
+<h4><a id="implementing-custom-state-stores" href="#implementing-custom-state-stores">Implementing Custom State Stores</a></h4>
+
+<p>
+ Apart from using the built-in state store types, you can also implement your 
own. 
+ The primary interface to implement for the store is 
<code>org.apache.kafka.streams.processor.StateStore</code>. 
+ Beyond that, Kafka Streams also has a few extended interfaces such as 
<code>KeyValueStore</code>.
+</p>
+
+<p>
+  In addition to the actual store, you also need to provide a 
&quot;factory&quot; for the store by implementing the 
<code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, 
which Kafka Streams uses to create instances of your store.
+</p>
+
+<p>
+  You also have the option of providing a 
<code>org.apache.kafka.streams.processor.StateRestoreCallback</code> instance 
used to restore the state store from its backing changelog topic. 
+  This is done via the <code>org.apache.kafka.streams.processor.ProcessorContext#register</code> call inside the <code>StateStore#init</code> call.
+</p>
+
+<pre class="brush: java;">
+  public void init(ProcessorContext context, StateStore store) {
+     context.register(store, false, stateRestoreCallbackInstance);
+   }    
+</pre>
+
+<p>
+  There is an additional interface 
<code>org.apache.kafka.streams.processor.BatchingStateRestoreCallback</code> 
that provides bulk restoration semantics vs. the single record-at-a-time 
restoration semantics offered by the <code>StateRestoreCallback</code> 
interface.
+</p>
+
+<p>
+  Additionally, there are two abstract classes that implement <code>StateRestoreCallback</code> or <code>BatchingStateRestoreCallback</code> in conjunction with the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface (<code>org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback</code> and <code>org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback</code>, respectively) that provide the ability for the state store to receive notification of restoration progress for that store.
+  The <code>StateRestoreListener</code> in this case is per state store 
instance and is used for internal purposes such as updating config settings 
based on the status of the restoration process.
+</p>
+
+<h4><a id="connecting-processors-and-state-stores" 
href="#connecting-processors-and-state-stores">Connecting Processors and State 
Stores</a></h4>
+
+<p>
+Now that we have defined a processor (<code>WordCountProcessor</code>) and the state stores, we can construct the processor topology by connecting these processors and state stores together using the <code>Topology</code> instance.
+In addition, users can add <em>source processors</em> with the specified Kafka 
topics to generate input data streams into the topology, and <em>sink 
processors</em> with the specified Kafka topics to generate output data streams 
out of the topology.
+</p>
+
+<pre class="brush: java;">
+       Topology topology = new Topology();
+
+      // add the source processor node that takes Kafka topic "source-topic" 
as input
+      topology.addSource("Source", "source-topic")
+
+      // add the WordCountProcessor node which takes the source processor as 
its upstream processor
+      .addProcessor("Process", () -> new WordCountProcessor(), "Source")
+
+      // add the count store associated with the WordCountProcessor processor
+      .addStateStore(countStoreSupplier, "Process")
+
+      // add the sink processor node that takes Kafka topic "sink-topic" as 
output
+      // and the WordCountProcessor node as its upstream processor
+      .addSink("Sink", "sink-topic", "Process");
+</pre>
+
+<p>There are several steps in the above implementation to build the topology, 
and here is a quick walk-through:</p>
+<ul>
+   <li>A source processor node named &quot;Source&quot; is added to the 
topology using the <code>addSource</code> method, with one Kafka topic 
&quot;source-topic&quot; fed to it.</li>
+   <li>A processor node named &quot;Process&quot; with the pre-defined 
<code>WordCountProcessor</code> logic is then added as the downstream processor 
of the &quot;Source&quot; node using the <code>addProcessor</code> method.</li>
+   <li>A predefined persistent key-value state store is created and associated 
with the &quot;Process&quot; node, using <code>countStoreSupplier</code>.</li>
+   <li>A sink processor node is then added to complete the topology using the 
<code>addSink</code> method, taking the &quot;Process&quot; node as its 
upstream processor and writing to a separate &quot;sink-topic&quot; Kafka 
topic.</li>
+</ul>
+
+<p>
+In this topology, the &quot;Process&quot; stream processor node is considered 
a downstream processor of the &quot;Source&quot; node, and an upstream 
processor of the &quot;Sink&quot; node. 
+As a result, whenever the &quot;Source&quot; node forwards a newly fetched record from Kafka to its downstream &quot;Process&quot; node, the <code>WordCountProcessor#process()</code> method is triggered to process the record and update the associated state store; and whenever <code>context#forward()</code> is called in the <code>Punctuator#punctuate()</code> method, the aggregate key-value pair is sent via the &quot;Sink&quot; processor node to the Kafka topic &quot;sink-topic&quot;.
+Note that the <code>WordCountProcessor</code> implementation must refer to the same store name &quot;Counts&quot; when accessing the key-value store; otherwise an exception is thrown at runtime, indicating that the state store cannot be found. Likewise, if the state store itself is not associated with the processor in the <code>Topology</code> code, accessing it in the processor's <code>init()</code> method also throws an exception at runtime, indicating that the state store is not accessible from this processor.
+</p>
+
+
+    <h4><a id="streams_processor_describe" 
href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+
+    <p>
+        After a <code>Topology</code> is specified, it is possible to retrieve 
a description of the corresponding DAG via <code>#describe()</code> that 
returns a <code>TopologyDescription</code>.
+        A <code>TopologyDescription</code> contains all added source, 
processor, and sink nodes as well as all attached stores.
+        You can access the specified input and output topic names and patterns 
for source and sink nodes.
+        For processor nodes, the attached stores are added to the description.
+        Additionally, all nodes have a list of all their connected successor and predecessor nodes.
+        Thus, <code>TopologyDescription</code> lets you retrieve the DAG structure of the specified topology.
+        <br />
+        Note that global stores are listed explicitly because they are 
accessible by all nodes without the need to explicitly connect them.
+        Furthermore, nodes are grouped by <code>Sub-topologies</code>, where 
each sub-topology is a group of processor nodes that are directly connected to 
each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or 
by sharing a store).
+        During execution, each <code>Sub-topology</code> will be processed by 
<a 
href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one
 or multiple tasks</a>.
+        Thus, each <code>Sub-topology</code> describes an independent unit of work that can be executed by different threads in parallel.
+        <br />
+        Describing a <code>Topology</code> before starting your streams 
application with the specified topology is helpful to reason about tasks and 
thus maximum parallelism (we will talk about how to execute your written 
application later in this section).
+        It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via the Kafka Streams DSL (we will describe the DSL in the next section).
+    </p>
 
     In the next section we present another way to build the processor 
topology: the Kafka Streams DSL.
     <br>
 
     <h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
 
-    To build a processor topology using the Streams DSL, developers can apply 
the <code>KStreamBuilder</code> class, which is extended from the 
<code>TopologyBuilder</code>.
+    To build a <code>Topology</code> using the Streams DSL, developers can 
apply the <code>StreamsBuilder</code> class.
     A simple example is included with the source code for Kafka in the 
<code>streams/examples</code> package. The rest of this section will walk
     through some code to demonstrate the key steps in creating a topology 
using the Streams DSL, but we recommend developers to read the full example 
source
     codes for details.
@@ -264,25 +534,1438 @@
     ("alice", 1) --> ("alice", 3)
     </pre>
 
-    If these records a KStream and the stream processing application were to 
sum the values it would return <code>4</code>. If these records were a KTable 
or GlobalKTable, the return would be <code>3</code>, since the last record 
would be considered as an update.
+    If the stream is defined as a KStream and the stream processing 
application were to sum the values it would return <code>4</code>. If the 
stream is defined as a KTable or GlobalKTable, the return would be 
<code>3</code>, since the last record would be considered as an update.
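+    <p>
+    The arithmetic behind the two results can be reproduced with a small plain-Java sketch. It does not use the Kafka Streams API; the class <code>StreamVsTableSketch</code> and its methods are hypothetical names chosen for illustration.
+    </p>

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not a Kafka Streams class) of record-stream
// versus changelog semantics for the same two records.
class StreamVsTableSketch {
    static Map.Entry<String, Long> record(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Record-stream view: every record is an independent fact, so all values count.
    static long sumAsRecordStream(List<Map.Entry<String, Long>> records) {
        long sum = 0;
        for (Map.Entry<String, Long> r : records) {
            sum += r.getValue();
        }
        return sum;
    }

    // Changelog view: a later record for a key replaces the earlier value.
    static long sumAsChangelog(List<Map.Entry<String, Long>> records) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            latest.put(r.getKey(), r.getValue());
        }
        long sum = 0;
        for (long value : latest.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records =
            Arrays.asList(record("alice", 1L), record("alice", 3L));
        System.out.println(sumAsRecordStream(records)); // prints 4
        System.out.println(sumAsChangelog(records));    // prints 3
    }
}
```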
+
+    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Creating Source 
Streams from Kafka</a></h4>
 
-    <h4><a id="streams_dsl_source" href="#streams_dsl_source">Create Source 
Streams from Kafka</a></h4>
+    <p>
+    You can easily read data from Kafka topics into your application. We 
support the following operations.
+    </p>
+    <table class="data-table" border="1">
+        <tbody><tr>
+            <th>Reading from Kafka</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Stream</b>: input topic(s) &rarr; <code>KStream</code></td>
+            <td>Create a <code>KStream</code> from the specified Kafka input 
topic(s), interpreting the data as a record stream.
+                A <code>KStream</code> represents a partitioned record stream.
+                <p>
+                    Slightly simplified, in the case of a KStream, the local 
KStream instance of every application instance will be populated
+                    with data from only a <b>subset</b> of the partitions of 
the input topic. Collectively, i.e. across all application instances,
+                    all the partitions of the input topic will be read and 
processed.
+                </p>
+                <pre class="brush: java;">
+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KStream;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    KStream&lt;String, Long&gt; wordCounts = builder.stream(
+                        "word-counts-input-topic" /* input topic */,
+                        Consumed.with(Serdes.String(), Serdes.Long())); // 
define key and value serdes
+                </pre>
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default 
serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or 
value types of the records in the Kafka input topic(s) do not match
+                        the configured default serdes. </li>
+                </ul>
+                Several variants of <code>stream</code> exist to e.g. specify 
a regex pattern for input topics to read from.</td>
+        </tr>
+        <tr>
+            <td><b>Table</b>: input topic(s) &rarr; <code>KTable</code></td>
+            <td>
+                Reads the specified Kafka input topic into a 
<code>KTable</code>. The topic is interpreted as a changelog stream,
+                where records with the same key are interpreted as UPSERT aka 
INSERT/UPDATE (when the record value is not <code>null</code>) or
+                as DELETE (when the value is null) for that key.
+                <p>
+                    Slightly simplified, in the case of a KTable, the local 
KTable instance of every application instance will be populated
+                    with data from only a subset of the partitions of the 
input topic. Collectively, i.e. across all application instances, all
+                    the partitions of the input topic will be read and 
processed.
+                </p>
+                <p>
+                You may provide an optional name for the table (more 
precisely, for the internal state store that backs the table).
+                When a name is provided, the table can be queried using <a 
href="#streams_interactive_queries">interactive queries</a>.
+                When a name is not provided, the table will not be queryable and 
an internal name will be provided for the state store.
+                </p>
+                <pre class="brush: java;">
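+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.common.utils.Bytes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.KTable;
+                    import org.apache.kafka.streams.kstream.Materialized;
+                    import org.apache.kafka.streams.state.KeyValueStore;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    KTable&lt;String, Long&gt; wordCounts = builder.table(
+                        "word-counts-input-topic", /* input topic */
+                        Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as(
+                                "word-counts-partitioned-store") /* table/store name */
+                            .withKeySerde(Serdes.String())   /* key serde */
+                            .withValueSerde(Serdes.Long())); /* value serde */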
+                </pre>
+
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default 
serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or 
value types of the records in the Kafka input topic do not
+                        match the configured default serdes.</li>
+                </ul>
+
+                Several variants of <code>table</code> exist to e.g. specify 
the <code>auto.offset.reset</code>
+                policy to be used when reading from the input topic.
+            </td>
+        </tr>
+        <tr>
+            <td><b>Global Table</b>: input topic &rarr; 
<code>GlobalKTable</code></td>
+            <td>
+                Reads the specified Kafka input topic into a 
<code>GlobalKTable</code>. The topic is interpreted as a changelog stream, 
where records
+                with the same key are interpreted as UPSERT aka INSERT/UPDATE 
(when the record value is not <code>null</code>) or as DELETE (when the
+                value is <code>null</code>) for that key.
+                <p>
+                    Slightly simplified, in the case of a GlobalKTable, the 
local GlobalKTable instance of every application instance will be
+                    populated with data from all the partitions of the input 
topic. In other words, when using a global table, every application
+                    instance will get its own, full copy of the topic's data.
+                </p>
+                <p>
+                You may provide an optional name for the table (more 
precisely, for the internal state store that backs the table).
+                When a name is provided, the table can be queried using <a 
href="#streams_interactive_queries">interactive queries</a>.
+                When a name is not provided, the table will not be queryable and 
an internal name will be provided for the state store.
+                </p>
+                <pre class="brush: java;">
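+                    import org.apache.kafka.common.serialization.Serdes;
+                    import org.apache.kafka.common.utils.Bytes;
+                    import org.apache.kafka.streams.StreamsBuilder;
+                    import org.apache.kafka.streams.kstream.GlobalKTable;
+                    import org.apache.kafka.streams.kstream.Materialized;
+                    import org.apache.kafka.streams.state.KeyValueStore;
+
+                    StreamsBuilder builder = new StreamsBuilder();
+
+                    GlobalKTable&lt;String, Long&gt; wordCounts = 
builder.globalTable(
+                        "word-counts-input-topic", /* input topic */
+                        Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as(
+                                "word-counts-global-store") /* table/store name */
+                            .withKeySerde(Serdes.String())   /* key serde */
+                            .withValueSerde(Serdes.Long())); /* value serde */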
+                </pre>
+
+                When to provide serdes explicitly:
+                <ul>
+                    <li>If you do not specify serdes explicitly, the default 
serdes from the configuration are used.</li>
+                    <li>You must specify serdes explicitly if the key and/or 
value types of the records in the Kafka input topic do not
+                        match the configured default serdes.</li>
+                </ul>
+                Several variants of <code>globalTable</code> exist to e.g. 
specify explicit serdes.
+
+            </td>
+        </tr>
+        </tbody>
+    </table>
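+    <p>
+    The changelog interpretation used by <code>KTable</code> and <code>GlobalKTable</code> (a non-null value is an UPSERT, a <code>null</code> value is a DELETE) can be illustrated with a plain-Java sketch. It is not how Kafka Streams implements tables; <code>ChangelogSketch</code> and its methods are hypothetical names.
+    </p>

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (not a Kafka Streams class): replays a changelog
// into an in-memory table.
class ChangelogSketch {
    static Map.Entry<String, Long> record(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new TreeMap<>();
        for (Map.Entry<String, Long> r : changelog) {
            if (r.getValue() == null) {
                table.remove(r.getKey());           // DELETE (tombstone record)
            } else {
                table.put(r.getKey(), r.getValue()); // UPSERT (insert or update)
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = Arrays.asList(
            record("alice", 1L),
            record("bob", 5L),
            record("alice", 3L),   // updates alice
            record("bob", null));  // deletes bob
        System.out.println(materialize(changelog)); // prints {alice=3}
    }
}
```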
+
+    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform 
a stream</a></h4>
+    <p>
+    <code>KStream</code> and <code>KTable</code> support a variety of 
transformation operations. Each of these operations
+    can be translated into one or more connected processors in the 
underlying processor topology. Since <code>KStream</code>
+    and <code>KTable</code> are strongly typed, all these transformation 
operations are defined as generic functions where
+    users can specify the input and output data types.
+    </p>
+    <p>
+    Some <code>KStream</code> transformations may generate one or more 
<code>KStream</code> objects (e.g., filter and
+    map on <code>KStream</code> generate another <code>KStream</code>, while 
branch on <code>KStream</code> can generate
+    multiple <code>KStream</code> instances) while some others may generate a 
<code>KTable</code> object (e.g., aggregation) interpreted
+    as the changelog stream of the resulting relation. This allows Kafka 
Streams to continuously update the computed value upon arrival
+    of late records after it has already been produced to the downstream 
transformation operators. As for <code>KTable</code>,
+    all its transformation operations can only generate another 
<code>KTable</code> (though the Kafka Streams DSL does
+    provide a special function to convert a <code>KTable</code> representation 
into a <code>KStream</code>, which we will
+    describe later). Nevertheless, all these transformation methods can be 
chained together to compose a complex processor topology.
+    </p>
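+    <p>
+    To make the idea of an aggregation result as a changelog stream concrete, the following plain-Java sketch emits one update per input record for a running count. It only illustrates the semantics; <code>RunningCountSketch</code> is a hypothetical name and the code does not use the Kafka Streams API.
+    </p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not a Kafka Streams class): a count aggregation
// whose output is a sequence of updates, one per input record.
class RunningCountSketch {
    static List<String> countUpdates(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String key : keys) {
            // Each new record produces an updated value for its key; a later
            // update supersedes the one emitted earlier for the same key.
            long updated = counts.merge(key, 1L, Long::sum);
            updates.add(key + "=" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        System.out.println(countUpdates(Arrays.asList("a", "b", "a")));
        // prints [a=1, b=1, a=2]
    }
}
```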
+    <p>
+    We describe these transformation operations in the following subsections, 
grouped into two categories:
+    stateless and stateful transformations.
+    </p>
+    <h5><a id="streams_dsl_transformations_stateless" 
href="#streams_dsl_transformations_stateless">Stateless transformations</a></h5>
+    <p>
+    Stateless transformations, by definition, do not depend on any state for 
processing, and hence implementation-wise they do not
+    require a state store associated with the stream processor.
+    </p>
+    <table class="data-table" border="1">
+        <tbody><tr>
+            <th>Transformation</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Branch</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Branch (or split) a <code>KStream</code> based on the supplied 
predicates into one or more <code>KStream</code> instances.
+                </p>
+                <p>
+                Predicates are evaluated in order. A record is placed into one 
and only one output stream on the first match:
+                if the n-th predicate evaluates to true, the record is placed 
into the n-th stream. If no predicate matches,
+                the record is dropped.
+                </p>
+                <p>
+                Branching is useful, for example, to route records to 
different downstream topics.
+                </p>
+                <pre class="brush: java;">
+                    KStream&lt;String, Long&gt; stream = ...;
+                    KStream&lt;String, Long&gt;[] branches = stream.branch(
+                            (key, value) -> key.startsWith("A"), /* first 
predicate  */
+                            (key, value) -> key.startsWith("B"), /* second 
predicate */
+                            (key, value) -> true                 /* third 
predicate  */
+                    );
+                    // KStream branches[0] contains all records whose keys 
start with "A"
+                    // KStream branches[1] contains all records whose keys 
start with "B"
+                    // KStream branches[2] contains all other records
+                    // Java 7 example: cf. `filter` for how to create 
`Predicate` instances
+            </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Filter</b>: <code>KStream &rarr; KStream or KTable &rarr; 
KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and retains 
those for which the function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+                     KTable&lt;String, Long&gt; table = ...;
+                     // A filter that selects (keeps) only positive numbers
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = 
stream.filter((key, value) -> value > 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filter(
+                       new Predicate&lt;String, Long&gt;() {
+                         @Override
+                         public boolean test(String key, Long value) {
+                           return value > 0;
+                         }
+                       });
+
+                    // A filter on a KTable that materializes the result into 
a StateStore
+                    table.filter((key, value) -> value != 0, 
Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, 
byte[]&gt;&gt;as("filtered"));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Inverse Filter</b>: <code>KStream &rarr; KStream or KTable 
&rarr; KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and drops those 
for which the function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+
+                     // An inverse filter that discards any negative numbers 
or zero
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = 
stream.filterNot((key, value) -> value &lt;= 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = 
stream.filterNot(
+                      new Predicate&lt;String, Long&gt;() {
+                        @Override
+                        public boolean test(String key, Long value) {
+                            return value &lt;= 0;
+                        }
+                     });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap</b>: <code>KStream &rarr; KStream </code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records. You 
can modify the record keys and values,
+                including their types.
+                </p>
+
+                <p>
+                Marks the stream for data re-partitioning: Applying a grouping 
or a join after <code>flatMap</code> will result in
+                re-partitioning of the records. If possible use 
<code>flatMapValues</code> instead, which will not cause data re-partitioning.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;Long, String&gt; stream = ...;
+                     KStream&lt;String, Integer&gt; transformed = 
stream.flatMap(
+                         // Here, we generate two output records for each 
input record.
+                         // We also change the key and value types.
+                         // Example: (345L, "Hello") -> ("HELLO", 1000), 
("hello", 9000)
+                         (key, value) -> {
+                             List&lt;KeyValue&lt;String, Integer&gt;&gt; 
result = new LinkedList&lt;&gt;();
+                             result.add(KeyValue.pair(value.toUpperCase(), 
1000));
+                             result.add(KeyValue.pair(value.toLowerCase(), 
9000));
+                             return result;
+                         }
+                     );
+                     // Java 7 example: cf. `map` for how to create 
`KeyValueMapper` instances
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap (values only)</b>: <code>KStream &rarr; KStream 
</code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records, 
while retaining the key of the original record.
+                You can modify the record values and the value type.
+                </p>
+                <p>
+                <code>flatMapValues</code> is preferable to 
<code>flatMap</code> because it will not cause data re-partitioning. However,
+                it does not allow you to modify the key or key type like 
<code>flatMap</code> does.
+                </p>
+                <pre class="brush: java;">
+                   // Split a sentence into words.
+                   KStream&lt;byte[], String&gt; sentences = ...;
+                   KStream&lt;byte[], String&gt; words = 
sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
+
+                   // Java 7 example: cf. `mapValues` for how to create 
`ValueMapper` instances
+               </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Foreach</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Performs a stateless action on each record.
+                </p>
+                <p>
+                Note on processing guarantees: Any side effects of an action 
(such as writing to external systems)
+                are not trackable by Kafka, which means they will typically 
not benefit from Kafka's processing guarantees.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;String, Long&gt; stream = ...;
+
+                       // Print the contents of the KStream to the local 
console.
+                       // Java 8+ example, using lambda expressions
+                       stream.foreach((key, value) -> System.out.println(key + 
" => " + value));
+
+                       // Java 7 example
+                       stream.foreach(
+                           new ForeachAction&lt;String, Long&gt;() {
+                               @Override
+                               public void apply(String key, Long value) {
+                                 System.out.println(key + " => " + value);
+                               }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupByKey</b>: <code>KStream &rarr; KGroupedStream 
</code></td>
+            <td>
+                <p>
+                Groups the records by the existing key.
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table 
and ensures that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of 
<code>groupByKey</code> exist to override the configured default serdes of
+                your application, which you must do if the key and/or value 
types of the resulting <code>KGroupedStream</code> do
+                not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, 
which lets you control how to "sub-group" the
+                grouped records of the same key into so-called windows for 
stateful operations such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                Causes data re-partitioning if and only if the stream was 
marked for re-partitioning. <code>groupByKey</code> is
+                preferable to <code>groupBy</code> because it re-partitions 
data only if the stream was already marked for re-partitioning.
+                However, <code>groupByKey</code> does not allow you to modify 
the key or key type like <code>groupBy</code> does.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Group by the existing key, using the application's 
configured
+                       // default serdes for keys and values.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = 
stream.groupByKey();
+
+                       // When the key and/or value types do not match the 
configured
+                       // default serdes, we must explicitly specify serdes.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = 
stream.groupByKey(
+                           Serialized.with(
+                                Serdes.ByteArray(), /* key */
+                                Serdes.String())     /* value */
+                       );
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupBy</b>: <code>KStream &rarr; KGroupedStream or KTable 
&rarr; KGroupedTable</code></td>
+            <td>
+                <p>
+                Groups the records by a new key, which may be of a different 
key type. When grouping a table,
+                you may also specify a new value and value type. groupBy is a 
shorthand for selectKey(...).groupByKey().
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table 
and ensures that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of groupBy exist 
to override the configured default serdes of your
+                application, which you must do if the key and/or value types 
of the resulting KGroupedStream or
+                KGroupedTable do not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, 
which lets you control how to “sub-group” the
+                grouped records of the same key into so-called windows for 
stateful operations such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                <b>Always causes data re-partitioning:</b> groupBy always 
causes data re-partitioning. If possible use groupByKey
+                instead, which will re-partition data only if required.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Java 8+ examples, using lambda expressions
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = 
stream.groupBy(
+                           (key, value) -> value,
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was 
modified) */
+                                Serdes.String())  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also 
modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = 
table.groupBy(
+                           (key, value) -> KeyValue.pair(value, 
value.length()),
+                           Serialized.with(
+                               Serdes.String(), /* key (note: type was 
modified) */
+                               Serdes.Integer()) /* value (note: type was 
modified) */
+                       );
+
+
+                       // Java 7 examples
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = 
stream.groupBy(
+                           new KeyValueMapper&lt;byte[], String, 
String&gt;() {
+                               @Override
+                               public String apply(byte[] key, String value) {
+                                  return value;
+                               }
+                           },
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was 
modified) */
+                                Serdes.String())  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also 
modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = 
table.groupBy(
+                            new KeyValueMapper&lt;byte[], String, 
KeyValue&lt;String, Integer&gt;&gt;() {
+                            @Override
+                                public KeyValue&lt;String, Integer&gt; 
apply(byte[] key, String value) {
+                                   return KeyValue.pair(value, value.length());
+                                }
+                            },
+                            Serialized.with(
+                                Serdes.String(), /* key (note: type was 
modified) */
+                                Serdes.Integer()) /* value (note: type was 
modified) */
+                       );
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record. You can modify the 
record key and value, including their types.
+                </p>
+
+                <p>
+                <b>Marks the stream for data re-partitioning:</b> Applying a 
grouping or a join after <code>map</code> will result in
+                re-partitioning of the records. If possible use 
<code>mapValues</code> instead, which will not cause data re-partitioning.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       // Note how we change the key and the key type (similar 
to `selectKey`)
+                       // as well as the value and the value type.
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           (key, value) -> KeyValue.pair(value.toLowerCase(), 
value.length()));
+
+                       // Java 7 example
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           new KeyValueMapper&lt;byte[], String, 
KeyValue&lt;String, Integer&gt;&gt;() {
+                           @Override
+                           public KeyValue&lt;String, Integer&gt; apply(byte[] 
key, String value) {
+                               return new 
KeyValue&lt;&gt;(value.toLowerCase(), value.length());
+                           }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map (values only)</b>: <code>KStream &rarr; KStream or 
KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record, while retaining the 
key of the original record. You can modify
+                the record value and the value type.
+                </p>
+                <p>
+                <code>mapValues</code> is preferable to <code>map</code> 
because it will not cause data re-partitioning. However, it does not
+                allow you to modify the key or key type like <code>map</code> 
does.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;String, String&gt; table = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;byte[], String&gt; uppercased = 
stream.mapValues(value -> value.toUpperCase());
+
+                       // Java 7 example
+                       KStream&lt;byte[], String&gt; uppercased = 
stream.mapValues(
+                          new ValueMapper&lt;String, String&gt;() {
+                          @Override
+                          public String apply(String s) {
+                             return s.toUpperCase();
+                          }
+                       });
+
+                       // mapValues on a KTable and also materialize the 
results into a statestore
+                       table.mapValues(value -> value.toUpperCase(), 
Materialized.&lt;String, String, KeyValueStore&lt;Bytes, 
byte[]&gt;&gt;as("uppercased"));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Print</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Prints the records to 
<code>System.out</code>. See Javadocs for serde and <code>toString()</code> 
caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.print();
+                    
+                       // You can also override how and where the data is 
printed, e.g., to a file:
+                       stream.print(Printed.toFile("stream.out"));
+
+                       // with a custom KeyValueMapper and label
+                       stream.print(Printed.toSysOut()
+                                .withLabel("my-stream")
+                                .withKeyValueMapper((key, value) -> key + " -> 
" + value));
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>SelectKey</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Assigns a new key, possibly of a new key type, to each record.
+                </p>
+                <p>
+                Marks the stream for data re-partitioning: Applying a grouping 
or a join after <code>selectKey</code> will result in
+                re-partitioning of the records.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Derive a new record key from the record's value.  
Note how the key type changes, too.
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;String, String&gt; rekeyed = 
stream.selectKey((key, value) -> value.split(" ")[0]);
+
+                       // Java 7 example
+                       KStream&lt;String, String&gt; rekeyed = 
stream.selectKey(
+                           new KeyValueMapper&lt;byte[], String, String&gt;() {
+                           @Override
+                           public String apply(byte[] key, String value) {
+                              return value.split(" ")[0];
+                           }
+                         });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Table to Stream</b>: <code>KTable &rarr; KStream</code></td>
+            <td>
+                <p>
+                Converts this table into a stream.
+                </p>
+                <pre class="brush: java;">
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Also, a variant of `toStream` exists that allows you
+                       // to select a new key for the resulting stream.
+                       KStream&lt;byte[], String&gt; stream = table.toStream();
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>WriteAsText</b>: <code>KStream &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Writes the records to a file. See Javadocs for serde and <code>toString()</code> caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.writeAsText("/path/to/local/output.txt");
+
+                       // Several variants of `writeAsText` exist to e.g. 
override the
+                       // default serdes for record keys and record values.
+                       stream.writeAsText("/path/to/local/output.txt", 
Serdes.ByteArray(), Serdes.String());
+                </pre>
+            </td>
+        </tr>
+        </tbody>
+    </table>
+
+
+    <h5><a id="streams_dsl_transformations_stateful" 
href="#streams_dsl_transformations_stateful">Stateful transformations</a></h5>
+    <h6><a id="streams_dsl_transformations_stateful_overview" 
href="#streams_dsl_transformations_stateful_overview">Overview</a></h6>
+    <p>
+        Stateful transformations, by definition, depend on state for 
processing inputs and producing outputs, and
+        hence implementation-wise they require a state store associated with 
the stream processor. For example,
+        in aggregating operations, a windowing state store is used to store 
the latest aggregation results per window;
+        in join operations, a windowing state store is used to store all the 
records received so far within the
+        defined window boundary.
+    </p>
+    <p>
+        Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore
+        all state stores prior to resuming the processing.
+    </p>
+    <p>
+        Available stateful transformations in the DSL include:
+    <ul>
+        <li><a href="#streams_dsl_aggregations">Aggregating</a></li>
+        <li><a href="#streams_dsl_joins">Joining</a></li>
+        <li><a href="#streams_dsl_windowing">Windowing (as part of 
aggregations and joins)</a></li>
+        <li>Applying custom processors and transformers, which may be 
stateful, for Processor API integration</li>
+    </ul>
+    </p>
+    <p>
+        The following diagram shows their relationships:
+    </p>
+    <figure>
+        <img class="centered" 
src="/{{version}}/images/streams-stateful_operations.png" style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Stateful transformations in 
the DSL</i></figcaption>
+    </figure>
+
+    <p>
+        We will discuss the various stateful transformations in detail in the 
subsequent sections. However, let's start
+        with a first example of a stateful application: the canonical 
WordCount algorithm.
+    </p>
+    <p>
+        WordCount example in Java 8+, using lambda expressions:
+    </p>
+    <pre class="brush: java;">
+        // We assume record values represent lines of text.  For the sake of 
this example, we ignore
+        // whatever may be stored in the record keys.
+        KStream&lt;String, String&gt; textLines = ...;
+
+        KStream&lt;String, Long&gt; wordCounts = textLines
+            // Split each text line, by whitespace, into words.  The text 
lines are the record
+            // values, i.e. we can ignore whatever data is in the record keys 
and thus invoke
+            // `flatMapValues` instead of the more generic `flatMap`.
+            .flatMapValues(value -> 
Arrays.asList(value.toLowerCase().split("\\W+")))
+            // Group the stream by word to ensure the key of the record is the 
word.
+            .groupBy((key, word) -> word)
+            // Count the occurrences of each word (record key).
+            //
+            // This will change the stream type from 
`KGroupedStream&lt;String, String&gt;` to
+            // `KTable&lt;String, Long&gt;` (word -> count).  We must provide 
a name for
+            // the resulting KTable, which will be used to name e.g. its 
associated
+            // state store and changelog topic.
+            .count("Counts")
+            // Convert the `KTable&lt;String, Long&gt;` into a 
`KStream&lt;String, Long&gt;`.
+            .toStream();
+    </pre>
+    <p>
+        WordCount example in Java 7:
+    </p>
+    <pre class="brush: java;">
+        // The code below is equivalent to the Java 8+ example above.
+        KStream&lt;String, String&gt; textLines = ...;
+
+        KStream&lt;String, Long&gt; wordCounts = textLines
+            .flatMapValues(new ValueMapper&lt;String, 
Iterable&lt;String&gt;&gt;() {
+                @Override
+                public Iterable&lt;String&gt; apply(String value) {
+                    return Arrays.asList(value.toLowerCase().split("\\W+"));
+                }
+            })
+            .groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
+                @Override
+                public String apply(String key, String word) {
+                    return word;
+                }
+            })
+            .count("Counts")
+            .toStream();
+    </pre>
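+    <p>
+        To make the result of the topology above concrete, here is a minimal plain-Java sketch (no Kafka dependency) of what WordCount computes.
+        It assumes that every update to the count table is forwarded downstream, which in Kafka Streams is typically only the case when record caching is disabled.
+        The class and method names are hypothetical and not part of the Kafka Streams API.
+    </p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the WordCount topology; not part of the Kafka Streams API.
final class WordCountSketch {
    // Plays the role of the "Counts" state store: word -> running count.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one text line and returns the updates ("word=count") that the
    // final toStream() step would emit, ASSUMING no caching merges consecutive updates.
    List<String> process(String textLine) {
        List<String> updates = new ArrayList<>();
        // Same lower-casing and splitting as the flatMapValues step above.
        for (String word : textLine.toLowerCase().split("\\W+")) {
            // groupBy + count: increment the running count for this word.
            long newCount = counts.getOrDefault(word, 0L) + 1;
            counts.put(word, newCount);
            updates.add(word + "=" + newCount);
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountSketch wordCount = new WordCountSketch();
        System.out.println(wordCount.process("Hello hello world"));
        System.out.println(wordCount.process("World!"));
    }
}
```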
 
+    <h6><a id="streams_dsl_aggregations" 
href="#streams_dsl_aggregations">Aggregate a stream</a></h6>
     <p>
-        Either a <b>record stream</b> (defined as <code>KStream</code>) or a 
<b>changelog stream</b> (defined as <code>KTable</code> or 
<code>GlobalKTable</code>)
-        can be created as a source stream from one or more Kafka topics (for 
<code>KTable</code> and <code>GlobalKTable</code> you can only create the 
source stream
-        from a single topic).
+        Once records are grouped by key via <code>groupByKey</code> or 
<code>groupBy</code> -- and
+        thus represented as either a <code>KGroupedStream</code> or a
+        <code>KGroupedTable</code> -- they can be aggregated via an operation 
such as
+        <code>reduce</code>.
+        For windowed aggregations use 
<code>windowedBy(Windows).reduce(Reducer)</code>.
+        Aggregations are <i>key-based</i> operations, i.e. they always operate over records (notably record values) <i>of the same key</i>.
+        You may choose to perform aggregations on
+        <a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
     </p>
+    <table class="data-table" border="1">
+        <tbody>
+        <tr>
+            <th>Transformation</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Aggregate</b>: <code>KGroupedStream &rarr; KTable</code> or 
<code>KGroupedTable
+                &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Rolling aggregation</b>. Aggregates the values of 
(non-windowed) records by
+                    the grouped key. Aggregating is a generalization of 
<code>reduce</code> and allows, for example, the
+                    aggregate value to have a different type than the input 
values.
+                </p>
+                <p>
+                    When aggregating a grouped stream, you must provide an 
initializer (think:
+                    <code>aggValue = 0</code>) and an "adder"
+                    aggregator (think: <code>aggValue + curValue</code>). When 
aggregating a <i>grouped</i>
+                    table, you must additionally provide a "subtractor" 
aggregator (think: <code>aggValue - oldValue</code>).
+                </p>
+                <p>
+                    Several variants of <code>aggregate</code> exist, see 
Javadocs for details.
+                </p>
+                <pre class="brush: java;">
+                    KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
+                    KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
+
+                    // Java 8+ examples, using lambda expressions
+
+                    // Aggregating a KGroupedStream (note how the value type 
changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = 
groupedStream.aggregate(
+                        () -> 0L, /* initializer */
+                        (aggKey, newValue, aggValue) -> aggValue + 
newValue.length(), /* adder */
+                        Serdes.Long(), /* serde for aggregate value */
+                        "aggregated-stream-store" /* state store name */);
+
+                    // Aggregating a KGroupedTable (note how the value type 
changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = 
groupedTable.aggregate(
+                        () -> 0L, /* initializer */
+                        (aggKey, newValue, aggValue) -> aggValue + 
newValue.length(), /* adder */
+                        (aggKey, oldValue, aggValue) -> aggValue - 
oldValue.length(), /* subtractor */
+                        Serdes.Long(), /* serde for aggregate value */
+                        "aggregated-table-store" /* state store name */);
+
+
+                    // Windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(() -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* aggregator */
+                            Serdes.Long()); /* serde for aggregate value */
+
+
+                    // Java 7 examples
+
+                    // Aggregating a KGroupedStream (note how the value type 
changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = 
groupedStream.aggregate(
+                        new Initializer&lt;Long&gt;() { /* initializer */
+                          @Override
+                          public Long apply() {
+                            return 0L;
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder 
*/
+                          @Override
+                          public Long apply(Bytes aggKey, String newValue, 
Long aggValue) {
+                            return aggValue + newValue.length();
+                          }
+                        },
+                        Serdes.Long(),
+                        "aggregated-stream-store");
+
+                    // Aggregating a KGroupedTable (note how the value type 
changes from String to Long)
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = 
groupedTable.aggregate(
+                        new Initializer&lt;Long&gt;() { /* initializer */
+                          @Override
+                          public Long apply() {
+                            return 0L;
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder 
*/
+                          @Override
+                          public Long apply(Bytes aggKey, String newValue, 
Long aggValue) {
+                            return aggValue + newValue.length();
+                          }
+                        },
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* 
subtractor */
+                          @Override
+                          public Long apply(Bytes aggKey, String oldValue, 
Long aggValue) {
+                            return aggValue - oldValue.length();
+                          }
+                        },
+                        Serdes.Long(),
+                        "aggregated-table-store");
+
+                    // Windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;Bytes, String, Long&gt;() { /* 
adder */
+                              @Override
+                              public Long apply(Bytes aggKey, String newValue, 
Long aggValue) {
+                                return aggValue + newValue.length();
+                              }
+                            },
+                            Serdes.Long());
+                </pre>
+                <p>
+                    Detailed behavior of <code>KGroupedStream</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored 
in general.</li>
+                    <li>When a record key is received for the first time, the 
initializer is called
+                        (and called before the adder).</li>
+                    <li>Whenever a record with a non-<code>null</code> value is received, the adder is called.</li>
+                </ul>
+                <p>
+                    Detailed behavior of <code>KGroupedTable</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored in general.</li>
+                    <li>When a record key is received for the first time, the 
initializer is called
+                        (and called before the adder and subtractor). Note 
that, in contrast to <code>KGroupedStream</code>, over
+                        time the initializer may be called more
+                        than once for a key as a result of having received 
input tombstone records
+                        for that key (see below).</li>
+                    <li>When the first non-<code>null</code> value is received 
for a key (think:
+                        INSERT), then only the adder is called.</li>
+                    <li>When subsequent non-<code>null</code> values are 
received for a key (think:
+                        UPDATE), then (1) the subtractor is called
+                        with the old value as stored in the table and (2) the 
adder is called with
+                        the new value of the input record
+                        that was just received. The order of execution for the 
subtractor and adder
+                        is not defined.</li>
+                    <li>When a tombstone record -- i.e. a record with a 
<code>null</code> value -- is
+                        received for a key (think: DELETE), then
+                        only the subtractor is called. Note that, whenever the 
subtractor returns a
+                    <code>null</code> value itself, then the
+                    corresponding key is removed from the resulting KTable. If 
that happens, any
+                    next input record for that key will trigger the 
initializer again.</li>
+                </ul>
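+                <p>
+                    The INSERT / UPDATE / DELETE rules above can be sketched in plain Java (no Kafka dependency).
+                    The class, its methods, and the explicit <code>group</code> argument are hypothetical. The sketch sums value lengths
+                    as in the example above and, since the order of subtractor and adder is not defined, arbitrarily runs the subtractor first.
+                    It does not model a subtractor that returns <code>null</code>.
+                </p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of grouped-table aggregation semantics; not the Kafka Streams API.
final class TableAggregateSketch {
    // One row of the upstream table: the group it belongs to and its current value.
    private static final class Row {
        final String group;
        final String value;
        Row(String group, String value) {
            this.group = group;
            this.value = value;
        }
    }

    private final Map<String, Row> upstream = new HashMap<>();    // rowKey -> latest row
    private final Map<String, Long> aggregates = new HashMap<>(); // group -> sum of value lengths

    // INSERT (no old row): adder only. UPDATE (old row exists): subtractor, then adder.
    void upsert(String rowKey, String group, String value) {
        Row old = upstream.put(rowKey, new Row(group, value));
        if (old != null) {
            subtract(old);
        }
        add(group, value);
    }

    // DELETE (tombstone): subtractor only.
    void delete(String rowKey) {
        Row old = upstream.remove(rowKey);
        if (old != null) {
            subtract(old);
        }
    }

    Long aggregateFor(String group) {
        return aggregates.get(group);
    }

    private void add(String group, String value) {
        long current = aggregates.getOrDefault(group, 0L);  // initializer: aggValue = 0
        aggregates.put(group, current + value.length());    // adder
    }

    private void subtract(Row old) {
        // A removed row was added earlier, so its group always has an aggregate here.
        aggregates.put(old.group, aggregates.get(old.group) - old.value.length()); // subtractor
    }

    public static void main(String[] args) {
        TableAggregateSketch sketch = new TableAggregateSketch();
        sketch.upsert("alice", "eu", "abc"); // INSERT
        sketch.upsert("bob", "eu", "de");    // INSERT
        sketch.upsert("alice", "eu", "a");   // UPDATE
        sketch.delete("bob");                // DELETE
        System.out.println(sketch.aggregateFor("eu"));
    }
}
```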
+                <p>
+                    See the example at the bottom of this section for a 
visualization of the
+                    aggregation semantics.
+                </p>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Aggregate (windowed)</b>: <code>KGroupedStream &rarr; 
KTable</code></td>
+            <td>
+                <p>
+                    <b>Windowed aggregation</b>. Aggregates the values of 
records, per window, by
+                    the grouped key. Aggregating is a generalization of
+                    <code>reduce</code> and allows, for example, the aggregate 
value to have a
+                    different type than the input values.
+                </p>
+                <p>
+                    You must provide an initializer (think: <code>aggValue = 
0</code>), "adder"
+                    aggregator (think: <code>aggValue + curValue</code>),
+                    and a window. When windowing based on sessions, you must 
additionally provide a
+                    "session merger" aggregator (think:
+                    <code>mergedAggValue = leftAggValue + 
rightAggValue</code>).
+                </p>
+                <p>
+                    The windowed <code>aggregate</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, V&gt;</code>.
+                </p>
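+                <p>
+                    As an illustration of how a record key becomes a windowed key, the following plain-Java sketch (no Kafka dependency) keeps one
+                    running sum per key and tumbling window. It assumes non-negative timestamps and windows aligned to the epoch;
+                    the class and method names are hypothetical, and the string form of the windowed key is only a stand-in for <code>Windowed&lt;K&gt;</code>.
+                </p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of a tumbling-window aggregation; not the Kafka Streams API.
final class TumblingWindowSketch {
    private final long sizeMs;
    // "key@windowStart" -> aggregate; a stand-in for a window store keyed by Windowed<K>.
    private final Map<String, Long> aggregates = new HashMap<>();

    TumblingWindowSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // ASSUMPTION: windows are aligned to the epoch and timestamps are non-negative.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    void process(String key, Long value, long timestampMs) {
        if (key == null || value == null) {
            return; // records with null keys (and, as documented, null values) are skipped
        }
        String windowedKey = key + "@" + windowStart(timestampMs, sizeMs);
        long current = aggregates.getOrDefault(windowedKey, 0L); // initializer, once per key and window
        aggregates.put(windowedKey, current + value);            // adder
    }

    Long aggregateFor(String key, long windowStartMs) {
        return aggregates.get(key + "@" + windowStartMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);
        TumblingWindowSketch sketch = new TumblingWindowSketch(fiveMinutes);
        sketch.process("a", 1L, 0L);
        sketch.process("a", 2L, fiveMinutes - 1); // still in the first window
        sketch.process("a", 4L, fiveMinutes);     // first record of the second window
        System.out.println(sketch.aggregateFor("a", 0L));
        System.out.println(sketch.aggregateFor("a", fiveMinutes));
    }
}
```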
+                <p>
+                    Several variants of <code>aggregate</code> exist, see 
Javadocs for details.
+                </p>
+
+                <pre class="brush: java;">
+                    import java.util.concurrent.TimeUnit;
+                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
+
+                    // Java 8+ examples, using lambda expressions
+
+                    // Aggregating with time-based windowing (here: with 
5-minute tumbling windows)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; 
timeWindowedAggregatedStream = groupedStream
+                        
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window 
*/
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + 
newValue, /* adder */
+                            Materialized.&lt;String, Long, 
WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") 
/* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for 
aggregate value */
+
+
+                    // Aggregating with session-based windowing (here: with an 
inactivity gap of 5 minutes)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; 
sessionizedAggregatedStream = groupedStream
+                        
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session 
window */
+                        .aggregate(
+                            () -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + 
newValue, /* adder */
+                            (aggKey, leftAggValue, rightAggValue) -> 
leftAggValue + rightAggValue, /* session merger */
+                            Materialized.&lt;String, Long, 
SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") 
/* state store name */
+                                .withValueSerde(Serdes.Long())); /* serde for 
aggregate value */
+
+                    // Java 7 examples
+
+                    // Aggregating with time-based windowing (here: with 
5-minute tumbling windows)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; 
timeWindowedAggregatedStream = groupedStream
+                        
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window 
*/
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* 
adder */
+                              @Override
+                              public Long apply(String aggKey, Long newValue, 
Long aggValue) {
+                                return aggValue + newValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, 
WindowStore&lt;Bytes, byte[]&gt;&gt;as("time-windowed-aggregated-stream-store") 
/* state store name */
+                                    .withValueSerde(Serdes.Long()) /* serde 
for aggregate value */
+                    );
+
+                    // Aggregating with session-based windowing (here: with an 
inactivity gap of 5 minutes)
+                    KTable&lt;Windowed&lt;String&gt;, Long&gt; 
sessionizedAggregatedStream = groupedStream
+                        
.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session 
window */
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;String, Long, Long&gt;() { /* 
adder */
+                              @Override
+                              public Long apply(String aggKey, Long newValue, 
Long aggValue) {
+                                return aggValue + newValue;
+                              }
+                            },
+                            new Merger&lt;String, Long&gt;() { /* session 
merger */
+                              @Override
+                              public Long apply(String aggKey, Long 
leftAggValue, Long rightAggValue) {
+                                return rightAggValue + leftAggValue;
+                              }
+                            },
+                            Materialized.&lt;String, Long, 
SessionStore&lt;Bytes, byte[]&gt;&gt;as("sessionized-aggregated-stream-store") 
/* state store name */
+                                .withValueSerde(Serdes.Long()) /* serde for 
aggregate value */
+                    );
+                </pre>
+
+                <p>
+                    Detailed behavior:
+                </p>
+                <ul>
+                    <li>The windowed aggregate behaves similarly to the rolling aggregate described
+                        above. The additional twist is that the behavior applies per window.</li>
+                    <li>Input records with <code>null</code> keys are ignored 
in general.</li>
+                    <li>When a record key is received for the first time for a 
given window, the
+                        initializer is called (and called before the 
adder).</li>
+                    <li>Whenever a record with a non-<code>null</code> value 
is received for a given window, the
+                        adder is called.
+                        (Note: As a result of a known bug in Kafka 0.11.0.0, 
the adder is currently
+                        also called for <code>null</code> values. You can work 
around this, for example, by
+                        manually filtering out <code>null</code> values prior 
to grouping the stream.)</li>
+                    <li>When using session windows: the session merger is 
called whenever two
+                        sessions are being merged.</li>
+                </ul>
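+                <p>
+                    The role of the session merger can be sketched in plain Java (no Kafka dependency) for the sessions of a single key.
+                    The sketch assumes that a new record merges with every existing session that lies within the inactivity gap of the record's timestamp,
+                    with the gap boundary treated as inclusive; the class and method names are hypothetical.
+                </p>

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of session-window merging for ONE key; not the Kafka Streams API.
final class SessionMergeSketch {
    static final class Session {
        final long start;
        final long end;
        final long agg;
        Session(long start, long end, long agg) {
            this.start = start;
            this.end = end;
            this.agg = agg;
        }
    }

    private final long gapMs;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    void process(long value, long timestampMs) {
        // The record starts as its own session: initializer (0), then adder.
        long start = timestampMs;
        long end = timestampMs;
        long agg = 0L + value;

        // ASSUMPTION: a session is merged if it overlaps [timestamp - gap, timestamp + gap].
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            boolean withinGap = s.end >= timestampMs - gapMs && s.start <= timestampMs + gapMs;
            if (withinGap) {
                agg = agg + s.agg;                // session merger: left + right
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                it.remove();                      // the old session is replaced by the merged one
            }
        }
        sessions.add(new Session(start, end, agg));
    }

    List<Session> sessions() {
        return sessions;
    }

    public static void main(String[] args) {
        SessionMergeSketch sketch = new SessionMergeSketch(5L);
        sketch.process(1L, 0L); // session [0, 0]
        sketch.process(2L, 8L); // too far from [0, 0]: second session [8, 8]
        sketch.process(4L, 4L); // within the gap of both: all three merge into [0, 8]
        System.out.println(sketch.sessions().size());
    }
}
```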
+                <p>
+                See the example at the bottom of this section for a 
visualization of the aggregation semantics.
+                </p>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Count</b>: <code>KGroupedStream &rarr; KTable</code> or <code>KGroupedTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                    <b>Rolling aggregation</b>. Counts the number of records 
by the grouped key.
+                    Several variants of <code>count</code> exist, see Javadocs 
for details.
+                </p>
+                <pre class="brush: java;">
+                    KGroupedStream&lt;String, Long&gt; groupedStream = ...;
+                    KGroupedTable&lt;String, Long&gt; groupedTable = ...;
+
+                    // Counting a KGroupedStream
+                    KTable&lt;String, Long&gt; aggregatedStream = 
groupedStream.count();
+
+                    // Counting a KGroupedTable
+                    KTable&lt;String, Long&gt; aggregatedTable = 
groupedTable.count();
+                </pre>
+                <p>
+                    Detailed behavior for <code>KGroupedStream</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys or values are ignored.</li>
+                </ul>
+                <p>
+                    Detailed behavior for <code>KGroupedTable</code>:
+                </p>
+                <ul>
+                    <li>Input records with <code>null</code> keys are ignored. 
Records with <code>null</code>
+                        values are not ignored but interpreted as "tombstones" 
for the corresponding key, which
+                        indicate the deletion of the key from the table.</li>
+                </ul>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Count (Windowed)</b>: <code>KGroupedStream &rarr; 
KTable</code></td>
+            <td>
+                <p>
+                    <b>Windowed aggregation</b>. Counts the number of records, per window, by the grouped key.
+                </p>
+                <p>
+                    The windowed <code>count</code> turns a <code>KGroupedStream&lt;K, V&gt;</code> into a windowed <code>KTable&lt;Windowed&lt;K&gt;, Long&gt;</code>.
+                </p>
+                <p>
+                    Several variants of count exist, see Javadocs for d

<TRUNCATED>
