Repository: kafka-site Updated Branches: refs/heads/asf-site 9a36603f4 -> a7c3675d3
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/streams.html ---------------------------------------------------------------------- diff --git a/0102/streams.html b/0102/streams.html new file mode 100644 index 0000000..dec17ef --- /dev/null +++ b/0102/streams.html @@ -0,0 +1,424 @@ +<!--~ + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~--> + +<script><!--#include virtual="js/templateData.js" --></script> + +<script id="streams-template" type="text/x-handlebars-template"> + <h1>Streams</h1> + + <ol class="toc"> + <li> + <a href="#streams_overview">Overview</a> + </li> + <li> + <a href="#streams_developer">Developer guide</a> + <ul> + <li><a href="#streams_concepts">Core concepts</a> + <li><a href="#streams_processor">Low-level processor API</a> + <li><a href="#streams_dsl">High-level streams DSL</a> + </ul> + </li> + </ol> + + <h2><a id="streams_overview" href="#overview">Overview</a></h2> + + <p> + Kafka Streams is a client library for processing and analyzing data stored in Kafka, and for either writing the resulting data back to Kafka or sending the final output to an external system. 
It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state. + Kafka Streams has a <b>low barrier to entry</b>: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model. + </p> + <p> + Some highlights of Kafka Streams: + </p> + + <ul> + <li>Designed as a <b>simple and lightweight client library</b>, which can be easily embedded in any Java application and integrated with any existing packaging, deployment and operational tools that users have for their streaming applications.</li> + <li>Has <b>no external dependencies on systems other than Apache Kafka itself</b> as the internal messaging layer; notably, it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees.</li> + <li>Supports <b>fault-tolerant local state</b>, which enables very fast and efficient stateful operations like joins and windowed aggregations.</li> + <li>Employs <b>one-record-at-a-time processing</b> to achieve low processing latency, and supports <b>event-time based windowing operations</b>.</li> + <li>Offers necessary stream processing primitives, along with a <b>high-level Streams DSL</b> and a <b>low-level Processor API</b>.</li> + + </ul> + + <h2><a id="streams_developer" href="#streams_developer">Developer Guide</a></h2> + + <p> + There is a <a href="#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program written with the Kafka Streams library. + This section focuses on how to write, configure, and execute a Kafka Streams application. 
+ </p> + + <h4><a id="streams_concepts" href="#streams_concepts">Core Concepts</a></h4> + + <p> + We first summarize the key concepts of Kafka Streams. + </p> + + <h5><a id="streams_topology" href="#streams_topology">Stream Processing Topology</a></h5> + + <ul> + <li>A <b>stream</b> is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b> is defined as a key-value pair.</li> + <li>A <b>stream processing application</b> written in Kafka Streams defines its computational logic through one or more <b>processor topologies</b>, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).</li> + <li>A <b>stream processor</b> is a node in the processor topology; it represents a processing step that transforms data in streams: it receives one input record at a time from its upstream processors in the topology, applies its operation to it, and may subsequently produce one or more output records for its downstream processors.</li> + </ul> + + <p> + Kafka Streams offers two ways to define the stream processing topology: the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides + the most common data transformation operations such as <code>map</code> and <code>filter</code>; the lower-level <a href="#streams_processor"><b>Processor API</b></a> allows + developers to define and connect custom processors as well as to interact with <a href="#streams_state">state stores</a>. + </p> + + <h5><a id="streams_time" href="#streams_time">Time</a></h5> + + <p> + A critical aspect in stream processing is the notion of <b>time</b>, and how it is modeled and integrated. + For example, some operations such as <b>windowing</b> are defined based on time boundaries. 
+ </p> + <p> + Common notions of time in streams are: + </p> + + <ul> + <li><b>Event time</b> - The point in time when an event or data record occurred, i.e. was originally created "at the source".</li> + <li><b>Processing time</b> - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or days etc. later than the original event time.</li> + <li><b>Ingestion time</b> - The point in time when an event or data record is stored in a topic partition by a Kafka broker. The difference to event time is that this ingestion timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created "at the source". The difference to processing time is that processing time is when the stream processing application processes the record. For example, if a record is never processed, there is no notion of processing time for it, but it still has an ingestion time.</li> + </ul> + <p> + The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka's configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps. + </p> + <p> + Kafka Streams assigns a <b>timestamp</b> to every data record + via the <code>TimestampExtractor</code> interface. 
+ Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field + to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, + thereby yielding processing-time semantics to stream processing applications. + Developers can thus enforce different notions of time depending on their business needs. For example, + per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and + are leveraged by time-dependent operations such as joins. + </p> + + <p> + Finally, whenever a Kafka Streams application writes records to Kafka, then it will also assign timestamps to these new records. The way the timestamps are assigned depends on the context: + <ul> + <li> When new output records are generated via processing some input record, for example, <code>context.forward()</code> triggered in the <code>process()</code> function call, output record timestamps are inherited from input record timestamps directly.</li> + <li> When new output records are generated via periodic functions such as <code>punctuate()</code>, the output record timestamp is defined as the current internal time (obtained through <code>context.timestamp()</code>) of the stream task.</li> + <li> For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.</li> + </ul> + </p> + + <h5><a id="streams_state" href="#streams_state">States</a></h5> + + <p> + Some stream processing applications don't require state, which means the processing of a message is independent from + the processing of all other messages. + However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you + can join input streams, or group and aggregate data records. 
Many such stateful operators are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>. + </p> + <p> + Kafka Streams provides so-called <b>state stores</b>, which can be used by stream processing applications to store and query data. + This is an important capability when implementing stateful operations. + Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. + These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. + Kafka Streams offers fault-tolerance and automatic recovery for local state stores. + </p> + <p> + Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called <b>Interactive Queries</b>. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation. + </p> + <br> + <p> + As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="#streams_topology">processor topology</a>. + Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections. + </p> + + <h4><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h4> + + <h5><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h5> + + <p> + Developers can define their customized processing logic by implementing the <code>Processor</code> interface, which + provides <code>process</code> and <code>punctuate</code> methods. The <code>process</code> method is invoked on each + received record, and the <code>punctuate</code> method is invoked periodically based on elapsed time. 
+ In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the + <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to + forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current + processing progress (<code>context().commit</code>), etc. + </p> + + <pre> + public class MyProcessor implements Processor<String, String> { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(String dummy, String line) { + String[] words = line.toLowerCase().split(" "); + + for (String word : words) { + Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } + } + } + + @Override + public void punctuate(long timestamp) { + KeyValueIterator<String, Integer> iter = this.kvStore.all(); + + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); + context.forward(entry.key, entry.value.toString()); + } + + iter.close(); + context.commit(); + } + + @Override + public void close() { + this.kvStore.close(); + } + } + </pre> + + <p> + In the above implementation, the following actions are performed: + + <ul> + <li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li> + <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li> + <li>In the <code>punctuate</code> method, 
iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li> + </ul> + </p> + + <h5><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h5> + + <p> + With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology + by connecting these processors together: + + <pre> + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("SOURCE", "src-topic") + + .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE") + .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1") + .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1") + + .addSink("SINK1", "sink-topic1", "PROCESS1") + .addSink("SINK2", "sink-topic2", "PROCESS2") + .addSink("SINK3", "sink-topic3", "PROCESS3"); + </pre> + + There are several steps in the above code to build the topology, and here is a quick walk through: + + <ul> + <li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li> + <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li> + <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li> + </ul> + </p> + + <h5><a id="streams_processor_statestore" href="#streams_processor_statestore">Local State Store</a></h5> + + <p> + Note that the Processor API is not limited to only accessing the current records as they arrive, but can also maintain local state stores 
+ that keep recently arrived records to use in stateful processing operations such as aggregation or windowed joins. + To take advantage of these local states, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the + processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created + local state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>. + + <pre> + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("SOURCE", "src-topic") + + .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE") + // create the in-memory state store "COUNTS" associated with processor "PROCESS1" + .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1") + .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1") + .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1") + + // connect the state store "COUNTS" with processor "PROCESS2" + .connectProcessorAndStateStores("PROCESS2", "COUNTS") + + .addSink("SINK1", "sink-topic1", "PROCESS1") + .addSink("SINK2", "sink-topic2", "PROCESS2") + .addSink("SINK3", "sink-topic3", "PROCESS3"); + </pre> + + </p> + + In the next section we present another way to build the processor topology: the Kafka Streams DSL. + + <h4><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h4> + + To build a processor topology using the Streams DSL, developers can use the <code>KStreamBuilder</code> class, which extends <code>TopologyBuilder</code>. + A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. 
The rest of this section will walk + through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source + code for details. + + <h5><a id="streams_kstream_ktable" href="#streams_kstream_ktable">KStream and KTable</a></h5> + The DSL uses two main abstractions. A <b>KStream</b> is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. A <b>KTable</b> is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered a create). To illustrate the difference between KStreams and KTables, let's imagine the following two data records are being sent to the stream: <code>("alice", 1) --> ("alice", 3)</code>. If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable, the return would be <code>3</code>, since the last record would be considered as an update. + + + + <h5><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h5> + + <p> + Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code>) + can be created as a source stream from one or more Kafka topics (for <code>KTable</code> you can only create the source stream + from a single topic). 
+ </p> + + <pre> + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2"); + KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName"); + </pre> + + <h5><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing a stream</a></h5> + A stream processor may need to divide data records into time buckets, i.e. to <b>window</b> the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows: + <ul> + <li><b>Hopping time windows</b> are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.</li> + <li><b>Tumbling time windows</b> are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.</li> + <li><b>Sliding windows</b> model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but on the data record timestamps. 
In Kafka Streams, sliding windows are used only for join operations, and can be specified through the <code>JoinWindows</code> class.</li> + </ul> + + <h5><a id="streams_dsl_joins" href="#streams_dsl_joins">Joins</a></h5> + A <b>join</b> operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations: + <ul> + <li><b>KStream-to-KStream Joins</b> are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li> + + <li><b>KTable-to-KTable Joins</b> are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new <code>KTable</code> instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.</li> + <li><b>KStream-to-KTable Joins</b> allow you to perform table lookups against a changelog stream (<code>KTable</code>) upon receiving a new record from another record stream (KStream). 
An example use case would be to enrich a stream of user activities (<code>KStream</code>) with the latest user profile information (<code>KTable</code>). Only records received from the record stream will trigger the join and produce results via <code>ValueJoiner</code>, not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new <code>KStream</code> instance representing the result stream of the join is returned from this operator.</li> + </ul> + + Depending on the operands the following join operations are supported: <b>inner joins</b>, <b>outer joins</b> and <b>left joins</b>. Their semantics are similar to the corresponding operators in relational databases. + + <h5><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h5> + + <p> + There is a list of transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively. + Each of these operations may generate either one or more <code>KStream</code> and <code>KTable</code> objects and + can be translated into one or more connected processors in the underlying processor topology. + All these transformation methods can be chained together to compose a complex processor topology. + Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as + generic functions where users can specify the input and output data types. 
+ </p> + + <p> + Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc, are stateless + transformation operations and can be applied to both <code>KStream</code> and <code>KTable</code>, + where users can usually pass a customized function to these functions as a parameter, such as <code>Predicate</code> for <code>filter</code>, + <code>KeyValueMapper</code> for <code>map</code>, etc: + + </p> + + <pre> + // written in Java 8+, using lambda expressions + KStream<String, GenericRecord> mapped = source1.mapValues(record -> record.get("category")); + </pre> + + <p> + Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise + they do not require a state store associated with the stream processor; stateful transformations, on the other hand, + require accessing an associated state for processing and producing outputs. + For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the received records + within the defined window boundary so far. The operators can then access these accumulated records in the store and compute + based on them. 
+ </p> + + <pre> + // written in Java 8+, using lambda expressions + KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate( + () -> 0L, // initial value + (aggKey, value, aggregate) -> aggregate + 1L, // aggregating value + TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds + Serdes.Long() // serde for aggregated value + ); + + KStream<String, String> joined = source1.leftJoin(source2, + (record1, record2) -> record1.get("user") + "-" + record2.get("region") + ); + </pre> + + <h5><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h5> + + <p> + At the end of the processing, users can choose to (continuously) write the final result streams back to a Kafka topic through + <code>KStream.to</code> and <code>KTable.to</code>. + </p> + + <pre> + joined.to("topic4"); + </pre> + + If your application needs to continue reading and processing the records after they have been materialized + to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic; + Kafka Streams provides a convenience method called <code>through</code>: + + <pre> + // equivalent to + // + // joined.to("topic4"); + // materialized = builder.stream("topic4"); + KStream<String, String> materialized = joined.through("topic4"); + </pre> + + + <br> + <p> + Besides defining the topology, developers will also need to configure their applications + in <code>StreamsConfig</code> before running them. A complete list of + Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>. 
+ </p> +</script> + +<!--#include virtual="../includes/_header.htm" --> +<!--#include virtual="../includes/_top.htm" --> +<div class="content documentation documentation--current"> + <!--#include virtual="../includes/_nav.htm" --> + <div class="right"> + <!--#include virtual="../includes/_docs_banner.htm" --> + <ul class="breadcrumbs"> + <li><a href="/documentation">Documentation</a></li> + </ul> + <div class="p-streams"></div> + </div> +</div> +<!--#include virtual="../includes/_footer.htm" --> +<script> +$(function() { + // Show selected style on nav item + $('.b-nav__streams').addClass('selected'); + + // Display docs subnav items + $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); +}); +</script> http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/upgrade.html ---------------------------------------------------------------------- diff --git a/0102/upgrade.html b/0102/upgrade.html new file mode 100644 index 0000000..ebc61db --- /dev/null +++ b/0102/upgrade.html @@ -0,0 +1,367 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4> +<p>0.10.2.0 has wire protocol changes. 
By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. +However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading. +</p> + +<p>Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 +clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the +Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.x and newer clients. +</p> + +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li> Update server.properties file on all brokers and add the following properties: + <ul> + <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0 or 0.10.1).</li> + <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.) + </ul> + </li> + <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li> + <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2. </li> + <li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2). + If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li> + <li> Restart the brokers one by one for the new protocol version to take effect. 
</li> + <li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, + then change log.message.format.version to 0.10.2 on each broker and restart them one by one. </li> +</ol> + +<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default. + +<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. + +<ol> + <li>Upgrading a Kafka Streams application: + <ul> + <li>You need to recompile your code. Just swapping the jar file will not work and will break your application.</li> + <li>If you use a custom timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed.</li> + </ul> + </li> +</ol> + +<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5> +<ul> + <li>The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients + can talk to version 0.10.0 or newer brokers. Note that some features are not available or are limited when older brokers + are used. </li> + <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted. + Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li> + <li>Java consumer now shuts down gracefully. By default, the consumer waits up to 30 seconds to complete pending requests. + A new close API with timeout has been added to <code>KafkaConsumer</code> to control the maximum wait time.</li> + <li>Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. 
This + makes the behaviour consistent with MirrorMaker when used with the old Scala consumer.</li> + <li>The Zookeeper dependency was removed from the Streams API. The Streams API now uses the Kafka protocol to manage internal topics instead of + modifying Zookeeper directly. This eliminates the need for privileges to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG" + should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li> + <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to + the StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to <a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a>.</li> +</ul> + +<h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5> +<ul> + <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the <code>topics</code> array is set to <code>null</code>. </li> + <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchResponse v2 introduces a top-level <code>error_code</code> field. </li> + <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic">KIP-103</a>: UpdateMetadataRequest v3 introduces a <code>listener_name</code> field to the elements of the <code>end_points</code> array. </li> + <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsRequest v1 introduces a <code>validate_only</code> field. 
</li> + <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsResponse v1 introduces an <code>error_message</code> field to the elements of the <code>topic_errors</code> array. </li> +</ul> + +<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4> +0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. +However, please review the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrading. +<br> +Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients +only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older clients). + +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li> Update the server.properties file on all brokers and add the following properties: + <ul> + <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2.0, 0.9.0.0 or 0.10.0.0).</li> + <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.) + </ul> + </li> + <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li> + <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. </li> + <li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1). 
+ If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li> + <li> Restart the brokers one by one for the new protocol version to take effect. </li> + <li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, + then change log.message.format.version to 0.10.1 on each broker and restart them one by one. </li> +</ol> + +<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default. + +<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. + +<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5> +<ul> + <li> The log retention time is no longer based on the last modified time of the log segments. Instead it is based on the largest timestamp of the messages in a log segment.</li> + <li> The log rolling time no longer depends on the log segment create time. Instead it is now based on the timestamps in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled out when a new message has a timestamp greater than or equal to T + log.roll.ms </li> + <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li> + <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. 
</li> + <li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li> +</ul> + +<h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5> +<ul> + <li> Stream grouping and aggregation split into two methods: + <ul> + <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li> + <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li> + <li> Example: stream.countByKey() changes to stream.groupByKey().count() </li> + </ul> + </li> + <li> Auto Repartitioning: + <ul> + <li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li> + <li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li> + </ul> + </li> + <li> TopologyBuilder: + <ul> + <li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li> + </ul> + </li> + <li> DSL: new parameter to specify state store names: + <ul> + <li> The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName) </li> + <li> KStreamBuilder#table(String topic) changes to #table(String topic, String storeName) </li> + <li> KTable#through(String topic) changes to #through(String topic, String storeName) </li> + <li> KGroupedStream #aggregate(), #reduce(), and #count() require additional parameter "String storeName"</li> + <li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li> + </ul> + </li> + 
<li> Windowing: + <ul> + <li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li> + <li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li> + </ul> + </li> +</ul> + +<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5> +<ul> + <li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release + and will be removed in a future major release. </li> + <li> The <code>--new-consumer</code>/<code>--new.consumer</code> switch is no longer required to use tools like MirrorMaker and the Console Consumer with the new consumer; one simply + needs to pass a Kafka broker to connect to instead of the ZooKeeper ensemble. In addition, usage of the Console Consumer with the old consumer has been deprecated and it will be + removed in a future major release. </li> + <li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li> + <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li> + <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. 
</li> + <li> The new Java Consumer now supports heartbeating from a background thread. There is a new configuration + <code>max.poll.interval.ms</code> which controls the maximum time between poll invocations before the consumer + will proactively leave the group (5 minutes by default). The value of the configuration + <code>request.timeout.ms</code> must always be larger than <code>max.poll.interval.ms</code> because this is the maximum + time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default + value to just above 5 minutes. Finally, the default value of <code>session.timeout.ms</code> has been adjusted down to + 10 seconds, and the default value of <code>max.poll.records</code> has been changed to 500.</li> + <li> When using an Authorizer and a user doesn't have <b>Describe</b> authorization on a topic, the broker will no + longer return TOPIC_AUTHORIZATION_FAILED errors to requests since this leaks topic names. Instead, the UNKNOWN_TOPIC_OR_PARTITION + error code will be returned. This may cause unexpected timeouts or delays when using the producer and consumer since + Kafka clients will typically retry automatically on unknown topic errors. You should consult the client logs if you + suspect this could be happening.</li> + <li> Fetch responses have a size limit by default (50 MB for consumers and 10 MB for replication). The existing per partition limits also apply (1 MB for consumers + and replication). Note that neither of these limits is an absolute maximum as explained in the next point. </li> + <li> Consumers and replicas can make progress if a message larger than the response/partition size limit is found. More concretely, if the first message in the + first non-empty partition of the fetch is larger than either or both limits, the message will still be returned. 
</li> + <li> Overloaded constructors were added to <code>kafka.api.FetchRequest</code> and <code>kafka.javaapi.FetchRequest</code> to allow the caller to specify the + order of the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled before + the request is sent to avoid starvation issues. </li> +</ul> + +<h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5> +<ul> + <li> ListOffsetRequest v1 supports accurate offset search based on timestamps. </li> + <li> MetadataResponse v2 introduces a new field: "cluster_id". </li> + <li> FetchRequest v3 supports limiting the response size (in addition to the existing per partition limit), it returns messages + bigger than the limits if required to make progress and the order of partitions in the request is now significant. </li> + <li> JoinGroup v1 introduces a new field: "rebalance_timeout". </li> +</ul> + +<h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4> +0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and possible <a href="#upgrade_10_performance_impact"> performance impact following the upgrade</a>. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade. +<br> +Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients. +<p/> +<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0, +clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not +work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b> brokers are upgraded to +0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients. 
+ +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li> Update server.properties file on all brokers and add the following properties: + <ul> + <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).</li> + <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.) + </ul> + </li> + <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li> + <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. NOTE: You shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 </li> + <li> Restart the brokers one by one for the new protocol version to take effect. </li> + <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.0 on each broker and restart them one by one. + </li> +</ol> + +<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default. + +<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. + +<h5><a id="upgrade_10_performance_impact" href="#upgrade_10_performance_impact">Potential performance impact following upgrade to 0.10.0.0</a></h5> +<p> + The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. + The on disk message format can be configured through log.message.format.version in the server.properties file. + The default on-disk message format is 0.10.0. 
If a consumer client is on a version before 0.10.0.0, it only understands + message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format + before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. + + Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal. + + To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format that includes new timestamp and improved compression. + + The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not. +</p> +<p> + For clients that are upgraded to 0.10.0.0, there is no performance impact. +</p> +<p> + <b>Note:</b> By setting the message format version, one certifies that all existing messages are on or below that + message format version. Otherwise consumers before 0.10.0.0 might break. In particular, after the message format + is set to 0.10.0, one should not change it back to an earlier format as it may break consumers on versions before 0.10.0.0. 
+</p> +<p> + <b>Note:</b> Due to the additional timestamp introduced in each message, producers sending small messages may see a + message throughput degradation because of the increased overhead. + Likewise, replication now transmits an additional 8 bytes per message. + If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards + and see failures and performance issues due to the overload. +</p> + <b>Note:</b> If you have enabled compression on producers, you may notice reduced producer throughput and/or + lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0 + brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In + certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this + happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer + used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative + impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka + release. +<p> + +</p> + +<h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking changes in 0.10.0.0</a></h5> +<ul> + <li> Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li> + <li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. 
</li> + <li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li> + <li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li> + <li> MessageFormatter interface was changed from <code>def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)</code> to + <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li> + <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to + <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li> + <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li> + <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li> + <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li> + <li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0. </li> + <li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library. </li> + <li> LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1). + To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later. + Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue + to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later + should use interoperable LZ4f framing. 
A list of interoperable LZ4 libraries is available at http://www.lz4.org/ +</ul> + +<h5><a id="upgrade_10_notable" href="#upgrade_10_notable">Notable changes in 0.10.0.0</a></h5> + +<ul> + <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with brokers of version 0.10.x or later due to the message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li> + <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li> + <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li> + <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li> + <li> The new consumer API has been marked stable. </li> +</ul> + +<h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4> + +0.9.0.0 has <a href="#upgrade_9_breaking">potential breaking changes</a> (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker, downstream clusters should be upgraded first as well. + +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li> Update the server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X </li> + <li> Upgrade the brokers. 
This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li> + <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.</li> + <li> Restart the brokers one by one for the new protocol version to take effect </li> +</ol> + +<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default. + +<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. + +<h5><a id="upgrade_9_breaking" href="#upgrade_9_breaking">Potential breaking changes in 0.9.0.0</a></h5> + +<ul> + <li> Java 1.6 is no longer supported. </li> + <li> Scala 2.9 is no longer supported. </li> + <li> Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li> + <li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li> + <li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li> + <li> Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics). 
</li> + <li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li> + <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual; only custom code directly importing these classes will be affected. </li> + <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li> + <li> The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure. </li> + <li> The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and an error in the case of an actual collision. </li> + <li> The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the Java producer instead of the old Scala producer by default, and users have to specify 'old-producer' to use the old producer. </li> + <li> By default, all command line tools will print all logging messages to stderr instead of stdout. </li> +</ul> + +<h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in 0.9.0.1</a></h5> + +<ul> + <li> The new broker id generation feature can be disabled by setting broker.id.generation.enable to false. </li> + <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. 
</li> + <li> The default value of the configuration parameter fetch.min.bytes for the new consumer is now 1. </li> +</ul> + +<h5>Deprecations in 0.9.0.0</h5> + +<ul> + <li> Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. </li> + <li> The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality. </li> + <li> The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class). </li> + <li> The producer config block.on.buffer.full has been deprecated and will be removed in a future release. Its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use the max.block.ms value to block, after which it will throw a TimeoutException. If the block.on.buffer.full property is set to true explicitly, it will set max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured.</li> +</ul> + +<h4><a id="upgrade_82" href="#upgrade_82">Upgrading from 0.8.1 to 0.8.2</a></h4> + +0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it. + +<h4><a id="upgrade_81" href="#upgrade_81">Upgrading from 0.8.0 to 0.8.1</a></h4> + +0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it. + +<h4><a id="upgrade_7" href="#upgrade_7">Upgrading from 0.7</a></h4> + +Release 0.7 is incompatible with newer releases. 
Major changes were made to the API, ZooKeeper data structures, protocol, and configuration in order to add replication (which was missing in 0.7). The upgrade from 0.7 to later versions requires a <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special tool</a> for migration. This migration can be done without downtime. http://git-wip-us.apache.org/repos/asf/kafka-site/blob/a7c3675d/0102/uses.html ---------------------------------------------------------------------- diff --git a/0102/uses.html b/0102/uses.html new file mode 100644 index 0000000..b86d917 --- /dev/null +++ b/0102/uses.html @@ -0,0 +1,54 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<p> Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">this blog post</a>. </p> + +<h4><a id="uses_messaging" href="#uses_messaging">Messaging</a></h4> + +Kafka works well as a replacement for a more traditional message broker. 
Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications. +<p> +In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides. +<p> +In this domain Kafka is comparable to traditional messaging systems such as <a href="http://activemq.apache.org">ActiveMQ</a> or <a href="https://www.rabbitmq.com">RabbitMQ</a>. + +<h4><a id="uses_website" href="#uses_website">Website Activity Tracking</a></h4> + +The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting. +<p> +Activity tracking is often very high volume as many activity messages are generated for each user page view. + +<h4><a id="uses_metrics" href="#uses_metrics">Metrics</a></h4> + +Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data. + +<h4><a id="uses_logs" href="#uses_logs">Log Aggregation</a></h4> + +Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. 
Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. + +In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency. + +<h4><a id="uses_streamprocessing" href="#uses_streamprocessing">Stream Processing</a></h4> + +Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing. For example, a processing pipeline for recommending news articles might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might normalize or deduplicate this content and publish the cleansed article content to a new topic; a final processing stage might attempt to recommend this content to users. Such processing pipelines create graphs of real-time data flows based on the individual topics. Starting in 0.10.0.0, a lightweight but powerful stream processing library called <a href="#streams_overview">Kafka Streams</a> is available in Apache Kafka to perform the data processing described above. Apart from Kafka Streams, alternative open source stream processing tools include <a href="https://storm.apache.org/">Apache Storm</a> and <a href="http://samza.apache.org/">Apache Samza</a>. + +<h4><a id="uses_eventsourcing" href="#uses_eventsourcing">Event Sourcing</a></h4> + +<a href="http://martinfowler.com/eaaDev/EventSourcing.html">Event sourcing</a> is a style of application design where state changes are logged as a time-ordered sequence of records.
Kafka's support for very large stored log data makes it an excellent backend for an application built in this style. + +<h4><a id="uses_commitlog" href="#uses_commitlog">Commit Log</a></h4> + +Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The <a href="/documentation.html#compaction">log compaction</a> feature in Kafka helps support this usage. In this usage Kafka is similar to the <a href="http://zookeeper.apache.org/bookkeeper/">Apache BookKeeper</a> project.