Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dae08fb5a -> b4a89c1c1


[SPARK-16312][STREAMING][KAFKA][DOC] Doc for Kafka 0.10 integration

## What changes were proposed in this pull request?
Doc for the Kafka 0.10 integration

## How was this patch tested?
Scala code examples were taken from my example repo, so hopefully they compile.

Author: cody koeninger <c...@koeninger.org>

Closes #14385 from koeninger/SPARK-16312.

(cherry picked from commit c9f2501af278241f780a38b9562e193755ed5af3)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4a89c1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4a89c1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4a89c1c

Branch: refs/heads/branch-2.0
Commit: b4a89c1c1245cb764e7e214220518442b2225ce5
Parents: dae08fb
Author: cody koeninger <c...@koeninger.org>
Authored: Fri Aug 5 10:13:32 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Aug 5 10:13:41 2016 +0100

----------------------------------------------------------------------
 docs/streaming-kafka-0-10-integration.md | 192 +++++++++++++++++++
 docs/streaming-kafka-0-8-integration.md  | 210 +++++++++++++++++++++
 docs/streaming-kafka-integration.md      | 253 +++++---------------------
 docs/streaming-programming-guide.md      |   4 +-
 4 files changed, 452 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4a89c1c/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md 
b/docs/streaming-kafka-0-10-integration.md
new file mode 100644
index 0000000..44c39e3
--- /dev/null
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -0,0 +1,192 @@
+---
+layout: global
+title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 
or higher)
+---
+
+The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 
[Direct Stream 
approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers).
  It provides simple parallelism,  1:1 correspondence between Kafka partitions 
and Spark partitions, and access to offsets and metadata. However, because the 
newer integration uses the [new Kafka consumer 
API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, there are notable differences in usage. This version of the 
integration is marked as experimental, so the API is potentially subject to 
change.
+
+### Linking
+For Scala/Java applications using SBT/Maven project definitions, link your 
streaming application with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
+
+               groupId = org.apache.spark
+               artifactId = 
spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
+               version = {{site.SPARK_VERSION_SHORT}}
+
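+For example, with SBT this corresponds to a library dependency along the lines of the
+following sketch (the version string shown is illustrative; use the Spark version you
+are actually building against):
+
+       libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"
+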
+### Creating a Direct Stream
+Note that the namespace for the imports includes the version, `org.apache.spark.streaming.kafka010`.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       import org.apache.kafka.clients.consumer.ConsumerRecord
+       import org.apache.kafka.common.serialization.StringDeserializer
+       import org.apache.spark.streaming.kafka010._
+       import 
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+       import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+
+       val kafkaParams = Map[String, Object](
+         "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
+         "key.deserializer" -> classOf[StringDeserializer],
+         "value.deserializer" -> classOf[StringDeserializer],
+         "group.id" -> "example",
+         "auto.offset.reset" -> "latest",
+         "enable.auto.commit" -> (false: java.lang.Boolean)
+       )
+
+       val topics = Array("topicA", "topicB")
+       val stream = KafkaUtils.createDirectStream[String, String](
+         streamingContext,
+         PreferConsistent,
+         Subscribe[String, String](topics, kafkaParams)
+       )
+
+       stream.map(record => (record.key, record.value))
+
+Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html).
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+For possible kafkaParams, see the [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
+Note that `enable.auto.commit` is disabled; see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below for discussion.
+
+### LocationStrategies
+The new Kafka consumer API will pre-fetch messages into buffers.  Therefore it 
is important for performance reasons that the Spark integration keep cached 
consumers on executors (rather than recreating them for each batch), and prefer 
to schedule partitions on the host locations that have the appropriate 
consumers.
+
+In most cases, you should use `LocationStrategies.PreferConsistent` as shown 
above.  This will distribute partitions evenly across available executors.  If 
your executors are on the same hosts as your Kafka brokers, use 
`PreferBrokers`, which will prefer to schedule partitions on the Kafka leader 
for that partition.  Finally, if you have a significant skew in load among 
partitions, use `PreferFixed`. This allows you to specify an explicit mapping 
of partitions to hosts (any unspecified partitions will use a consistent 
location).
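+
+For example, a minimal sketch of `PreferFixed`, pinning two illustrative partitions to
+specific executor hosts (the topic, partitions and hostnames here are placeholders; any
+unlisted partition falls back to a consistent location):
+
+       import org.apache.kafka.common.TopicPartition
+       import org.apache.spark.streaming.kafka010.LocationStrategies.PreferFixed
+
+       // map each partition to the executor host that should consume it
+       val hostMap = Map(
+         new TopicPartition("topicA", 0) -> "host1.example.com",
+         new TopicPartition("topicA", 1) -> "host2.example.com"
+       )
+       val locationStrategy = PreferFixed(hostMap)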
+
+The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
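+
+For example, a minimal sketch of raising the cache capacity when constructing the
+streaming context (the application name, capacity value and batch interval are
+illustrative):
+
+       import org.apache.spark.SparkConf
+       import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+       val sparkConf = new SparkConf()
+         .setAppName("KafkaCacheExample")
+         .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
+       val streamingContext = new StreamingContext(sparkConf, Seconds(5))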
+
+### ConsumerStrategies
+The new Kafka consumer API has a number of different ways to specify topics, 
some of which require considerable post-object-instantiation setup.  
`ConsumerStrategies` provides an abstraction that allows Spark to obtain 
properly configured consumers even after restart from checkpoint.
+
+`ConsumerStrategies.Subscribe`, as shown above, allows you to subscribe to a fixed collection of topics. `SubscribePattern` allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, streams created with `Subscribe` or `SubscribePattern` should respond to partitions being added during a running stream. Finally, `Assign` allows you to specify a fixed collection of partitions.  All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.
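+
+For example, a minimal sketch using `SubscribePattern` with the same kafkaParams and
+streamingContext as above (the regex itself is illustrative):
+
+       import java.util.regex.Pattern
+       import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
+
+       // subscribe to every topic whose name matches the pattern
+       val patternStream = KafkaUtils.createDirectStream[String, String](
+         streamingContext,
+         PreferConsistent,
+         SubscribePattern[String, String](Pattern.compile("topic[AB]"), kafkaParams)
+       )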
+
+If you have specific consumer setup needs that are not met by the options 
above, `ConsumerStrategy` is a public class that you can extend.
+
+### Creating an RDD
+If you have a use case that is better suited to batch processing, you can 
create an RDD for a defined range of offsets.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       // Import dependencies and create kafka params as in Create Direct 
Stream above
+
+       val offsetRanges = Array(
+         // topic, partition, inclusive starting offset, exclusive ending 
offset
+         OffsetRange("test", 0, 0, 100),
+         OffsetRange("test", 1, 0, 100)
+       )
+
+       val rdd = KafkaUtils.createRDD[String, String](sparkContext, 
kafkaParams, offsetRanges, PreferConsistent)
+
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+Note that you cannot use `PreferBrokers`, because without the stream there is 
not a driver-side consumer to automatically look up broker metadata for you.  
Use `PreferFixed` with your own metadata lookups if necessary.
+
+### Obtaining Offsets
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       stream.foreachRDD { rdd =>
+         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+         rdd.foreachPartition { iter =>
+           val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
+           println(s"${o.topic} ${o.partition} ${o.fromOffset} 
${o.untilOffset}")
+         }
+       }
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+Note that the typecast to `HasOffsetRanges` will only succeed if it is done in 
the first method called on the result of `createDirectStream`, not later down a 
chain of methods. Be aware that the one-to-one mapping between RDD partition 
and Kafka partition does not remain after any methods that shuffle or 
repartition, e.g. reduceByKey() or window().
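+
+If you want to continue with other transformations but still record offsets, one sketch
+(reusing the imports, kafkaParams and stream from the examples above) is to grab the
+offset ranges in an initial `transform` and use them later in `foreachRDD`:
+
+       // updated on the driver as each batch's offset ranges are obtained
+       var offsetRanges = Array.empty[OffsetRange]
+
+       val mapped = stream.transform { rdd =>
+         offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+         rdd
+       }.map(record => (record.key, record.value))
+
+       mapped.foreachRDD { rdd =>
+         for (o <- offsetRanges) {
+           println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+         }
+       }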
+
+### Storing Offsets
+Kafka delivery semantics in the case of failure depend on how and when offsets 
are stored.  Spark output operations are 
[at-least-once](streaming-programming-guide.html#semantics-of-output-operations).
  So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside the output. With this integration, you have three options, in order of increasing reliability (and code complexity), for how to store offsets.
+
+#### Checkpoints
+If you enable Spark 
[checkpointing](streaming-programming-guide.html#checkpointing), offsets will 
be stored in the checkpoint.  This is easy to enable, but there are drawbacks. 
Your output operation must be idempotent, since you will get repeated outputs; 
transactions are not an option.  Furthermore, you cannot recover from a 
checkpoint if your application code has changed.  For planned upgrades, you can 
mitigate this by running the new code at the same time as the old code (since 
outputs need to be idempotent anyway, they should not clash).  But for 
unplanned failures that require code changes, you will lose data unless you 
have another way to identify known good starting offsets.
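+
+A minimal sketch of checkpoint-based recovery, assuming a `SparkConf` named `sparkConf`
+is already defined (the checkpoint directory, batch interval and function body are
+illustrative):
+
+       import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+       val checkpointDir = "hdfs:///illustrative/checkpoint/dir"
+
+       def createContext(): StreamingContext = {
+         val ssc = new StreamingContext(sparkConf, Seconds(5))
+         ssc.checkpoint(checkpointDir)
+         // create the direct stream and wire up idempotent outputs here, as shown above
+         ssc
+       }
+
+       // on restart, offsets (and the DStream graph) are recovered from the checkpoint
+       val streamingContext = StreamingContext.getOrCreate(checkpointDir, createContext _)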
+
+#### Kafka itself
+Kafka has an offset commit API that stores offsets in a special Kafka topic.  
By default, the new consumer will periodically auto-commit offsets. This is 
almost certainly not what you want, because messages successfully polled by the 
consumer may not yet have resulted in a Spark output operation, resulting in 
undefined semantics. This is why the stream example above sets 
"enable.auto.commit" to false.  However, you can commit offsets to Kafka after 
you know your output has been stored, using the `commitAsync` API. The benefit 
as compared to checkpoints is that Kafka is a durable store regardless of 
changes to your application code.  However, Kafka is not transactional, so your 
outputs must still be idempotent.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       stream.foreachRDD { rdd =>
+         val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+         // some time later, after outputs have completed
+         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+       }
+
+As with `HasOffsetRanges`, the cast to `CanCommitOffsets` will only succeed if called on the result of `createDirectStream`, not after transformations.  The `commitAsync` call is thread-safe, but must occur after your outputs if you want meaningful semantics.
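+
+If you want to be notified of the outcome of the commit, a sketch that passes Kafka's
+`OffsetCommitCallback` (this assumes the callback overload of `commitAsync`; the error
+handling is illustrative):
+
+       import java.util.{Map => JMap}
+       import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
+       import org.apache.kafka.common.TopicPartition
+
+       stream.foreachRDD { rdd =>
+         val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback {
+           def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
+             if (e != null) {
+               // the commit failed; log or retry as appropriate for your application
+             }
+           }
+         })
+       }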
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+#### Your own data store
+For data stores that support transactions, saving offsets in the same 
transaction as the results can keep the two in sync, even in failure 
situations.  If you're careful about detecting repeated or skipped offset 
ranges, rolling back the transaction prevents duplicated or lost messages from 
affecting results.  This gives the equivalent of exactly-once semantics.  It is 
also possible to use this tactic even for outputs that result from 
aggregations, which are typically hard to make idempotent.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       // The details depend on your data store, but the general idea looks 
like this
+
+       // begin from the offsets committed to the database
+       val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
+         new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
+       }.toMap
+
+       val stream = KafkaUtils.createDirectStream[String, String](
+         streamingContext,
+         PreferConsistent,
+         Assign[String, String](fromOffsets.keys.toList, kafkaParams, 
fromOffsets)
+       )
+
+       stream.foreachRDD { rdd =>
+         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+         val results = yourCalculation(rdd)
+
+         yourTransactionBlock {
+           // update results
+
+           // update offsets where the end of existing offsets matches the 
beginning of this batch of offsets
+
+           // assert that offsets were updated correctly
+         }
+       }
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+### SSL / TLS
+The new Kafka consumer [supports 
SSL](http://kafka.apache.org/documentation.html#security_ssl).  To enable it, 
set kafkaParams appropriately before passing to `createDirectStream` / 
`createRDD`.  Note that this only applies to communication between Spark and 
Kafka brokers; you are still responsible for separately 
[securing](security.html) Spark inter-node communication.
+
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+       val kafkaParams = Map[String, Object](
+         // the usual params, make sure to change the port in 
bootstrap.servers if 9092 is not TLS
+         "security.protocol" -> "SSL",
+         "ssl.truststore.location" -> 
"/some-directory/kafka.client.truststore.jks",
+         "ssl.truststore.password" -> "test1234",
+         "ssl.keystore.location" -> 
"/some-directory/kafka.client.keystore.jks",
+         "ssl.keystore.password" -> "test1234",
+         "ssl.key.password" -> "test1234"
+       )
+</div>
+<div data-lang="java" markdown="1">
+</div>
+</div>
+
+### Deploying
+
+As with any Spark application, `spark-submit` is used to launch your application.
+
+For Scala and Java applications, if you are using SBT or Maven for project 
management, then package 
`spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` 
and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
+

http://git-wip-us.apache.org/repos/asf/spark/blob/b4a89c1c/docs/streaming-kafka-0-8-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-8-integration.md 
b/docs/streaming-kafka-0-8-integration.md
new file mode 100644
index 0000000..da4a845
--- /dev/null
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -0,0 +1,210 @@
+---
+layout: global
+title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 
or higher)
+---
+Here we explain how to configure Spark Streaming to receive data from Kafka. 
There are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new approach (introduced in Spark 1.3) without using 
Receivers. They have different programming models, performance characteristics, 
and semantics guarantees, so read on for more details.  Both approaches are 
considered stable APIs as of the current version of Spark.
+
+## Approach 1: Receiver-based Approach
+This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data.
+
+However, under the default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability)). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g. HDFS), so that all the data can be recovered on failure. See the [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
+
+               groupId = org.apache.spark
+               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
+               version = {{site.SPARK_VERSION_SHORT}}
+
+       For Python applications, you will have to add the above library and its dependencies when deploying your application. See the *Deploying* subsection below.
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
+
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+               import org.apache.spark.streaming.kafka._
+
+               val kafkaStream = KafkaUtils.createStream(streamingContext,
+            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
+
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
+       </div>
+       <div data-lang="java" markdown="1">
+               import org.apache.spark.streaming.kafka.*;
+
+               JavaPairReceiverInputDStream<String, String> kafkaStream =
+                       KafkaUtils.createStream(streamingContext,
+            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
+
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+
+       </div>
+       <div data-lang="python" markdown="1">
+               from pyspark.streaming.kafka import KafkaUtils
+
+               kafkaStream = KafkaUtils.createStream(streamingContext, \
+                       [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
+
+       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
+       </div>
+       </div>
+
+       **Points to remember:**
+
+       - Topic partitions in Kafka do not correlate to partitions of the RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in `KafkaUtils.createStream()` only increases the number of threads used to consume topics within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
+
+       - Multiple Kafka input DStreams can be created with different groups 
and topics for parallel receiving of data using multiple receivers.
+
+       - If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, set the storage level for the input stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
+
+3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.
+
+       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` 
and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
+
+       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
can be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is,
+
+           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-0-8-assembly` from the
+       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
+
+## Approach 2: Direct Approach (No Receivers)
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this feature was introduced in Spark 1.3 
for the Scala and Java API, in Spark 1.4 for the Python API.
+
+This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
+
+- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there are Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
+
+- *Efficiency:* Achieving zero-data loss in the first approach required the 
data to be stored in a Write Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write Ahead Log. This second approach 
eliminates the problem as there is no receiver, and hence no need for Write 
Ahead Logs. As long as you have sufficient Kafka retention, messages can be 
recovered from Kafka.
+
+- *Exactly-once semantics:* The first approach uses Kafka's high level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with write ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for the output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see [Semantics of output operations](streaming-programming-guide.html#semantics-of-output-operations) in the main programming guide for further information).
+
+Note that one disadvantage of this approach is that it does not update offsets 
in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show 
progress. However, you can access the offsets processed by this approach in 
each batch and update Zookeeper yourself (see below).
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** This approach is supported only in Scala/Java applications. Link your SBT/Maven project with the following artifact (see the [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+               groupId = org.apache.spark
+               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
+               version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
+
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+               import org.apache.spark.streaming.kafka._
+
+               val directKafkaStream = KafkaUtils.createDirectStream[
+                       [key class], [value class], [key decoder class], [value 
decoder class] ](
+                       streamingContext, [map of Kafka parameters], [set of 
topics to consume])
+
+       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
+       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+       </div>
+       <div data-lang="java" markdown="1">
+               import org.apache.spark.streaming.kafka.*;
+
+               JavaPairInputDStream<String, String> directKafkaStream =
+                       KafkaUtils.createDirectStream(streamingContext,
+                               [key class], [value class], [key decoder 
class], [value decoder class],
+                               [map of Kafka parameters], [set of topics to 
consume]);
+
+       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
+       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+
+       </div>
+       <div data-lang="python" markdown="1">
+               from pyspark.streaming.kafka import KafkaUtils
+               directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})
+
+       You can also pass a `messageHandler` to `createDirectStream` to access 
`KafkaMessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
+       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
+       </div>
+       </div>
+
+       In the Kafka parameters, you must specify either `metadata.broker.list` or `bootstrap.servers`.
+       By default, it will start consuming from the latest offset of each Kafka partition. If you set the configuration `auto.offset.reset` in the Kafka parameters to `smallest`, then it will start consuming from the smallest offset.
+
+       You can also start consuming from any arbitrary offset using other variations of `KafkaUtils.createDirectStream` (a sketch is shown after the notes below). Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.
+
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+               // Hold a reference to the current offset ranges, so it can be 
used downstream
+               var offsetRanges = Array[OffsetRange]()
+
+               directKafkaStream.transform { rdd =>
+                 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+                 rdd
+               }.map {
+                  ...
+               }.foreachRDD { rdd =>
+                 for (o <- offsetRanges) {
+                   println(s"${o.topic} ${o.partition} ${o.fromOffset} 
${o.untilOffset}")
+                 }
+                 ...
+               }
+       </div>
+       <div data-lang="java" markdown="1">
+               // Hold a reference to the current offset ranges, so it can be 
used downstream
+               final AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference<>();
+
+               directKafkaStream.transformToPair(
+                 new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
+                   @Override
+                   public JavaPairRDD<String, String> call(JavaPairRDD<String, 
String> rdd) throws Exception {
+                     OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+                     offsetRanges.set(offsets);
+                     return rdd;
+                   }
+                 }
+               ).map(
+                 ...
+               ).foreachRDD(
+                 new Function<JavaPairRDD<String, String>, Void>() {
+                   @Override
+                   public Void call(JavaPairRDD<String, String> rdd) throws 
IOException {
+                     for (OffsetRange o : offsetRanges.get()) {
+                       System.out.println(
+                         o.topic() + " " + o.partition() + " " + 
o.fromOffset() + " " + o.untilOffset()
+                       );
+                     }
+                     ...
+                     return null;
+                   }
+                 }
+               );
+       </div>
+       <div data-lang="python" markdown="1">
+               offsetRanges = []
+
+               def storeOffsetRanges(rdd):
+                   global offsetRanges
+                   offsetRanges = rdd.offsetRanges()
+                   return rdd
+
+               def printOffsetRanges(rdd):
+                   for o in offsetRanges:
+                       print "%s %s %s %s" % (o.topic, o.partition, 
o.fromOffset, o.untilOffset)
+
+               directKafkaStream\
+                   .transform(storeOffsetRanges)\
+                   .foreachRDD(printOffsetRanges)
+       </div>
+       </div>
+
+       You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
+
+       Note that the typecast to HasOffsetRanges will only succeed if it is 
done in the first method called on the directKafkaStream, not later down a 
chain of methods. You can use transform() instead of foreachRDD() as your first 
method call in order to access offsets, then call further Spark methods. 
However, be aware that the one-to-one mapping between RDD partition and Kafka 
partition does not remain after any methods that shuffle or repartition, e.g. 
reduceByKey() or window().
+
+       Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, configurations of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the `spark.streaming.kafka.*` [configurations](configuration.html). An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.
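+
+       As mentioned above, you can also start the stream from arbitrary offsets. A minimal Scala sketch (the starting offsets are illustrative, `streamingContext` is your `StreamingContext`, and `kafkaParams` is assumed to be a `Map[String, String]` containing `metadata.broker.list`):
+
+               import kafka.common.TopicAndPartition
+               import kafka.message.MessageAndMetadata
+               import kafka.serializer.StringDecoder
+               import org.apache.spark.streaming.kafka._
+
+               // start each partition at an explicitly chosen offset
+               val fromOffsets = Map(
+                 TopicAndPartition("topicA", 0) -> 100L,
+                 TopicAndPartition("topicA", 1) -> 250L
+               )
+               val messageHandler =
+                 (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+
+               val directKafkaStream = KafkaUtils.createDirectStream[
+                 String, String, StringDecoder, StringDecoder, (String, String)](
+                 streamingContext, kafkaParams, fromOffsets, messageHandler)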
+
+3. **Deploying:** This is the same as the first approach.

http://git-wip-us.apache.org/repos/asf/spark/blob/b4a89c1c/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index e0d3f4f..a8f3667 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -2,209 +2,52 @@
 layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service. Here we 
explain how to configure Spark Streaming to receive data from Kafka. There are 
two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new experimental approach (introduced in Spark 1.3) 
without using Receivers. They have different programming models, performance 
characteristics, and semantics guarantees, so read on for more details.
 
-## Approach 1: Receiver-based Approach
-This approach uses a Receiver to receive the data. The Receiver is implemented 
using the Kafka high-level consumer API. As with all receivers, the data 
received from Kafka through a Receiver is stored in Spark executors, and then 
jobs launched by Spark Streaming processes the data. 
-
-However, under default configuration, this approach can lose data under 
failures (see [receiver 
reliability](streaming-programming-guide.html#receiver-reliability). To ensure 
zero-data loss, you have to additionally enable Write Ahead Logs in Spark 
Streaming (introduced in Spark 1.2). This synchronously saves all the received 
Kafka data into write ahead logs on a distributed file system (e.g HDFS), so 
that all the data can be recovered on failure. See [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the 
streaming programming guide for more details on Write Ahead Logs.
-
-Next, we discuss how to use this approach in your streaming application.
-
-1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
-
-               groupId = org.apache.spark
-               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
-               version = {{site.SPARK_VERSION_SHORT}}
-
-       For Python applications, you will have to add this above library and 
its dependencies when deploying your application. See the *Deploying* 
subsection below.
-
-2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               import org.apache.spark.streaming.kafka._
-
-               val kafkaStream = KafkaUtils.createStream(streamingContext, 
-            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
-
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
-       </div>
-       <div data-lang="java" markdown="1">
-               import org.apache.spark.streaming.kafka.*;
-
-               JavaPairReceiverInputDStream<String, String> kafkaStream = 
-                       KafkaUtils.createStream(streamingContext,
-            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
-
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
-
-       </div>
-       <div data-lang="python" markdown="1">
-               from pyspark.streaming.kafka import KafkaUtils
-
-               kafkaStream = KafkaUtils.createStream(streamingContext, \
-                       [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
-
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
 
-       </div>
-       </div>
-
-       **Points to remember:**
-
-       - Topic partitions in Kafka does not correlate to partitions of RDDs 
generated in Spark Streaming. So increasing the number of topic-specific 
partitions in the `KafkaUtils.createStream()` only increases the number of 
threads using which topics that are consumed within a single receiver. It does 
not increase the parallelism of Spark in processing the data. Refer to the main 
document for more information on that.
-
-       - Multiple Kafka input DStreams can be created with different groups 
and topics for parallel receiving of data using multiple receivers.
-
-       - If you have enabled Write Ahead Logs with a replicated file system 
like HDFS, the received data is already being replicated in the log. Hence, the 
storage level in storage level for the input stream to 
`StorageLevel.MEMORY_AND_DISK_SER` (that is, use
-`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
-
-3. **Deploying:** As with any Spark applications, `spark-submit` is used to 
launch your application. However, the details are slightly different for 
Scala/Java applications and Python applications.
-
-       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` 
and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
-
-       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
can be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is,
-
-           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
-
-       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-0-8-assembly` from the
-       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
-
-## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
-
-This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
-
-- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there are Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
-
-- *Efficiency:* Achieving zero-data loss in the first approach required the 
data to be stored in a Write Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write Ahead Log. This second approach 
eliminates the problem as there is no receiver, and hence no need for Write 
Ahead Logs. As long as you have sufficient Kafka retention, messages can be 
recovered from Kafka.
-
-- *Exactly-once semantics:* The first approach uses Kafka's high level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with write ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API 
that does not use Zookeeper. Offsets are tracked by Spark Streaming within its 
checkpoints. This eliminates inconsistencies between Spark Streaming and 
Zookeeper/Kafka, and so each record is received by Spark Streaming effectively 
exactly once despite failures. In order to achieve exactly-once semantics for 
output of your results, your output operation that saves the data to an 
external data store must be either idempotent, or an atomic transa
 ction that saves results and offsets (see [Semantics of output 
operations](streaming-programming-guide.html#semantics-of-output-operations) in 
the main programming guide for further information).
-
-Note that one disadvantage of this approach is that it does not update offsets 
in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show 
progress. However, you can access the offsets processed by this approach in 
each batch and update Zookeeper yourself (see below).
-
-Next, we discuss how to use this approach in your streaming application.
-
-1. **Linking:** This approach is supported only in Scala/Java application. 
Link your SBT/Maven project with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
-
-               groupId = org.apache.spark
-               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
-               version = {{site.SPARK_VERSION_SHORT}}
-
-2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               import org.apache.spark.streaming.kafka._
-
-               val directKafkaStream = KafkaUtils.createDirectStream[
-                       [key class], [value class], [key decoder class], [value 
decoder class] ](
-                       streamingContext, [map of Kafka parameters], [set of 
topics to consume])
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
-       </div>
-       <div data-lang="java" markdown="1">
-               import org.apache.spark.streaming.kafka.*;
-
-               JavaPairInputDStream<String, String> directKafkaStream =
-                       KafkaUtils.createDirectStream(streamingContext,
-                               [key class], [value class], [key decoder 
class], [value decoder class],
-                               [map of Kafka parameters], [set of topics to 
consume]);
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
-
-       </div>
-       <div data-lang="python" markdown="1">
-               from pyspark.streaming.kafka import KafkaUtils
-               directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`KafkaMessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
-       </div>
-       </div>
-
-       In the Kafka parameters, you must specify either `metadata.broker.list` 
or `bootstrap.servers`.
-       By default, it will start consuming from the latest offset of each 
Kafka partition. If you set configuration `auto.offset.reset` in Kafka 
parameters to `smallest`, then it will start consuming from the smallest 
offset. 
-
-       You can also start consuming from any arbitrary offset using other 
variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to 
access the Kafka offsets consumed in each batch, you can do the following. 
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               // Hold a reference to the current offset ranges, so it can be 
used downstream
-               var offsetRanges = Array[OffsetRange]()
-               
-               directKafkaStream.transform { rdd =>
-                 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-                 rdd
-               }.map {
-                  ...
-               }.foreachRDD { rdd =>
-                 for (o <- offsetRanges) {
-                   println(s"${o.topic} ${o.partition} ${o.fromOffset} 
${o.untilOffset}")
-                 }
-                 ...
-               }
-       </div>
-       <div data-lang="java" markdown="1">
-               // Hold a reference to the current offset ranges, so it can be 
used downstream
-               final AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference<>();
-               
-               directKafkaStream.transformToPair(
-                 new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
-                   @Override
-                   public JavaPairRDD<String, String> call(JavaPairRDD<String, 
String> rdd) throws Exception {
-                     OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
-                     offsetRanges.set(offsets);
-                     return rdd;
-                   }
-                 }
-               ).map(
-                 ...
-               ).foreachRDD(
-                 new Function<JavaPairRDD<String, String>, Void>() {
-                   @Override
-                   public Void call(JavaPairRDD<String, String> rdd) throws 
IOException {
-                     for (OffsetRange o : offsetRanges.get()) {
-                       System.out.println(
-                         o.topic() + " " + o.partition() + " " + 
o.fromOffset() + " " + o.untilOffset()
-                       );
-                     }
-                     ...
-                     return null;
-                   }
-                 }
-               );
-       </div>
-       <div data-lang="python" markdown="1">
-               offsetRanges = []
-
-               def storeOffsetRanges(rdd):
-                   global offsetRanges
-                   offsetRanges = rdd.offsetRanges()
-                   return rdd
-
-               def printOffsetRanges(rdd):
-                   for o in offsetRanges:
-                       print "%s %s %s %s" % (o.topic, o.partition, 
o.fromOffset, o.untilOffset)
-
-               directKafkaStream\
-                   .transform(storeOffsetRanges)\
-                   .foreachRDD(printOffsetRanges)
-       </div>
-       </div>
-
-       You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
-
-       Note that the typecast to HasOffsetRanges will only succeed if it is 
done in the first method called on the directKafkaStream, not later down a 
chain of methods. You can use transform() instead of foreachRDD() as your first 
method call in order to access offsets, then call further Spark methods. 
However, be aware that the one-to-one mapping between RDD partition and Kafka 
partition does not remain after any methods that shuffle or repartition, e.g. 
reduceByKey() or window().
-
-       Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related (that is, 
[configurations](configuration.html) of the form `spark.streaming.receiver.*` ) 
will not apply to the input DStreams created by this approach (will apply to 
other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate 
(in messages per second) at which each Kafka partition will be read by this 
direct API.
-
-3. **Deploying:** This is same as the first approach.
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Please 
read the [Kafka documentation](http://kafka.apache.org/documentation.html) 
thoroughly before starting an integration using Spark.
+
+The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages available.  Please choose the correct package for your brokers and desired features; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.
+
+
+<table class="table">
+<tr><th></th><th><a 
href="streaming-kafka-0-8-integration.html">spark-streaming-kafka-0-8</a></th><th><a
 
href="streaming-kafka-0-10-integration.html">spark-streaming-kafka-0-10</a></th></tr>
+<tr>
+  <td>Broker Version</td>
+  <td>0.8.2.1 or higher</td>
+  <td>0.10.0 or higher</td>
+</tr>
+<tr>
+  <td>API Stability</td>
+  <td>Stable</td>
+  <td>Experimental</td>
+</tr>
+<tr>
+  <td>Language Support</td>
+  <td>Scala, Java, Python</td>
+  <td>Scala, Java</td>
+</tr>
+<tr>
+  <td>Receiver DStream</td>
+  <td>Yes</td>
+  <td>No</td>
+</tr>
+<tr>
+  <td>Direct DStream</td>
+  <td>Yes</td>
+  <td>Yes</td>
+</tr>
+<tr>
+  <td>SSL / TLS Support</td>
+  <td>No</td>
+  <td>Yes</td>
+</tr>
+<tr>
+  <td>Offset Commit API</td>
+  <td>No</td>
+  <td>Yes</td>
+</tr>
+<tr>
+  <td>Dynamic Topic Subscription</td>
+  <td>No</td>
+  <td>Yes</td>
+</tr>
+</table>

http://git-wip-us.apache.org/repos/asf/spark/blob/b4a89c1c/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index e80f1c9..902df6a 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ and add it to the classpath.
 
 Some of these advanced sources are as follows.
 
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka 0.8.2.1. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka broker versions 0.8.2.1 or higher. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
 
 - **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Flume 1.6.0. See the [Flume Integration 
Guide](streaming-flume-integration.html) for more details.
 
@@ -2350,7 +2350,7 @@ The following table summarizes the semantics under 
failures:
 
 ### With Kafka Direct API
 {:.no_toc}
-In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach (experimental as of Spark 
{{site.SPARK_VERSION_SHORT}}) is further discussed in the [Kafka Integration 
Guide](streaming-kafka-integration.html).
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach is further discussed in the 
[Kafka Integration Guide](streaming-kafka-integration.html).
 
 ## Semantics of output operations
 {:.no_toc}

