Since you mentioned in another mail that you hope to port the latest version of Spark to work with Kafka 0.7, here are some notes you should take care of:


1.      Kafka 0.7 can only be compiled with Scala 2.8, while Spark is now compiled with Scala 2.10, and there is no binary compatibility between these two Scala versions. So you will have to modify the Kafka code, as Spark previously did, to fix the Scala compatibility problem.

2.      The high-level consumer API also changed between Kafka 0.7 and 0.8, so you will have to modify KafkaInputDStream in Spark Streaming accordingly (see the sketch after this list).
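
To make the second point concrete, here is roughly what the receiver inside KafkaInputDStream does with the Kafka high-level consumer, written as a standalone Java sketch against the Kafka 0.8 API (the Zookeeper address, group id and topic are placeholders). The class and property names are among the things that differ in 0.7, so treat this only as an illustration of the API that has to be adapted:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerSketch {
  public static void main(String[] args) {
    // Kafka 0.8-style property names; Kafka 0.7 names these differently,
    // which is part of what has to change in KafkaInputDStream.
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181"); // placeholder
    props.put("group.id", "example-group");           // placeholder

    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask for one consumer thread on one topic.
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("example-topic", 1));

    // Block on the stream and print the size of each message payload.
    ConsumerIterator<byte[], byte[]> it =
        streams.get("example-topic").get(0).iterator();
    while (it.hasNext()) {
      byte[] payload = it.next().message();
      System.out.println("received " + payload.length + " bytes");
    }
  }
}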

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 1:19 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Setting Kafka parameters in Spark Streaming

Thanks, Shao, for providing the necessary information.

Hemanth

On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will hit this exception when calling it from Java code. The bug is fixed in the latest version, as you can see in this patch (https://github.com/apache/spark/pull/1508). But that fix is only for Kafka 0.8+; since you are still using Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I would still highly recommend using the latest versions of Spark and Kafka, as there are a lot of improvements on the streaming side.
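
For reference, on a recent Spark with Kafka 0.8 the equivalent stream is created through KafkaUtils.createStream with explicit key and value decoders, which, as I understand it, is the code path that patch touches. Below is a minimal Java sketch; the Zookeeper address, group id and topic name are placeholders, and you should double-check the exact signature against the Spark version you build against:

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaParamsExample {
  public static void main(String[] args) throws Exception {
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "KafkaParamsExample", new Duration(2000));

    // Consumer properties handed straight to the Kafka high-level consumer,
    // including the fetch size limit discussed in this thread.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "localhost:2181"); // placeholder
    kafkaParams.put("group.id", "example-group");           // placeholder
    kafkaParams.put("fetch.message.max.bytes", "10485760"); // 10 MB

    // Topic name -> number of consumer threads.
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("example-topic", 1);

    JavaPairDStream<String, String> messages = KafkaUtils.createStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics, StorageLevel.MEMORY_AND_DISK());

    messages.print();
    jssc.start();
    jssc.awaitTermination();
  }
}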

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter 
fetch.message.max.bytes when creating the Kafka DStream. The only API that 
seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](
    typeClass: Class[T],
    decoderClass: Class[D],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel)
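For reference, the kafkaParams and topics maps are built along these lines (the Zookeeper address, group id and topic below are placeholders; the property names follow Kafka 0.8 conventions and may need adjusting for other Kafka versions):

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "localhost:2181"); // placeholder
kafkaParams.put("group.id", "example-group");           // placeholder
kafkaParams.put("fetch.message.max.bytes", "10485760"); // the setting I want to apply

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("example-topic", 1); // topic name -> number of consumer threads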
I tried to call it like this:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
    StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to 
kafka.serializer.Decoder
    at 
org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at 
org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone provide help on how to set these parameters?
Thanks
Hemanth
