Repository: spark
Updated Branches:
  refs/heads/master 284771efb -> a32f0fb73


[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue

This PR changes the previous Manifest context bound on KafkaInputDStream's Decoder 
type parameters to ClassTag, and also fixes the problem described in 
[SPARK-2103](https://issues.apache.org/jira/browse/SPARK-2103).

The previous Java interface could not actually capture the type of the Decoder, so 
using that Manifest to reconstruct the decoder object via reflection raised an exception.
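
As a minimal illustration (not part of this commit; it assumes only the Kafka 
VerifiableProperties class on the classpath), the removed Java overloads 
fabricated the implicit by casting, so reflection saw java.lang.Object instead 
of the concrete decoder class:

    import kafka.utils.VerifiableProperties

    // The old Java bridge did, in effect:
    //   implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
    // so runtimeClass was java.lang.Object, and the receiver's constructor lookup failed:
    val erased = manifest[AnyRef]
    erased.runtimeClass.getConstructor(classOf[VerifiableProperties])
    // throws java.lang.NoSuchMethodException:
    //   java.lang.Object.<init>(kafka.utils.VerifiableProperties)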

For the other two Java interfaces, the manually supplied ClassTag[String] is 
unnecessary, because calling the Scala API already picks up the right implicit ClassTag.
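
A small sketch of that implicit materialization (illustrative only, not from 
this commit):

    import scala.reflect.{classTag, ClassTag}

    // The compiler materializes the tag at the Scala call site, so the
    // String-typed overloads need no hand-built ClassTag[String]:
    def tagOf[A: ClassTag]: ClassTag[A] = classTag[A]
    tagOf[String].runtimeClass    // class java.lang.String

    // The removed workaround produced a tag that lied about the runtime class:
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]].runtimeClass
    // class java.lang.Object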

The current Kafka unit test cannot actually verify these interfaces, so I have 
tested them in both local and distributed settings.

Author: jerryshao <saisai.s...@intel.com>

Closes #1508 from jerryshao/SPARK-2103 and squashes the following commits:

e90c37b [jerryshao] Add Mima excludes
7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder and fix Decoder construct issue when using Java API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a32f0fb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a32f0fb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a32f0fb7

Branch: refs/heads/master
Commit: a32f0fb73a739c56208cafcd9f08618fb6dd8859
Parents: 284771e
Author: jerryshao <saisai.s...@intel.com>
Authored: Fri Aug 1 04:32:46 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Aug 1 04:32:46 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaInputDStream.scala   | 14 +++++++-------
 .../apache/spark/streaming/kafka/KafkaUtils.scala   | 16 +++++-----------
 project/MimaExcludes.scala                          |  7 ++++++-
 3 files changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 38095e8..e20e2c8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import java.util.Properties
 import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
 class KafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
 class KafkaReceiver[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
       tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
     }
 
-    val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[K]]
-    val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 86bb91f..48668f7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -65,7 +65,7 @@ object KafkaUtils {
    *                    in its own thread.
    * @param storageLevel Storage level to use for storing the received objects
    */
-  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
@@ -89,8 +89,6 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt]
     ): JavaPairReceiverInputDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
   }
 
@@ -111,8 +109,6 @@ object KafkaUtils {
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
     ): JavaPairReceiverInputDStream[String, String] = {
-    implicit val cmt: ClassTag[String] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
       storageLevel)
   }
@@ -140,13 +136,11 @@ object KafkaUtils {
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
     ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val valueCmt: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
 
-    implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
-    implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
 
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)

http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 5a835f5..537ca0d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -71,7 +71,12 @@ object MimaExcludes {
               "org.apache.spark.storage.TachyonStore.putValues")
           ) ++
           Seq(
-            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.flume.FlumeReceiver.this"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.streaming.kafka.KafkaReceiver.this")
           ) ++
           Seq( // Ignore some private methods in ALS.
             ProblemFilters.exclude[MissingMethodProblem](
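
For reference, a hedged usage sketch of the Scala createStream overload changed 
above; the StreamingContext ssc, the topic name, and the Kafka settings are 
assumed values, not taken from this commit:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // With ClassTag context bounds in place, the receiver can reflectively
    // construct the concrete decoder classes named here (ssc is an assumed
    // StreamingContext):
    val kafkaParams = Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "demo")
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("events" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)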
