As you mentioned in another mail that you hope to port the latest version of Spark onto Kafka 0.7, there are some notes you should take care of:
1. Kafka 0.7+ can only be compiled with Scala 2.8, while Spark is now compiled with Scala 2.10, and there is no binary compatibility between these two Scala versions. So you have to modify the Kafka code, as Spark previously did, to fix the Scala problem.
2. The High Level Consumer API changed between Kafka 0.7 and 0.8, so you have to modify KafkaInputDStream in Spark Streaming.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 1:19 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Setting Kafka parameters in Spark Streaming

Thanks, Shao, for providing the necessary information.

Hemanth

On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai <saisai.s...@intel.com> wrote:

Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will meet this exception when using Java code with this API. The bug is fixed in the latest version, as you can see in the patch (https://github.com/apache/spark/pull/1508). But it is only for Kafka 0.8+; since you still use Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I still highly recommend using the latest versions of Spark and Kafka; there are lots of improvements in the streaming field.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream.
The only API that seems to allow this is the following:

    kafkaStream[T, D <: kafka.serializer.Decoder[_]](
        typeClass: Class[T],
        decoderClass: Class[D],
        kafkaParams: Map[String, String],
        topics: Map[String, Integer],
        storageLevel: StorageLevel)

I tried to call this as so:

    context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK())

However, this is causing an exception like:

    java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
        at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
        at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
        at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
        at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)

Can anyone provide help on how to set these parameters?

Thanks
Hemanth
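For reference, the kafkaParams argument in the signature above is an ordinary String-to-String map of Kafka consumer properties, so fetch.message.max.bytes travels alongside the usual connection settings. Below is a minimal Java sketch of building such a map; the ZooKeeper address and group id are hypothetical placeholders, and the Spark StreamingContext setup that would consume the map is omitted:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsExample {
    public static void main(String[] args) {
        // Plain consumer properties, passed through to the Kafka consumer by Spark.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "localhost:2181"); // hypothetical address
        kafkaParams.put("group.id", "my-consumer-group");       // hypothetical group id
        // Raise the maximum fetch size to 10 MB so larger messages are not skipped.
        kafkaParams.put("fetch.message.max.bytes", String.valueOf(10 * 1024 * 1024));

        System.out.println(kafkaParams.get("fetch.message.max.bytes"));
    }
}
```

This map would then be the kafkaParams argument in the context.kafkaStream(...) call shown above.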