Repository: kafka-site
Updated Branches:
  refs/heads/asf-site 9a36603f4 -> a7c3675d3


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/streams.html
----------------------------------------------------------------------
diff --git a/0102/streams.html b/0102/streams.html
new file mode 100644
index 0000000..dec17ef
--- /dev/null
+++ b/0102/streams.html
@@ -0,0 +1,424 @@
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~-->
+
+<script><!--#include virtual="js/templateData.js" --></script>
+
+<script id="streams-template" type="text/x-handlebars-template">
+    <h1>Streams</h1>
+
+        <ol class="toc">
+            <li>
+                <a href="#streams_overview">Overview</a>
+            </li>
+            <li>
+                <a href="#streams_developer">Developer guide</a>
+                <ul>
+                    <li><a href="#streams_concepts">Core concepts</a>
+                    <li><a href="#streams_processor">Low-level processor 
API</a>
+                    <li><a href="#streams_dsl">High-level streams DSL</a>
+                </ul>
+            </li>
+        </ol>
+
+        <h2><a id="streams_overview" href="#overview">Overview</a></h2>
+
+        <p>
+        Kafka Streams is a client library for processing and analyzing data stored in Kafka, with the option of writing the resulting data back to Kafka or sending the final output to an external system. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.
+        Kafka Streams has a <b>low barrier to entry</b>: You can quickly write 
and run a small-scale proof-of-concept on a single machine; and you only need 
to run additional instances of your application on multiple machines to scale 
up to high-volume production workloads. Kafka Streams transparently handles the 
load balancing of multiple instances of the same application by leveraging 
Kafka's parallelism model.
+        </p>
+        <p>
+        Some highlights of Kafka Streams:
+        </p>
+
+        <ul>
+            <li>Designed as a <b>simple and lightweight client library</b>, 
which can be easily embedded in any Java application and integrated with any 
existing packaging, deployment and operational tools that users have for their 
streaming applications.</li>
+            <li>Has <b>no external dependencies on systems other than Apache 
Kafka itself</b> as the internal messaging layer; notably, it uses Kafka's 
partitioning model to horizontally scale processing while maintaining strong 
ordering guarantees.</li>
+            <li>Supports <b>fault-tolerant local state</b>, which enables very 
fast and efficient stateful operations like joins and windowed 
aggregations.</li>
+            <li>Employs <b>one-record-at-a-time processing</b> to achieve low 
processing latency, and supports <b>event-time based windowing 
operations</b>.</li>
+            <li>Offers necessary stream processing primitives, along with a 
<b>high-level Streams DSL</b> and a <b>low-level Processor API</b>.</li>
+
+        </ul>
+
+        <h2><a id="streams_developer" href="#streams_developer">Developer 
Guide</a></h2>
+
+        <p>
+        There is a <a href="#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program written with the Kafka Streams library.
+        This section focuses on how to write, configure, and execute a Kafka 
Streams application.
+        </p>
+
+        <h4><a id="streams_concepts" href="#streams_concepts">Core 
Concepts</a></h4>
+
+        <p>
+        We first summarize the key concepts of Kafka Streams.
+        </p>
+
+        <h5><a id="streams_topology" href="#streams_topology">Stream 
Processing Topology</a></h5>
+
+        <ul>
+            <li>A <b>stream</b> is the most important abstraction provided by 
Kafka Streams: it represents an unbounded, continuously updating data set. A 
stream is an ordered, replayable, and fault-tolerant sequence of immutable data 
records, where a <b>data record</b> is defined as a key-value pair.</li>
+            <li>A <b>stream processing application</b> written in Kafka 
Streams defines its computational logic through one or more <b>processor 
topologies</b>, where a processor topology is a graph of stream processors 
(nodes) that are connected by streams (edges).</li>
+            <li>A <b>stream processor</b> is a node in the processor topology; it represents a processing step that transforms data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and possibly producing one or more output records for its downstream processors.</li>
+        </ul>
+
+        <p>
+        Kafka Streams offers two ways to define the stream processing topology: the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides
+        the most common data transformation operations such as <code>map</code> and <code>filter</code>; the lower-level <a href="#streams_processor"><b>Processor API</b></a> allows
+        developers to define and connect custom processors as well as to interact with <a href="#streams_state">state stores</a>.
+        </p>
+
+        <h5><a id="streams_time" href="#streams_time">Time</a></h5>
+
+        <p>
+        A critical aspect in stream processing is the notion of <b>time</b>, 
and how it is modeled and integrated.
+        For example, some operations such as <b>windowing</b> are defined 
based on time boundaries.
+        </p>
+        <p>
+        Common notions of time in streams are:
+        </p>
+
+        <ul>
+            <li><b>Event time</b> - The point in time when an event or data record occurred, i.e. was originally created "at the source".</li>
+            <li><b>Processing time</b> - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or even days later than the original event time.</li>
+            <li><b>Ingestion time</b> - The point in time when an event or data record is stored in a topic partition by a Kafka broker. The difference to event time is that this ingestion timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created "at the source". The difference to processing time is that processing time is when the stream processing application processes the record. For example, if a record is never processed, there is no notion of processing time for it, but it still has an ingestion time.</li>
+        </ul>
+        <p>
+        The choice between event-time and ingestion-time is actually done 
through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x 
onwards, timestamps are automatically embedded into Kafka messages. Depending 
on Kafka's configuration these timestamps represent event-time or 
ingestion-time. The respective Kafka configuration setting can be specified on 
the broker level or per topic. The default timestamp extractor in Kafka Streams 
will retrieve these embedded timestamps as-is. Hence, the effective time 
semantics of your application depend on the effective Kafka configuration for 
these embedded timestamps.
+        </p>
+        <p>
+        Kafka Streams assigns a <b>timestamp</b> to every data record
+        via the <code>TimestampExtractor</code> interface.
+        Concrete implementations of this interface may retrieve or compute 
timestamps based on the actual contents of data records such as an embedded 
timestamp field
+        to provide event-time semantics, or use any other approach such as 
returning the current wall-clock time at the time of processing,
+        thereby yielding processing-time semantics to stream processing 
applications.
+        Developers can thus enforce different notions of time depending on 
their business needs. For example,
+        per-record timestamps describe the progress of a stream with regards 
to time (although records may be out-of-order within the stream) and
+        are leveraged by time-dependent operations such as joins.
+        </p>
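+
+        <p>
+        For example, a custom extractor that provides event-time semantics might look like the following sketch. It assumes the 0.10.2 <code>TimestampExtractor</code> interface (which also receives the previously extracted timestamp) and an embedded payload field named "eventTime" that is purely illustrative:
+        </p>
+
+        <pre>
+            public class MyEventTimeExtractor implements TimestampExtractor {
+
+                @Override
+                public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
+                    // assumes the record value is an Avro GenericRecord with an embedded "eventTime" field
+                    GenericRecord value = (GenericRecord) record.value();
+                    Long eventTime = (Long) value.get("eventTime");
+
+                    // fall back to the timestamp embedded in the Kafka message if the field is missing
+                    return eventTime != null ? eventTime : record.timestamp();
+                }
+            }
+        </pre>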
+
+        <p>
+        Finally, whenever a Kafka Streams application writes records to Kafka, it will also assign timestamps to these new records. The way the timestamps are assigned depends on the context:
+        <ul>
+            <li> When new output records are generated via processing some 
input record, for example, <code>context.forward()</code> triggered in the 
<code>process()</code> function call, output record timestamps are inherited 
from input record timestamps directly.</li>
+            <li> When new output records are generated via periodic functions 
such as <code>punctuate()</code>, the output record timestamp is defined as the 
current internal time (obtained through <code>context.timestamp()</code>) of 
the stream task.</li>
+            <li> For aggregations, the timestamp of a resulting aggregate 
update record will be that of the latest arrived input record that triggered 
the update.</li>
+        </ul>
+        </p>
+
+        <h5><a id="streams_state" href="#streams_state">States</a></h5>
+
+        <p>
+        Some stream processing applications don't require state, which means 
the processing of a message is independent from
+        the processing of all other messages.
+        However, being able to maintain state opens up many possibilities for 
sophisticated stream processing applications: you
+        can join input streams, or group and aggregate data records. Many such 
stateful operators are provided by the <a href="#streams_dsl"><b>Kafka Streams 
DSL</b></a>.
+        </p>
+        <p>
+        Kafka Streams provides so-called <b>state stores</b>, which can be 
used by stream processing applications to store and query data.
+        This is an important capability when implementing stateful operations.
+        Every task in Kafka Streams embeds one or more state stores that can 
be accessed via APIs to store and query data required for processing.
+        These state stores can either be a persistent key-value store, an 
in-memory hashmap, or another convenient data structure.
+        Kafka Streams offers fault-tolerance and automatic recovery for local 
state stores.
+        </p>
+        <p>
+        Kafka Streams allows direct read-only queries of the state stores by 
methods, threads, processes or applications external to the stream processing 
application that created the state stores. This is provided through a feature 
called <b>Interactive Queries</b>. All stores are named and Interactive Queries 
exposes only the read operations of the underlying implementation. 
+        </p>
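+        <p>
+        As a minimal sketch (the store name "CountsStore", its types, and the running <code>KafkaStreams</code> instance are assumptions), such a read-only lookup might look like this:
+        </p>
+
+        <pre>
+            // query a local key-value state store of a running application instance
+            ReadOnlyKeyValueStore<String, Long> store =
+                streams.store("CountsStore", QueryableStoreTypes.keyValueStore());
+
+            Long count = store.get("alice"); // read-only point lookup, no writes are possible
+        </pre>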
+        <br>
+        <p>
+        As we have mentioned above, the computational logic of a Kafka Streams 
application is defined as a <a href="#streams_topology">processor topology</a>.
+        Currently Kafka Streams provides two sets of APIs to define the 
processor topology, which will be described in the subsequent sections.
+        </p>
+
+        <h4><a id="streams_processor" href="#streams_processor">Low-Level 
Processor API</a></h4>
+
+        <h5><a id="streams_processor_process" 
href="#streams_processor_process">Processor</a></h5>
+
+        <p>
+        Developers can define their customized processing logic by implementing the <code>Processor</code> interface, which
+        provides the <code>process</code> and <code>punctuate</code> methods. The <code>process</code> method is performed on each
+        of the received records, and the <code>punctuate</code> method is performed periodically based on elapsed time.
+        In addition, the processor can maintain the current 
<code>ProcessorContext</code> instance variable initialized in the
+        <code>init</code> method, and use the context to schedule the 
punctuation period (<code>context().schedule</code>), to
+        forward the modified / new key-value pair to downstream processors 
(<code>context().forward</code>), to commit the current
+        processing progress (<code>context().commit</code>), etc.
+        </p>
+
+        <pre>
+            public class MyProcessor implements Processor<String, String> {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) 
context.getStateStore("Counts");
+                }
+
+                @Override
+                public void process(String dummy, String line) {
+                    String[] words = line.toLowerCase().split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = 
this.kvStore.all();
+
+                    while (iter.hasNext()) {
+                        KeyValue<String, Integer> entry = iter.next();
+                        context.forward(entry.key, entry.value.toString());
+                    }
+
+                    iter.close();
+                    context.commit();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            }
+        </pre>
+
+        <p>
+        In the above implementation, the following actions are performed:
+
+        <ul>
+            <li>In the <code>init</code> method, schedule the punctuation 
every 1 second and retrieve the local state store by its name "Counts".</li>
+            <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts in the state store (we will talk about this feature later in this section).</li>
+            <li>In the <code>punctuate</code> method, iterate the local state 
store and send the aggregated counts to the downstream processor, and commit 
the current stream state.</li>
+        </ul>
+        </p>
+
+        <h5><a id="streams_processor_topology" 
href="#streams_processor_topology">Processor Topology</a></h5>
+
+        <p>
+        With the customized processors defined in the Processor API, 
developers can use the <code>TopologyBuilder</code> to build a processor 
topology
+        by connecting these processors together:
+
+        <pre>
+            TopologyBuilder builder = new TopologyBuilder();
+
+            builder.addSource("SOURCE", "src-topic")
+
+                .addProcessor("PROCESS1", MyProcessor1::new /* the 
ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
+                .addProcessor("PROCESS2", MyProcessor2::new /* the 
ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
+                .addProcessor("PROCESS3", MyProcessor3::new /* the 
ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+
+                .addSink("SINK1", "sink-topic1", "PROCESS1")
+                .addSink("SINK2", "sink-topic2", "PROCESS2")
+                .addSink("SINK3", "sink-topic3", "PROCESS3");
+        </pre>
+
+        There are several steps in the above code to build the topology, and 
here is a quick walk through:
+
+        <ul>
+            <li>First of all a source node named "SOURCE" is added to the 
topology using the <code>addSource</code> method, with one Kafka topic 
"src-topic" fed to it.</li>
+            <li>Three processor nodes are then added using the 
<code>addProcessor</code> method; here the first processor is a child of the 
"SOURCE" node, but is the parent of the other two processors.</li>
+            <li>Finally three sink nodes are added to complete the topology 
using the <code>addSink</code> method, each piping from a different parent 
processor node and writing to a separate topic.</li>
+        </ul>
+        </p>
+
+        <h5><a id="streams_processor_statestore" 
href="#streams_processor_statestore">Local State Store</a></h5>
+
+        <p>
+        Note that the Processor API is not limited to only accessing the 
current records as they arrive, but can also maintain local state stores
+        that keep recently arrived records to use in stateful processing 
operations such as aggregation or windowed joins.
+        To take advantage of these local state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+        processor topology to create the local state store and associate it with the processor nodes that need to access it; or they can connect a created
+        local state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+
+        <pre>
+            TopologyBuilder builder = new TopologyBuilder();
+
+            builder.addSource("SOURCE", "src-topic")
+
+                .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+                // create the in-memory state store "COUNTS" associated with 
processor "PROCESS1"
+                
.addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(),
 "PROCESS1")
+                .addProcessor("PROCESS2", MyProcessor3::new /* the 
ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+                .addProcessor("PROCESS3", MyProcessor3::new /* the 
ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+
+                // connect the state store "COUNTS" with processor "PROCESS2"
+                .connectProcessorAndStateStores("PROCESS2", "COUNTS")
+
+                .addSink("SINK1", "sink-topic1", "PROCESS1")
+                .addSink("SINK2", "sink-topic2", "PROCESS2")
+                .addSink("SINK3", "sink-topic3", "PROCESS3");
+        </pre>
+
+        </p>
+
+        In the next section we present another way to build the processor 
topology: the Kafka Streams DSL.
+
+        <h4><a id="streams_dsl" href="#streams_dsl">High-Level Streams 
DSL</a></h4>
+
+        To build a processor topology using the Streams DSL, developers can use the <code>KStreamBuilder</code> class, which extends <code>TopologyBuilder</code>.
+        A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
+        through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend developers read the full example
+        source code for details.
+
+        <h5><a id="streams_kstream_ktable" 
href="#streams_kstream_ktable">KStream and KTable</a></h5>
+        The DSL uses two main abstractions. A <b>KStream</b> is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. A <b>KTable</b> is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered a create). To illustrate the difference between KStreams and KTables, let's imagine the following two data records are being sent to the stream: <code>("alice", 1) --> ("alice", 3)</code>. If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable, the result would be <code>3</code>, since the last record would be considered an update.
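+
+        <p>
+        To make this concrete, here is a small sketch of reading the same topic with both abstractions (the topic name, serdes, and store name are illustrative, not part of the example code):
+        </p>
+
+        <pre>
+            KStreamBuilder builder = new KStreamBuilder();
+
+            // as a KStream: ("alice", 1) and ("alice", 3) are two independent records
+            KStream<String, Long> clickStream = builder.stream(Serdes.String(), Serdes.Long(), "user-clicks");
+
+            // as a KTable: ("alice", 3) is an update that replaces ("alice", 1) for key "alice"
+            KTable<String, Long> clickTable = builder.table(Serdes.String(), Serdes.Long(), "user-clicks", "clicks-store");
+        </pre>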
+
+
+
+        <h5><a id="streams_dsl_source" href="#streams_dsl_source">Create 
Source Streams from Kafka</a></h5>
+
+        <p>
+        Either a <b>record stream</b> (defined as <code>KStream</code>) or a 
<b>changelog stream</b> (defined as <code>KTable</code>)
+        can be created as a source stream from one or more Kafka topics (for 
<code>KTable</code> you can only create the source stream
+        from a single topic).
+        </p>
+
+        <pre>
+            KStreamBuilder builder = new KStreamBuilder();
+
+            KStream<String, GenericRecord> source1 = builder.stream("topic1", 
"topic2");
+            KTable<String, GenericRecord> source2 = builder.table("topic3", 
"stateStoreName");
+        </pre>
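+
+        <p>
+        If the key and value types differ from the configured default serdes, they can be specified explicitly when creating the source stream. A small sketch (the value serde <code>genericAvroSerde</code> is assumed to be defined elsewhere):
+        </p>
+
+        <pre>
+            KStream<String, GenericRecord> source3 =
+                builder.stream(Serdes.String(), genericAvroSerde, "topic1", "topic2");
+        </pre>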
+
+        <h5><a id="streams_dsl_windowing" 
href="#streams_dsl_windowing">Windowing a stream</a></h5>
+        A stream processor may need to divide data records into time buckets, 
i.e. to <b>window</b> the stream by time. This is usually needed for join and 
aggregation operations, etc. Kafka Streams currently defines the following 
types of windows:
+        <ul>
+        <li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.</li>
+        <li><b>Tumbling time windows</b> are a special case of hopping time 
windows and, like the latter, are windows based on time intervals. They model 
fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by 
a single property: the window's size. A tumbling window is a hopping window 
whose window size is equal to its advance interval. Since tumbling windows 
never overlap, a data record will belong to one and only one window.</li>
+        <li><b>Sliding windows</b> model a fixed-size window that slides 
continuously over the time axis; here, two data records are said to be included 
in the same window if the difference of their timestamps is within the window 
size. Thus, sliding windows are not aligned to the epoch, but on the data 
record timestamps. In Kafka Streams, sliding windows are used only for join 
operations, and can be specified through the <code>JoinWindows</code> 
class.</li>
+        </ul>
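+
+        <p>
+        A small sketch of how these window types might be specified in code (the durations are illustrative; since 0.10.1 windows are no longer named):
+        </p>
+
+        <pre>
+            TimeWindows hopping  = TimeWindows.of(5 * 60 * 1000L).advanceBy(60 * 1000L); // 5-minute windows, advancing by 1 minute
+            TimeWindows tumbling = TimeWindows.of(5 * 60 * 1000L);                       // size equals advance interval, no overlap
+            JoinWindows sliding  = JoinWindows.of(5 * 60 * 1000L);                       // used for join operations only
+        </pre>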
+
+        <h5><a id="streams_dsl_joins" href="#streams_dsl_joins">Joins</a></h5>
+        A <b>join</b> operation merges two streams based on the keys of their 
data records, and yields a new stream. A join over record streams usually needs 
to be performed on a windowing basis because otherwise the number of records 
that must be maintained for performing the join may grow indefinitely. In Kafka 
Streams, you may perform the following join operations:
+        <ul>
+        <li><b>KStream-to-KStream Joins</b> are always windowed joins, since 
otherwise the memory and state required to compute the join would grow 
infinitely in size. Here, a newly received record from one of the streams is 
joined with the other stream's records within the specified window interval to 
produce one result for each matching pair based on user-provided 
<code>ValueJoiner</code>. A new <code>KStream</code> instance representing the 
result stream of the join is returned from this operator.</li>
+        
+        <li><b>KTable-to-KTable Joins</b> are join operations designed to be 
consistent with the ones in relational databases. Here, both changelog streams 
are materialized into local state stores first. When a new record is received 
from one of the streams, it is joined with the other stream's materialized 
state stores to produce one result for each matching pair based on 
user-provided ValueJoiner. A new <code>KTable</code> instance representing the 
result stream of the join, which is also a changelog stream of the represented 
table, is returned from this operator.</li>
+        <li><b>KStream-to-KTable Joins</b> allow you to perform table lookups 
against a changelog stream (<code>KTable</code>) upon receiving a new record 
from another record stream (KStream). An example use case would be to enrich a 
stream of user activities (<code>KStream</code>) with the latest user profile 
information (<code>KTable</code>). Only records received from the record stream 
will trigger the join and produce results via <code>ValueJoiner</code>, not 
vice versa (i.e., records received from the changelog stream will be used only 
to update the materialized state store). A new <code>KStream</code> instance 
representing the result stream of the join is returned from this operator.</li>
+        </ul>
+
+        Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>. Their semantics are similar to the corresponding operators in relational databases.
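+
+        <p>
+        For example, a windowed KStream-to-KStream inner join might be written as follows (a sketch; the two source streams and the 5-minute window are assumptions):
+        </p>
+
+        <pre>
+            // written in Java 8+, using lambda expressions
+            KStream<String, String> joinedStream = leftStream.join(rightStream,
+                (leftValue, rightValue) -> leftValue + "/" + rightValue, // ValueJoiner
+                JoinWindows.of(5 * 60 * 1000L)                           // records must be within 5 minutes of each other
+            );
+        </pre>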
+        <h5><a id="streams_dsl_transform" 
href="#streams_dsl_transform">Transform a stream</a></h5>
+
+        <p>
+        There is a list of transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
+        Each of these operations may generate either one or more <code>KStream</code> and <code>KTable</code> objects and
+        can be translated into one or more connected processors in the underlying processor topology.
+        All these transformation methods can be chained together to compose a complex processor topology.
+        Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
+        generic functions where users can specify the input and output data types.
+        </p>
+
+        <p>
+        Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc., are stateless
+        transformation operations and can be applied to both <code>KStream</code> and <code>KTable</code>,
+        where users can usually pass a customized function to these operations as a parameter, such as a <code>Predicate</code> for <code>filter</code>,
+        a <code>KeyValueMapper</code> for <code>map</code>, etc:
+
+        </p>
+
+        <pre>
+            // written in Java 8+, using lambda expressions
+            KStream<String, GenericRecord> mapped = source1.mapValues(record -> record.get("category"));
+        </pre>
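+
+        <p>
+        Similarly, a stateless <code>filter</code> takes a <code>Predicate</code> over the key and value (a sketch; the null check is just an illustrative condition):
+        </p>
+
+        <pre>
+            // written in Java 8+, using lambda expressions
+            KStream<String, GenericRecord> filtered = source1.filter((key, value) -> value.get("category") != null);
+        </pre>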
+
+        <p>
+        Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
+        they do not require a state store associated with the stream processor; stateful transformations, on the other hand,
+        require accessing an associated state store for processing and producing outputs.
+        For example, in <code>join</code> and <code>aggregate</code> 
operations, a windowing state is usually used to store all the received records
+        within the defined window boundary so far. The operators can then 
access these accumulated records in the store and compute
+        based on them.
+        </p>
+
+        <pre>
+            // written in Java 8+, using lambda expressions
+            KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
+                () -> 0L,  // initial value
+                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+                TimeWindows.of(5000L).advanceBy(1000L), // intervals in milliseconds
+                Serdes.Long(), // serde for aggregated value
+                "counts"  // name of the aggregation's state store
+            );
+
+            KStream<String, String> joined = source1.leftJoin(source2,
+                (record1, record2) -> record1.get("user") + "-" + record2.get("region")
+            );
+        </pre>
+
+        <h5><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams 
back to Kafka</a></h5>
+
+        <p>
+        At the end of the processing, users can choose to (continuously) write the resulting streams back to a Kafka topic through
+        <code>KStream.to</code> and <code>KTable.to</code>.
+        </p>
+
+        <pre>
+            joined.to("topic4");
+        </pre>
+
+        If your application needs to continue reading and processing the 
records after they have been materialized
+        to a topic via <code>to</code> above, one option is to construct a new 
stream that reads from the output topic;
+        Kafka Streams provides a convenience method called 
<code>through</code>:
+
+        <pre>
+            // equivalent to
+            //
+            // joined.to("topic4");
+            // materialized = builder.stream("topic4");
+            KStream<String, String> materialized = joined.through("topic4");
+        </pre>
+
+
+        <br>
+        <p>
+        Besides defining the topology, developers will also need to configure their application
+        in <code>StreamsConfig</code> before running it. A complete list of
+        Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>.
+        </p>
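+
+        <p>
+        As a minimal sketch, configuring and starting an application might look like this (the application id and broker address are placeholders):
+        </p>
+
+        <pre>
+            Properties props = new Properties();
+            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
+            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+            KafkaStreams streams = new KafkaStreams(builder, props);
+            streams.start();
+
+            // ... later, on shutdown:
+            streams.close();
+        </pre>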
+</script>
+
+<!--#include virtual="../includes/_header.htm" -->
+<!--#include virtual="../includes/_top.htm" -->
+<div class="content documentation documentation--current">
+       <!--#include virtual="../includes/_nav.htm" -->
+       <div class="right">
+               <!--#include virtual="../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+        </ul>
+        <div class="p-streams"></div>
+    </div>
+</div>
+<!--#include virtual="../includes/_footer.htm" -->
+<script>
+$(function() {
+  // Show selected style on nav item
+  $('.b-nav__streams').addClass('selected');
+
+  // Display docs subnav items
+  $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
+});
+</script>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/upgrade.html
----------------------------------------------------------------------
diff --git a/0102/upgrade.html b/0102/upgrade.html
new file mode 100644
index 0000000..ebc61db
--- /dev/null
+++ b/0102/upgrade.html
@@ -0,0 +1,367 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
+<p>0.10.2.0 has wire protocol changes. By following the recommended rolling 
upgrade plan below, you guarantee no downtime during the upgrade.
+However, please review the <a href="#upgrade_1020_notable">notable changes in 
0.10.2.0</a> before upgrading.
+</p>
+
+<p>Starting with version 0.10.2, Java clients (producer and consumer) have 
acquired the ability to communicate with older brokers. Version 0.10.2
+clients can talk to version 0.10.0 or newer brokers. However, if your brokers 
are older than 0.10.0, you must upgrade all the brokers in the
+Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 
0.8.x and newer clients.
+</p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following 
properties:
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
0.8.2, 0.9.0, 0.10.0 or 0.10.1).</li>
+            <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a 
href="#upgrade_10_performance_impact">potential performance impact following 
the upgrade</a> for the details on what this configuration does.)
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by 
editing inter.broker.protocol.version and setting it to 0.10.2. </li>
+    <li> If your previous message format is 0.10.0, change 
log.message.format.version to 0.10.2 (this is a no-op as the message format is 
the same for 0.10.0, 0.10.1 and 0.10.2).
+        If your previous message format version is lower than 0.10.0, do not 
change log.message.format.version yet - this parameter should only change once 
all consumers have been upgraded to 0.10.0.0 or later.</li>
+    <li> Restart the brokers one by one for the new protocol version to take 
effect. </li>
+    <li> If log.message.format.version is still lower than 0.10.0 at this 
point, wait until all consumers have been upgraded to 0.10.0 or later,
+        then change log.message.format.version to 0.10.2 on each broker and 
restart them one by one. </li>
+</ol>
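+
+<p>For example, if the cluster is currently running 0.10.0, the two properties added in step 1 might look as follows (a sketch; substitute your actual current version):</p>
+
+<pre>
+inter.broker.protocol.version=0.10.0
+log.message.format.version=0.10.0
+</pre>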
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all 
the brokers down, update the code and start all of them. They will start with 
the new protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any 
time after the brokers were upgraded. It does not have to be immediately after.
+
+<ol>
+    <li>Upgrading a Kafka Streams Application:
+        <ul>
+            <li>You need to recompile your code. Just swapping the jar file will not work and will break your application.</li>
+            <li>If you use a custom timestamp extractor, you will need to 
update this code, because the <code>TimestampExtractor</code> interface was 
changed.</li>
+        </ul>
+    </li>
+</ol>
+
+<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes 
in 0.10.2.0</a></h5>
+<ul>
+    <li>The Java clients (producer and consumer) have acquired the ability to 
communicate with older brokers. Version 0.10.2 clients
+        can talk to version 0.10.0 or newer brokers. Note that some features 
are not available or are limited when older brokers
+        are used. </li>
+    <li>Several methods on the Java consumer may now throw 
<code>InterruptException</code> if the calling thread is interrupted.
+        Please refer to the <code>KafkaConsumer</code> Javadoc for a more 
in-depth explanation of this change.</li>
+    <li>The Java consumer now shuts down gracefully. By default, the consumer waits up to 30 seconds to complete pending requests.
+        A new close API with timeout has been added to <code>KafkaConsumer</code> to control the maximum wait time (see the sketch after this list).</li>
+    <li>Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This
+        makes the behaviour consistent with MirrorMaker when using the old Scala consumer.</li>
+    <li>The Zookeeper dependency was removed from the Streams API. The Streams 
API now uses the Kafka protocol to manage internal topics instead of
+        modifying Zookeeper directly. This eliminates the need for privileges 
to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG"
+        should not be set in the Streams app any more. If the Kafka cluster is 
secured, Streams apps must have the required security privileges to create new 
topics.</li>
+    <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
+        the StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to <a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
+</ul>
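+
+<p>As a minimal sketch of the new consumer close API mentioned above (the 10-second timeout is illustrative):</p>
+
+<pre>
+try {
+    consumer.close(10, TimeUnit.SECONDS); // wait at most 10 seconds for pending requests
+} catch (InterruptException e) {
+    // several consumer methods may now throw InterruptException if the calling thread is interrupted
+    Thread.currentThread().interrupt();
+}
+</pre>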
+
+<h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New 
Protocol Versions</a></h5>
+<ul>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the <code>topics</code> array is set to <code>null</code>. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchResponse v2 introduces a top-level <code>error_code</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic">KIP-103</a>: UpdateMetadataRequest v3 introduces a <code>listener_name</code> field to the elements of the <code>end_points</code> array. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsRequest v1 introduces a <code>validate_only</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsResponse v1 introduces an <code>error_message</code> field to the elements of the <code>topic_errors</code> array. </li>
+</ul>
+
+<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 
0.10.0.X to 0.10.1.0</a></h4>
+0.10.1.0 has wire protocol changes. By following the recommended rolling 
upgrade plan below, you guarantee no downtime during the upgrade.
+However, please review the <a href="#upgrade_10_1_breaking">potential breaking changes in 0.10.1.0</a> before upgrading.
+<br>
+Note: Because new protocols are introduced, it is important to upgrade your 
Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients
+only support 0.10.1.x or later brokers while 0.10.1.x brokers also support 
older clients).
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following 
properties:
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
0.8.2.0, 0.9.0.0 or 0.10.0.0).</li>
+            <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a 
href="#upgrade_10_performance_impact">potential performance impact following 
the upgrade</a> for the details on what this configuration does.)
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by 
editing inter.broker.protocol.version and setting it to 0.10.1.0. </li>
+    <li> If your previous message format is 0.10.0, change 
log.message.format.version to 0.10.1 (this is a no-op as the message format is 
the same for both 0.10.0 and 0.10.1).
+         If your previous message format version is lower than 0.10.0, do not 
change log.message.format.version yet - this parameter should only change once 
all consumers have been upgraded to 0.10.0.0 or later.</li>
+    <li> Restart the brokers one by one for the new protocol version to take 
effect. </li>
+    <li> If log.message.format.version is still lower than 0.10.0 at this 
point, wait until all consumers have been upgraded to 0.10.0 or later,
+         then change log.message.format.version to 0.10.1 on each broker and 
restart them one by one. </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all 
the brokers down, update the code and start all of them. They will start with 
the new protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any 
time after the brokers were upgraded. It does not have to be immediately after.
+
+<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential 
breaking changes in 0.10.1.0</a></h5>
+<ul>
+    <li> The log retention time is no longer based on last modified time of 
the log segments. Instead it will be based on the largest timestamp of the 
messages in a log segment.</li>
+    <li> The log rolling time no longer depends on the log segment create time. Instead it is now based on the timestamps in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled when a new message has a timestamp greater than or equal to T + log.roll.ms. </li>
+    <li> The number of open file handles in 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
+    <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
+    <li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
+</ul>
+
+<h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API 
changes in 0.10.1.0</a></h5>
+<ul>
+    <li> Stream grouping and aggregation split into two methods:
+        <ul>
+            <li> old: KStream #aggregateByKey(), #reduceByKey(), and 
#countByKey() </li>
+            <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), 
#reduce(), and #count() </li>
+            <li> Example: stream.countByKey() changes to 
stream.groupByKey().count() </li>
+        </ul>
+    </li>
+    <li> Auto Repartitioning:
+        <ul>
+            <li> a call to through() after a key-changing operator and before 
an aggregation/join is no longer required </li>
+            <li> Example: stream.selectKey(...).through(...).countByKey() 
changes to stream.selectKey().groupByKey().count() </li>
+        </ul>
+    </li>
+    <li> TopologyBuilder:
+        <ul>
+            <li> methods #sourceTopics(String applicationId) and 
#topicGroups(String applicationId) got simplified to #sourceTopics() and 
#topicGroups() </li>
+        </ul>
+    </li>
+    <li> DSL: new parameter to specify state store names:
+        <ul>
+            <li> The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName) </li>
+            <li> KStreamBuilder#table(String topic) changes to #table(String topic, String storeName) </li>
+            <li> KTable#through(String topic) changes to #through(String 
topic, String storeName) </li>
+            <li> KGroupedStream #aggregate(), #reduce(), and #count() require 
additional parameter "String storeName"</li>
+            <li> Example: stream.countByKey(TimeWindows.of("windowName", 
1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), 
"countStoreName") </li>
+        </ul>
+    </li>
+    <li> Windowing:
+        <ul>
+            <li> Windows are not named anymore: TimeWindows.of("name", 1000) 
changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store 
names) </li>
+            <li> JoinWindows has no default size anymore: 
JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
+        </ul>
+    </li>
+</ul>
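+
+<p>As a code sketch of the DSL changes listed above (the window size and store name are taken from the examples and are illustrative):</p>
+
+<pre>
+// old API (0.10.0), no longer available:
+// stream.countByKey(TimeWindows.of("windowName", 1000));
+
+// new API (0.10.1): group first, then count into a named state store
+KTable&lt;Windowed&lt;String&gt;, Long&gt; counts =
+    stream.groupByKey().count(TimeWindows.of(1000), "countStoreName");
+</pre>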
+
+<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes 
in 0.10.1.0</a></h5>
+<ul>
+    <li> The new Java consumer is no longer in beta and we recommend it for 
all new development. The old Scala consumers are still supported, but they will 
be deprecated in the next release
+         and will be removed in a future major release. </li>
+    <li> The <code>--new-consumer</code>/<code>--new.consumer</code> switch is 
no longer required to use tools like MirrorMaker and the Console Consumer with 
the new consumer; one simply
+         needs to pass a Kafka broker to connect to instead of the ZooKeeper 
ensemble. In addition, usage of the Console Consumer with the old consumer has 
been deprecated and it will be
+         removed in a future major release. </li>
+    <li> Kafka clusters can now be uniquely identified by a cluster id. It 
will be automatically generated when a broker is upgraded to 0.10.1.0. The 
cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId 
metric and it is part of the Metadata response. Serializers, client 
interceptors and metric reporters can receive the cluster id by implementing 
the ClusterResourceListener interface. </li>
+    <li> The BrokerState "RunningAsController" (value 4) has been removed. Due 
to a bug, a broker would only be in this state briefly before transitioning out 
of it and hence the impact of the removal should be minimal. The recommended 
way to detect if a given broker is the controller is via the 
kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
+    <li> The new Java Consumer now allows users to search offsets by timestamp 
on partitions. </li>
+    <li> The new Java Consumer now supports heartbeating from a background 
thread. There is a new configuration
+         <code>max.poll.interval.ms</code> which controls the maximum time 
between poll invocations before the consumer
+         will proactively leave the group (5 minutes by default). The value of 
the configuration
+         <code>request.timeout.ms</code> must always be larger than 
<code>max.poll.interval.ms</code> because this is the maximum
+         time that a JoinGroup request can block on the server while the 
consumer is rebalancing, so we have changed its default
+         value to just above 5 minutes. Finally, the default value of 
<code>session.timeout.ms</code> has been adjusted down to
+         10 seconds, and the default value of <code>max.poll.records</code> 
has been changed to 500.</li>
+    <li> When using an Authorizer and a user doesn't have <b>Describe</b> 
authorization on a topic, the broker will no
+         longer return TOPIC_AUTHORIZATION_FAILED errors to requests since 
this leaks topic names. Instead, the UNKNOWN_TOPIC_OR_PARTITION
+         error code will be returned. This may cause unexpected timeouts or 
delays when using the producer and consumer since
+         Kafka clients will typically retry automatically on unknown topic 
errors. You should consult the client logs if you
+         suspect this could be happening.</li>
+    <li> Fetch responses have a size limit by default (50 MB for consumers and 
10 MB for replication). The existing per partition limits also apply (1 MB for 
consumers
+         and replication). Note that neither of these limits is an absolute 
maximum as explained in the next point. </li>
+    <li> Consumers and replicas can make progress if a message larger than the 
response/partition size limit is found. More concretely, if the first message 
in the
+         first non-empty partition of the fetch is larger than either or both 
limits, the message will still be returned. </li>
+    <li> Overloaded constructors were added to 
<code>kafka.api.FetchRequest</code> and <code>kafka.javaapi.FetchRequest</code> 
to allow the caller to specify the
+         order of the partitions (since order is significant in v3). The 
previously existing constructors were deprecated and the partitions are 
shuffled before
+         the request is sent to avoid starvation issues. </li>
+</ul>
+
+<h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New 
Protocol Versions</a></h5>
+<ul>
+    <li> ListOffsetRequest v1 supports accurate offset search based on 
timestamps. </li>
+    <li> MetadataResponse v2 introduces a new field: "cluster_id". </li>
+    <li> FetchRequest v3 supports limiting the response size (in addition to 
the existing per partition limit), it returns messages
+         bigger than the limits if required to make progress and the order of 
partitions in the request is now significant. </li>
+    <li> JoinGroup v1 introduces a new field: "rebalance_timeout". </li>
+</ul>
+
+<h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 
0.10.0.0</a></h4>
+0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> 
(please review before upgrading) and possible <a 
href="#upgrade_10_performance_impact">  performance impact following the 
upgrade</a>. By following the recommended rolling upgrade plan below, you 
guarantee no downtime and no performance impact during and following the 
upgrade.
+<br>
+Note: Because new protocols are introduced, it is important to upgrade your 
Kafka clusters before upgrading your clients.
+<p/>
+<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 
0.9.0.0,
+clients that depend on ZooKeeper (old Scala high-level Consumer and 
MirrorMaker if used with the old consumer) will not
+work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 
0.9.0.1 <b>before</b> brokers are upgraded to
+0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following 
properties:
+         <ul>
+         <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 
or 0.9.0.0).</li>
+         <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a 
href="#upgrade_10_performance_impact">potential performance impact following 
the upgrade</a> for the details on what this configuration does.)
+         </ul>
+    </li>
+    <li> Upgrade the brokers. This can be done a broker at a time by simply 
bringing it down, updating the code, and restarting it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by 
editing inter.broker.protocol.version and setting it to 0.10.0.0. NOTE: You 
shouldn't touch log.message.format.version yet - this parameter should only 
change once all consumers have been upgraded to 0.10.0.0 </li>
+    <li> Restart the brokers one by one for the new protocol version to take 
effect. </li>
+    <li> Once all consumers have been upgraded to 0.10.0, change 
log.message.format.version to 0.10.0 on each broker and restart them one by one.
+    </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all 
the brokers down, update the code and start all of them. They will start with 
the new protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any 
time after the brokers were upgraded. It does not have to be immediately after.
+
+<h5><a id="upgrade_10_performance_impact" 
href="#upgrade_10_performance_impact">Potential performance impact following 
upgrade to 0.10.0.0</a></h5>
+<p>
+    The message format in 0.10.0 includes a new timestamp field and uses 
relative offsets for compressed messages.
+    The on disk message format can be configured through 
log.message.format.version in the server.properties file.
+    The default on-disk message format is 0.10.0. If a consumer client is on a 
version before 0.10.0.0, it only understands
+    message formats before 0.10.0. In this case, the broker is able to convert 
messages from the 0.10.0 format to an earlier format
+    before sending the response to the consumer on an older version. However, 
the broker can't use zero-copy transfer in this case.
+
+    Reports from the Kafka community on the performance impact have shown CPU 
utilization going from 20% before to 100% after an upgrade, which forced an 
immediate upgrade of all clients to bring performance back to normal.
+
+    To avoid such message conversion before consumers are upgraded to 
0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when 
upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy 
transfer to send the data to the old consumers. Once consumers are upgraded, 
one can change the message format to 0.10.0 on the broker and enjoy the new 
message format that includes new timestamp and improved compression.
+
+    The conversion is supported to ensure compatibility and can be useful to 
support a few apps that have not updated to newer clients yet, but is 
impractical to support all consumer traffic on even an overprovisioned cluster. 
Therefore, it is critical to avoid the message conversion as much as possible 
when brokers have been upgraded but the majority of clients have not.
+</p>
+<p>
+    For clients that are upgraded to 0.10.0.0, there is no performance impact.
+</p>
+<p>
+    <b>Note:</b> By setting the message format version, one certifies that all 
existing messages are on or below that
+    message format version. Otherwise consumers before 0.10.0.0 might break. 
In particular, after the message format
+    is set to 0.10.0, one should not change it back to an earlier format as it 
may break consumers on versions before 0.10.0.0.
+</p>
+<p>
+    <b>Note:</b> Due to the additional timestamp introduced in each message, 
producers sending small messages may see a
+    message throughput degradation because of the increased overhead.
+    Likewise, replication now transmits an additional 8 bytes per message.
+    If you're running close to the network capacity of your cluster, it's 
possible that you'll overwhelm the network cards
+    and see failures and performance issues due to the overload.
+</p>
+    <b>Note:</b> If you have enabled compression on producers, you may notice 
reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving 
compressed messages, 0.10.0
+    brokers avoid recompressing the messages, which in general reduces the 
latency and improves the throughput. In
+    certain cases, however, this may reduce the batching size on the producer, 
which could lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for 
better throughput. In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by 
the broker, which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to 
make this configurable in a future Kafka
+    release.
+<p>
+
+</p>
+
+<h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking 
changes in 0.10.0.0</a></h5>
+<ul>
+    <li> Starting from Kafka 0.10.0.0, the message format version in Kafka is 
represented as the Kafka version. For example, message format 0.9.0 refers to 
the highest message version supported by Kafka 0.9.0. </li>
+    <li> Message format 0.10.0 has been introduced and it is used by default. 
It includes a timestamp field in the messages and relative offsets are used for 
compressed messages. </li>
+    <li> ProduceRequest/Response v2 has been introduced and it is used by 
default to support message format 0.10.0 </li>
+    <li> FetchRequest/Response v2 has been introduced and it is used by 
default to support message format 0.10.0 </li>
+    <li> MessageFormatter interface was changed from <code>def writeTo(key: 
Array[Byte], value: Array[Byte], output: PrintStream)</code> to
+        <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], 
Array[Byte]], output: PrintStream)</code> </li>
+    <li> MessageReader interface was changed from <code>def readMessage(): 
KeyedMessage[Array[Byte], Array[Byte]]</code> to
+        <code>def readMessage(): ProducerRecord[Array[Byte], 
Array[Byte]]</code> </li>
+    <li> MessageFormatter's package was changed from <code>kafka.tools</code> 
to <code>kafka.common</code> </li>
+    <li> MessageReader's package was changed from <code>kafka.tools</code> to 
<code>kafka.common</code> </li>
+    <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: 
MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never 
called. </li>
+    <li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you 
need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow 
the documented upgrade process to upgrade from 0.8 to 0.10.0. </li>
+    <li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library (see the sketch after this list). </li>
+    <li> LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1).
+         To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later.
+         Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue
+         to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later
+         should use interoperable LZ4f framing. A list of interoperable LZ4 libraries is available at http://www.lz4.org/ </li>
+</ul>
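+<p>
+    The <code>java.util.Collection</code> change mainly affects custom code that called the new consumer's subscribe/assign methods with other sequence types. Below is a minimal sketch of the 0.10.0-style call; the bootstrap address, group id and topic name are illustrative placeholders.
+</p>
+<pre>
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+Properties props = new Properties();
+props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
+props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
+props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+KafkaConsumer&lt;byte[], byte[]&gt; consumer = new KafkaConsumer&lt;&gt;(props);
+// subscribe() now accepts a java.util.Collection&lt;String&gt; rather than a List-specific type.
+consumer.subscribe(Collections.singletonList("my-topic"));
+</pre>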
+
+<h5><a id="upgrade_10_notable" href="#upgrade_10_notable">Notable changes in 
0.10.0.0</a></h5>
+
+<ul>
+    <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library works only with brokers on version 0.10.x and above, due to the message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
+    <li> The default value of the configuration parameter 
<code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
+    <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to prevent internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled (see the sketch after this list).</li>
+    <li> The old Scala producer has been deprecated. Users should migrate 
their code to the Java producer included in the kafka-clients JAR as soon as 
possible. </li>
+    <li> The new consumer API has been marked stable. </li>
+</ul>
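+<p>
+    For reference, the fragment below shows how these two new-consumer settings could be overridden; the values are examples only, and the defaults (64K receive buffer, internal topics excluded) are usually what you want.
+</p>
+<pre>
+// Illustrative overrides only; assumes a consumer Properties object as in the earlier sketch.
+// Defaults in 0.10.0: receive.buffer.bytes=65536 and exclude.internal.topics=true.
+props.put("receive.buffer.bytes", 128 * 1024);
+props.put("exclude.internal.topics", false);
+</pre>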
+
+<h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 
0.8.2.X to 0.9.0.0</a></h4>
+
+0.9.0.0 has <a href="#upgrade_9_breaking">potential breaking changes</a> 
(please review before upgrading) and an inter-broker protocol change from 
previous versions. This means that upgraded brokers and clients may not be 
compatible with older versions. It is important that you upgrade your Kafka 
cluster before upgrading your clients. If you are using MirrorMaker, downstream clusters should be upgraded first as well.
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+       <li> Update server.properties file on all brokers and add the following 
property: inter.broker.protocol.version=0.8.2.X </li>
+       <li> Upgrade the brokers. This can be done a broker at a time by simply 
bringing it down, updating the code, and restarting it. </li>
+       <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0 (see the sketch after this list).</li>
+       <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+</ol>
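+<p>
+    The two-phase protocol bump above can be sketched in <code>server.properties</code> terms as follows; replace 0.8.2.X with the exact version you are upgrading from, and apply each change with the rolling restarts described in the steps.
+</p>
+<pre>
+# Phase 1: while brokers are being upgraded, pin the inter-broker protocol to the old version
+# (set in server.properties on every broker before restarting it with the new code).
+inter.broker.protocol.version=0.8.2.X
+
+# Phase 2: once every broker runs the new code, switch to the new protocol
+# and perform one more rolling restart.
+inter.broker.protocol.version=0.9.0.0
+</pre>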
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.</p>
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.</p>
+
+<h5><a id="upgrade_9_breaking" href="#upgrade_9_breaking">Potential breaking 
changes in 0.9.0.0</a></h5>
+
+<ul>
+    <li> Java 1.6 is no longer supported. </li>
+    <li> Scala 2.9 is no longer supported. </li>
+    <li> Broker IDs above 1000 are now reserved by default for automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold, make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
+    <li> Configuration parameter replica.lag.max.messages was removed. 
Partition leaders will no longer consider the number of lagging messages when 
deciding which replicas are in sync. </li>
+    <li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since the last fetch request from the replica, but also to the time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages within replica.lag.time.max.ms will be considered out of sync. </li>
+    <li> Compacted topics no longer accept messages without a key, and the producer throws an exception if this is attempted. In 0.8.x, a message without a key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics). </li>
+    <li> MirrorMaker no longer supports multiple target clusters. As a result 
it will only accept a single --consumer.config parameter. To mirror multiple 
source clusters, you will need at least one MirrorMaker instance per source 
cluster, each with its own consumer configuration. </li>
+    <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have 
been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will 
still function as usual, only custom code directly importing these classes will 
be affected. </li>
+    <li> The default Kafka JVM performance options 
(KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
+    <li> The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with 
non-zero exit code on failure. </li>
+    <li> The kafka-topics.sh script (kafka.admin.TopicCommand) will now print 
a warning when topic names risk metric collisions due to the use of a '.' or 
'_' in the topic name, and error in the case of an actual collision. </li>
+    <li> The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the Java producer instead of the old Scala producer by default, and users have to specify 'old-producer' to use the old producer. </li>
+    <li> By default, all command line tools will print all logging messages to 
stderr instead of stdout. </li>
+</ul>
+
+<h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in 
0.9.0.1</a></h5>
+
+<ul>
+    <li> The new broker id generation feature can be disabled by setting 
broker.id.generation.enable to false. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics (see the sketch after this list). </li>
+    <li> The default value of the configuration parameter fetch.min.bytes for the new consumer is now 1. </li>
+</ul>
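+<p>
+    A hedged <code>server.properties</code> sketch of the settings mentioned above; the buffer size shown is the documented 128 MB default and is an example only, not a sizing recommendation.
+</p>
+<pre>
+# The log cleaner is now enabled by default; review its memory budget if you rely on compacted topics.
+log.cleaner.enable=true
+# 128 MB is the default dedupe buffer; size it according to your volume of compacted data.
+log.cleaner.dedupe.buffer.size=134217728
+# Automatic broker id generation can be disabled if you assign broker ids explicitly.
+broker.id.generation.enable=false
+</pre>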
+
+<h5>Deprecations in 0.9.0.0</h5>
+
+<ul>
+    <li> Altering topic configuration from the kafka-topics.sh script 
(kafka.admin.TopicCommand) has been deprecated. Going forward, please use the 
kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. 
</li>
+    <li> The kafka-consumer-offset-checker.sh 
(kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please 
use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this 
functionality. </li>
+    <li> The kafka.tools.ProducerPerformance class has been deprecated. Going 
forward, please use org.apache.kafka.tools.ProducerPerformance for this 
functionality (kafka-producer-perf-test.sh will also be changed to use the new 
class). </li>
+    <li> The producer config block.on.buffer.full has been deprecated and will be removed in a future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use the max.block.ms value to block, after which it will throw a TimeoutException. If the block.on.buffer.full property is set to true explicitly, it will set max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured (see the sketch after this list).</li>
+</ul>
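+<p>
+    A minimal sketch of the replacement producer setting; the 60-second value is illustrative.
+</p>
+<pre>
+// Prefer max.block.ms over the deprecated block.on.buffer.full; the value shown is illustrative.
+Properties producerProps = new Properties();
+producerProps.put("max.block.ms", 60000);
+// Avoid setting block.on.buffer.full=true, or max.block.ms is forced to Long.MAX_VALUE
+// and metadata.fetch.timeout.ms is ignored.
+</pre>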
+
+<h4><a id="upgrade_82" href="#upgrade_82">Upgrading from 0.8.1 to 
0.8.2</a></h4>
+
+0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a 
time by simply bringing it down, updating the code, and restarting it.
+
+<h4><a id="upgrade_81" href="#upgrade_81">Upgrading from 0.8.0 to 
0.8.1</a></h4>
+
+0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a 
time by simply bringing it down, updating the code, and restarting it.
+
+<h4><a id="upgrade_7" href="#upgrade_7">Upgrading from 0.7</a></h4>
+
+Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, protocol, and configuration in order to add replication (which was missing in 0.7). The upgrade from 0.7 to later versions requires a <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special tool</a> for migration. This migration can be done without downtime.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/uses.html
----------------------------------------------------------------------
diff --git a/0102/uses.html b/0102/uses.html
new file mode 100644
index 0000000..b86d917
--- /dev/null
+++ b/0102/uses.html
@@ -0,0 +1,54 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<p> Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">this blog post</a>. </p>
+
+<h4><a id="uses_messaging" href="#uses_messaging">Messaging</a></h4>
+
+Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.). In comparison to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault-tolerance, which makes it a good solution for large-scale message processing applications.
+<p>
+In our experience messaging uses are often comparatively low-throughput, but 
may require low end-to-end latency and often depend on the strong durability 
guarantees Kafka provides.
+<p>
+In this domain Kafka is comparable to traditional messaging systems such as <a href="http://activemq.apache.org">ActiveMQ</a> or <a href="https://www.rabbitmq.com">RabbitMQ</a>.
+
+<h4><a id="uses_website" href="#uses_website">Website Activity 
Tracking</a></h4>
+
+The original use case for Kafka was to be able to rebuild a user activity 
tracking pipeline as a set of real-time publish-subscribe feeds. This means 
site activity (page views, searches, or other actions users may take) is 
published to central topics with one topic per activity type. These feeds are 
available for subscription for a range of use cases including real-time 
processing, real-time monitoring, and loading into Hadoop or offline data 
warehousing systems for offline processing and reporting.
+<p>
+Activity tracking is often very high volume as many activity messages are 
generated for each user page view.
+
+<h4><a id="uses_metrics" href="#uses_metrics">Metrics</a></h4>
+
+Kafka is often used for operational monitoring data. This involves aggregating 
statistics from distributed applications to produce centralized feeds of 
operational data.
+
+<h4><a id="uses_logs" href="#uses_logs">Log Aggregation</a></h4>
+
+Many people use Kafka as a replacement for a log aggregation solution. Log 
aggregation typically collects physical log files off servers and puts them in 
a central place (a file server or HDFS perhaps) for processing. Kafka abstracts 
away the details of files and gives a cleaner abstraction of log or event data 
as a stream of messages. This allows for lower-latency processing and easier 
support for multiple data sources and distributed data consumption.
+
+In comparison to log-centric systems like Scribe or Flume, Kafka offers 
equally good performance, stronger durability guarantees due to replication, 
and much lower end-to-end latency.
+
+<h4><a id="uses_streamprocessing" href="#uses_streamprocessing">Stream 
Processing</a></h4>
+
+Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing. For example, a processing pipeline for recommending news articles might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might normalize or deduplicate this content and publish the cleansed article content to a new topic; a final processing stage might attempt to recommend this content to users. Such processing pipelines create graphs of real-time data flows based on the individual topics. Starting in 0.10.0.0, a lightweight but powerful stream processing library called <a href="#streams_overview">Kafka Streams</a> is available in Apache Kafka to perform such data processing as described above. Apart from Kafka Streams, alternative open source stream processing tools include <a href="https://storm.apache.org/">Apache Storm</a> and <a href="http://samza.apache.org/">Apache Samza</a>.
+
+<h4><a id="uses_eventsourcing" href="#uses_eventsourcing">Event 
Sourcing</a></h4>
+
+<a href="http://martinfowler.com/eaaDev/EventSourcing.html";>Event sourcing</a> 
is a style of application design where state changes are logged as a 
time-ordered sequence of records. Kafka's support for very large stored log 
data makes it an excellent backend for an application built in this style.
+
+<h4><a id="uses_commitlog" href="#uses_commitlog">Commit Log</a></h4>
+
+Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The <a href="/documentation.html#compaction">log compaction</a> feature in Kafka helps support this usage. In this usage Kafka is similar to the <a href="http://zookeeper.apache.org/bookkeeper/">Apache BookKeeper</a> project.
