Repository: spark
Updated Branches:
  refs/heads/master a1d98c6dc -> 4fbf748bf


[SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behind a profile

## What changes were proposed in this pull request?

Put Kafka 0.8 support behind a kafka-0-8 profile.

## How was this patch tested?

Existing tests, but, until PR builder and Jenkins configs are updated the 
effect here is to not build or test Kafka 0.8 support at all.

Author: Sean Owen <so...@cloudera.com>

Closes #19134 from srowen/SPARK-21893.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fbf748b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fbf748b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fbf748b

Branch: refs/heads/master
Commit: 4fbf748bf85b18f32a2cd32b1b1881d24360626e
Parents: a1d98c6
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Sep 13 10:10:40 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 13 10:10:40 2017 +0100

----------------------------------------------------------------------
 dev/create-release/release-build.sh             |  32 +++---
 dev/mima                                        |   2 +-
 dev/scalastyle                                  |   1 +
 dev/sparktestsupport/modules.py                 |   6 ++
 dev/test-dependencies.sh                        |   2 +-
 docs/building-spark.md                          |   9 ++
 docs/streaming-kafka-0-8-integration.md         |  23 ++--
 docs/streaming-kafka-integration.md             |  11 +-
 docs/streaming-programming-guide.md             |   6 +-
 examples/pom.xml                                |   2 +-
 .../streaming/JavaDirectKafkaWordCount.java     |  21 ++--
 .../examples/streaming/JavaKafkaWordCount.java  |  87 ---------------
 .../streaming/DirectKafkaWordCount.scala        |  12 +--
 .../examples/streaming/KafkaWordCount.scala     | 105 -------------------
 .../apache/spark/streaming/kafka/Broker.scala   |   2 +
 .../spark/streaming/kafka/KafkaCluster.scala    |   2 +
 .../spark/streaming/kafka/KafkaUtils.scala      |   1 +
 .../spark/streaming/kafka/OffsetRange.scala     |   3 +
 pom.xml                                         |  10 +-
 project/SparkBuild.scala                        |   8 +-
 python/pyspark/streaming/kafka.py               |  26 ++++-
 python/pyspark/streaming/tests.py               |  14 ++-
 22 files changed, 127 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index ee2407a..f4a7f25 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -80,8 +80,17 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging 
uploads
 BASE_DIR=$(pwd)
 
 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver"
-PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+
+# Hive-specific profiles for some builds
+HIVE_PROFILES="-Phive -Phive-thriftserver"
+# Profiles for publishing snapshots and release to Maven Central
+PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl 
-Pkinesis-asl"
+# Profiles for building binary releases
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+# Scala 2.11 only profiles for some builds
+SCALA_2_11_PROFILES="-Pkafka-0-8"
+# Scala 2.12 only profiles for some builds
+SCALA_2_12_PROFILES="-Pscala-2.12"
 
 rm -rf spark
 git clone https://git-wip-us.apache.org/repos/asf/spark.git
@@ -235,10 +244,9 @@ if [[ "$1" == "package" ]]; then
 
   # We increment the Zinc port each time to avoid OOM's and other craziness if 
multiple builds
   # share the same Zinc server.
-  FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
-  make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" "withr" &
-  make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
-  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn 
-Pmesos" "3038" &
+  make_binary_release "hadoop2.6" "-Phadoop-2.6 $HIVE_PROFILES 
$SCALA_2_11_PROFILES $BASE_RELEASE_PROFILES" "3035" "withr" &
+  make_binary_release "hadoop2.7" "-Phadoop-2.7 $HIVE_PROFILES 
$SCALA_2_11_PROFILES $BASE_RELEASE_PROFILES" "3036" "withpip" &
+  make_binary_release "without-hadoop" "-Phadoop-provided $SCALA_2_11_PROFILES 
$BASE_RELEASE_PROFILES" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/
 
@@ -304,10 +312,10 @@ if [[ "$1" == "publish-snapshot" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print 
random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests 
$PUBLISH_PROFILES deploy
+  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests 
$SCALA_2_11_PROFILES $PUBLISH_PROFILES deploy
   #./dev/change-scala-version.sh 2.12
-  #$MVN -DzincPort=$ZINC_PORT -Pscala-2.12 --settings $tmp_settings \
-  #  -DskipTests $PUBLISH_PROFILES clean deploy
+  #$MVN -DzincPort=$ZINC_PORT --settings $tmp_settings \
+  #  -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean deploy
 
   # Clean-up Zinc nailgun process
   /usr/sbin/lsof -P |grep $ZINC_PORT | grep LISTEN | awk '{ print $2; }' | 
xargs kill
@@ -340,11 +348,11 @@ if [[ "$1" == "publish-release" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print 
random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests 
$PUBLISH_PROFILES clean install
+  $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests 
$SCALA_2_11_PROFILES $PUBLISH_PROFILES clean install
 
   #./dev/change-scala-version.sh 2.12
-  #$MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -Pscala-2.12 \
-  #  -DskipTests $PUBLISH_PROFILES clean install
+  #$MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo \
+  #  -DskipTests $SCALA_2_12_PROFILES §$PUBLISH_PROFILES clean install
 
   # Clean-up Zinc nailgun process
   /usr/sbin/lsof -P |grep $ZINC_PORT | grep LISTEN | awk '{ print $2; }' | 
xargs kill

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index 5501589..fdb21f5 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl 
-Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl 
-Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES 
"export oldDeps/fullClasspath" | tail -n1)"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index f3dec83..e5aa589 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -23,6 +23,7 @@ ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
         -Pmesos \
+        -Pkafka-0-8 \
         -Pyarn \
         -Phive \
         -Phive-thriftserver \

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 2971e0d..50e14b6 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -249,6 +249,12 @@ streaming_kafka = Module(
         "external/kafka-0-8",
         "external/kafka-0-8-assembly",
     ],
+    build_profile_flags=[
+        "-Pkafka-0-8",
+    ],
+    environ={
+        "ENABLE_KAFKA_0_8_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-kafka-0-8/test",
     ]

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/dev/test-dependencies.sh
----------------------------------------------------------------------
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 2906a81..114b116 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style 
resolution.
 
 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.6

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 67a2ce7..57baa50 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -90,6 +90,15 @@ like ZooKeeper and Hadoop itself.
 ## Building with Mesos support
 
     ./build/mvn -Pmesos -DskipTests clean package
+    
+## Building with Kafka 0.8 support
+
+Kafka 0.8 support must be explicitly enabled with the `kafka-0-8` profile.
+Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
+
+    ./build/mvn -Pkafka-0-8 -DskipTests clean package
+
+Kafka 0.10 support is still automatically built.
 
 ## Building submodules individually
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/docs/streaming-kafka-0-8-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-8-integration.md 
b/docs/streaming-kafka-0-8-integration.md
index 24a3e4c..9f0671d 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -2,6 +2,9 @@
 layout: global
 title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 
or higher)
 ---
+
+**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
+
 Here we explain how to configure Spark Streaming to receive data from Kafka. 
There are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new approach (introduced in Spark 1.3) without using 
Receivers. They have different programming models, performance characteristics, 
and semantics guarantees, so read on for more details.  Both approaches are 
considered stable APIs as of the current version of Spark.
 
 ## Approach 1: Receiver-based Approach
@@ -28,8 +31,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                val kafkaStream = KafkaUtils.createStream(streamingContext,
             [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
 
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
        </div>
        <div data-lang="java" markdown="1">
                import org.apache.spark.streaming.kafka.*;
@@ -38,8 +40,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                        KafkaUtils.createStream(streamingContext,
             [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
 
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
 
        </div>
        <div data-lang="python" markdown="1">
@@ -48,8 +49,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                kafkaStream = KafkaUtils.createStream(streamingContext, \
                        [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
 
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/kafka_wordcount.py).
+       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
        </div>
        </div>
 
@@ -71,7 +71,7 @@ Next, we discuss how to use this approach in your streaming 
application.
            ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
 
        Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-0-8-assembly` from the
-       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
+       [Maven 
repository](https://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
 This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this feature was introduced in Spark 1.3 
for the Scala and Java API, in Spark 1.4 for the Python API.
@@ -105,8 +105,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                        streamingContext, [map of Kafka parameters], [set of 
topics to consume])
 
        You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
        </div>
        <div data-lang="java" markdown="1">
                import org.apache.spark.streaming.kafka.*;
@@ -117,8 +116,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                                [map of Kafka parameters], [set of topics to 
consume]);
 
        You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
 
        </div>
        <div data-lang="python" markdown="1">
@@ -126,8 +124,7 @@ Next, we discuss how to use this approach in your streaming 
application.
                directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})
 
        You can also pass a `messageHandler` to `createDirectStream` to access 
`KafkaMessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/direct_kafka_wordcount.py).
+       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
        </div>
        </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index a8f3667..4aca391 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -3,10 +3,11 @@ layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
 
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Please 
read the [Kafka documentation](http://kafka.apache.org/documentation.html) 
thoroughly before starting an integration using Spark.
+[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Please 
read the [Kafka documentation](https://kafka.apache.org/documentation.html) 
thoroughly before starting an integration using Spark.
 
-The Kafka project introduced a new consumer api between versions 0.8 and 0.10, 
so there are 2 separate corresponding Spark Streaming packages available.  
Please choose the correct package for your brokers and desired features; note 
that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 
0.10 integration is not compatible with earlier brokers.
+The Kafka project introduced a new consumer API between versions 0.8 and 0.10, 
so there are 2 separate corresponding Spark Streaming packages available.  
Please choose the correct package for your brokers and desired features; note 
that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 
0.10 integration is not compatible with earlier brokers.
 
+**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
 
 <table class="table">
 <tr><th></th><th><a 
href="streaming-kafka-0-8-integration.html">spark-streaming-kafka-0-8</a></th><th><a
 
href="streaming-kafka-0-10-integration.html">spark-streaming-kafka-0-10</a></th></tr>
@@ -16,9 +17,9 @@ The Kafka project introduced a new consumer api between 
versions 0.8 and 0.10, s
   <td>0.10.0 or higher</td>
 </tr>
 <tr>
-  <td>Api Stability</td>
+  <td>API Maturity</td>
+  <td>Deprecated</td>
   <td>Stable</td>
-  <td>Experimental</td>
 </tr>
 <tr>
   <td>Language Support</td>
@@ -41,7 +42,7 @@ The Kafka project introduced a new consumer api between 
versions 0.8 and 0.10, s
   <td>Yes</td>
 </tr>
 <tr>
-  <td>Offset Commit Api</td>
+  <td>Offset Commit API</td>
   <td>No</td>
   <td>Yes</td>
 </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index fca0cf8..bc200cd 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -401,14 +401,14 @@ some of the common ones are as follows.
 
 <table class="table">
 <tr><th>Source</th><th>Artifact</th></tr>
-<tr><td> Kafka </td><td> 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kafka </td><td> 
spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} 
</td></tr>
 <tr><td> 
Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} 
[Amazon Software License] </td></tr>
 <tr><td></td><td></td></tr>
 </table>
 
 For an up-to-date list, please refer to the
-[Maven 
repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven 
repository](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 for the full list of supported sources and artifacts.
 
 ***
@@ -1899,7 +1899,7 @@ To run a Spark Streaming applications, you need to have 
the following.
   if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, 
Flume),
   then you will have to package the extra artifact they link to, along with 
their dependencies,
   in the JAR that is used to deploy the application. For example, an 
application using `KafkaUtils`
-  will have to include 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and all its
+  will have to include 
`spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and all its
   transitive dependencies in the application JAR.
 
 - *Configuring sufficient memory for the executors* - Since the received data 
must be stored in

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 81af735..33eca48 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -86,7 +86,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+      
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index 5e5ae62..b6b163f 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -26,11 +26,13 @@ import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
-import kafka.serializer.StringDecoder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.kafka.KafkaUtils;
+import org.apache.spark.streaming.kafka010.ConsumerStrategies;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.Durations;
 
 /**
@@ -65,22 +67,17 @@ public final class JavaDirectKafkaWordCount {
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(2));
 
     Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
-    Map<String, String> kafkaParams = new HashMap<>();
+    Map<String, Object> kafkaParams = new HashMap<>();
     kafkaParams.put("metadata.broker.list", brokers);
 
     // Create direct kafka stream with brokers and topics
-    JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
+    JavaInputDStream<ConsumerRecord<String, String>> messages = 
KafkaUtils.createDirectStream(
         jssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        topicsSet
-    );
+        LocationStrategies.PreferConsistent(),
+        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
 
     // Get the lines, split them into words, count the words and print
-    JavaDStream<String> lines = messages.map(Tuple2::_2);
+    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
     JavaDStream<String> words = lines.flatMap(x -> 
Arrays.asList(SPACE.split(x)).iterator());
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new 
Tuple2<>(s, 1))
         .reduceByKey((i1, i2) -> i1 + i2);

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
deleted file mode 100644
index ce5acdc..0000000
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- *
- * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * To run this example:
- *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount 
zoo01,zoo02, \
- *    zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 4) {
-      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> 
<topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
-    // Create the context with 2 seconds batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new 
Duration(2000));
-
-    int numThreads = Integer.parseInt(args[3]);
-    Map<String, Integer> topicMap = new HashMap<>();
-    String[] topics = args[2].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
-
-    JavaDStream<String> lines = messages.map(Tuple2::_2);
-
-    JavaDStream<String> words = lines.flatMap(x -> 
Arrays.asList(SPACE.split(x)).iterator());
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new 
Tuple2<>(s, 1))
-        .reduceByKey((i1, i2) -> i1 + i2);
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
index 474b03a..def0602 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -18,11 +18,9 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming
 
-import kafka.serializer.StringDecoder
-
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.kafka010._
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -57,11 +55,13 @@ object DirectKafkaWordCount {
     // Create direct kafka stream with brokers and topics
     val topicsSet = topics.split(",").toSet
     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
-    val messages = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder](
-      ssc, kafkaParams, topicsSet)
+    val messages = KafkaUtils.createDirectStream[String, String](
+      ssc,
+      LocationStrategies.PreferConsistent,
+      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
 
     // Get the lines, split them into words, count the words and print
-    val lines = messages.map(_._2)
+    val lines = messages.map(_.value)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
deleted file mode 100644
index e7f9bf3..0000000
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import java.util.HashMap
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.kafka._
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
- *      my-consumer-group topic1,topic2 1`
- */
-object KafkaWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> 
<numThreads>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(zkQuorum, group, topics, numThreads) = args
-    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    ssc.checkpoint("checkpoint")
-
-    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
-    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, 
topicMap).map(_._2)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1L))
-      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-// Produces some random words between 1 and 100.
-object KafkaWordCountProducer {
-
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> 
<topic> " +
-        "<messagesPerSec> <wordsPerMessage>")
-      System.exit(1)
-    }
-
-    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
-
-    // Zookeeper connection properties
-    val props = new HashMap[String, Object]()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-      "org.apache.kafka.common.serialization.StringSerializer")
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-      "org.apache.kafka.common.serialization.StringSerializer")
-
-    val producer = new KafkaProducer[String, String](props)
-
-    // Send some messages
-    while(true) {
-      (1 to messagesPerSec.toInt).foreach { messageNum =>
-        val str = (1 to wordsPerMessage.toInt).map(x => 
scala.util.Random.nextInt(10).toString)
-          .mkString(" ")
-
-        val message = new ProducerRecord[String, String](topic, null, str)
-        producer.send(message)
-      }
-
-      Thread.sleep(1000)
-    }
-  }
-
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
index 9159051..89ccbe2 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -23,6 +23,7 @@ import org.apache.spark.annotation.Experimental
  * Represents the host and port info for a Kafka broker.
  * Differs from the Kafka project's internal kafka.cluster.Broker, which 
contains a server ID.
  */
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 final class Broker private(
     /** Broker's hostname */
     val host: String,
@@ -49,6 +50,7 @@ final class Broker private(
  * Companion object that provides methods to create instances of [[Broker]].
  */
 @Experimental
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 object Broker {
   def create(host: String, port: Int): Broker =
     new Broker(host, port)

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index e0e44d4..570affa 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -42,6 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
 @DeveloperApi
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
 
@@ -376,6 +377,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
 }
 
 @DeveloperApi
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 object KafkaCluster {
   type Err = ArrayBuffer[Throwable]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 7823072..36082e9 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, 
ReceiverInputDStream}
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 object KafkaUtils {
   /**
    * Create an input stream that pulls messages from Kafka Brokers.

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
index 10d364f..6dab5f9 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition
  *   }
  * }}}
  */
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 trait HasOffsetRanges {
   def offsetRanges: Array[OffsetRange]
 }
@@ -42,6 +43,7 @@ trait HasOffsetRanges {
  * @param fromOffset Inclusive starting offset
  * @param untilOffset Exclusive ending offset
  */
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 final class OffsetRange private(
     val topic: String,
     val partition: Int,
@@ -80,6 +82,7 @@ final class OffsetRange private(
 /**
  * Companion object the provides methods to create instances of 
[[OffsetRange]].
  */
+@deprecated("Update to Kafka 0.10 integration", "2.3.0")
 object OffsetRange {
   def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
     new OffsetRange(topic, partition, fromOffset, untilOffset)

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a051fea..af511c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,8 +104,6 @@
     <module>examples</module>
     <module>repl</module>
     <module>launcher</module>
-    <module>external/kafka-0-8</module>
-    <module>external/kafka-0-8-assembly</module>
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
     <module>external/kafka-0-10-sql</module>
@@ -2654,6 +2652,14 @@
     </profile>
 
     <profile>
+      <id>kafka-0-8</id>
+      <modules>
+        <module>external/kafka-0-8</module>
+        <module>external/kafka-0-8-assembly</module>
+      </modules>
+    </profile>
+
+    <profile>
       <id>test-java-home</id>
       <activation>
         <property><name>env.JAVA_HOME</name></property>

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 748b1c4..a568d26 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -44,9 +44,9 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(
-    streaming, streamingFlumeSink, streamingFlume, streamingKafka, 
streamingKafka010
+    streaming, streamingFlumeSink, streamingFlume, streamingKafka010
   ) = Seq(
-    "streaming", "streaming-flume-sink", "streaming-flume", 
"streaming-kafka-0-8", "streaming-kafka-0-10"
+    "streaming", "streaming-flume-sink", "streaming-flume", 
"streaming-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
@@ -56,9 +56,9 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
-  val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl,
+  val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, 
sparkGangliaLgpl,
     streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn", "ganglia-lgpl", "streaming-kinesis-asl",
+    Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", 
"streaming-kinesis-asl",
       "docker-integration-tests", 
"hadoop-cloud").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, 
streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) 
=

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py 
b/python/pyspark/streaming/kafka.py
index 9d1a6ec..4af4135 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -55,6 +55,8 @@ class KafkaUtils(object):
         :param keyDecoder:  A function used to decode key (default is 
utf8_decoder)
         :param valueDecoder:  A function used to decode value (default is 
utf8_decoder)
         :return: A DStream object
+
+        .. note:: Deprecated in 2.3.0
         """
         if kafkaParams is None:
             kafkaParams = dict()
@@ -77,8 +79,6 @@ class KafkaUtils(object):
                            keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                            messageHandler=None):
         """
-        .. note:: Experimental
-
         Create an input stream that directly pulls messages from a Kafka 
Broker and specific offset.
 
         This is not a receiver based Kafka input stream, it directly pulls the 
message from Kafka
@@ -103,6 +103,9 @@ class KafkaUtils(object):
         :param messageHandler: A function used to convert 
KafkaMessageAndMetadata. You can assess
                                meta using messageHandler (default is None).
         :return: A DStream object
+
+        .. note:: Experimental
+        .. note:: Deprecated in 2.3.0
         """
         if fromOffsets is None:
             fromOffsets = dict()
@@ -142,8 +145,6 @@ class KafkaUtils(object):
                   keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                   messageHandler=None):
         """
-        .. note:: Experimental
-
         Create an RDD from Kafka using offset ranges for each topic and 
partition.
 
         :param sc:  SparkContext object
@@ -156,6 +157,9 @@ class KafkaUtils(object):
         :param messageHandler: A function used to convert 
KafkaMessageAndMetadata. You can assess
                                meta using messageHandler (default is None).
         :return: An RDD object
+
+        .. note:: Experimental
+        .. note:: Deprecated in 2.3.0
         """
         if leaders is None:
             leaders = dict()
@@ -224,6 +228,8 @@ 
________________________________________________________________________________
 class OffsetRange(object):
     """
     Represents a range of offsets from a single Kafka TopicAndPartition.
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, topic, partition, fromOffset, untilOffset):
@@ -263,6 +269,8 @@ class OffsetRange(object):
 class TopicAndPartition(object):
     """
     Represents a specific topic and partition for Kafka.
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, topic, partition):
@@ -294,6 +302,8 @@ class TopicAndPartition(object):
 class Broker(object):
     """
     Represent the host and port info for a Kafka broker.
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, host, port):
@@ -312,6 +322,8 @@ class Broker(object):
 class KafkaRDD(RDD):
     """
     A Python wrapper of KafkaRDD, to provide additional information on normal 
RDD.
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, jrdd, ctx, jrdd_deserializer):
@@ -332,6 +344,8 @@ class KafkaRDD(RDD):
 class KafkaDStream(DStream):
     """
     A Python wrapper of KafkaDStream
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, jdstream, ssc, jrdd_deserializer):
@@ -368,6 +382,8 @@ class KafkaDStream(DStream):
 class KafkaTransformedDStream(TransformedDStream):
     """
     Kafka specific wrapper of TransformedDStream to transform on Kafka RDD.
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, prev, func):
@@ -388,6 +404,8 @@ class KafkaTransformedDStream(TransformedDStream):
 class KafkaMessageAndMetadata(object):
     """
     Kafka message and metadata information. Including topic, partition, offset 
and message
+
+    .. note:: Deprecated in 2.3.0
     """
 
     def __init__(self, topic, partition, offset, key, message):

http://git-wip-us.apache.org/repos/asf/spark/blob/4fbf748b/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index ffba995..229cf53 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1516,7 +1516,10 @@ def search_kinesis_asl_assembly_jar():
         return jars[0]
 
 
-# Must be same as the variable and condition defined in KinesisTestUtils.scala
+# Must be same as the variable and condition defined in modules.py
+kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
+are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
+# Must be same as the variable and condition defined in KinesisTestUtils.scala 
and modules.py
 kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
 are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
 
@@ -1535,9 +1538,16 @@ if __name__ == "__main__":
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, 
StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests,
+                 FlumeStreamTests, FlumePollingStreamTests,
                  StreamingListenerTests]
 
+    if are_kafka_tests_enabled:
+        testcases.append(KafkaStreamTests)
+    else:
+        sys.stderr.write(
+            "Skipped test_kafka_stream (enable by setting environment variable 
%s=1"
+            % kafka_test_environ_var)
+
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)
     elif are_kinesis_tests_enabled is False:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to