Hi Saisai, thank you for your help, everything is working now.
Cheers!
2014-10-09 2:49 GMT+02:00 Shao, Saisai saisai.s...@intel.com:
Hi, I think you have to change the code to specify the type info
explicitly, like this:
    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        topicMap,
        StorageLevel.MEMORY_AND_DISK_SER_2)
Give it a try; the Kafka unit tests also use this API and it works
fine.
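For what it's worth, the inference behaviour can be seen outside Spark with a toy stand-in (the `create` method below is hypothetical, not the Spark API): when a type parameter shows up only through an implicit ClassTag, scalac falls back to Nothing unless you pin it down explicitly, which is exactly what the explicit type arguments above do.

```scala
import scala.reflect.ClassTag

object InferenceDemo {
  // Hypothetical stand-in for createStream: K appears only in the
  // implicit ClassTag, so nothing at the call site constrains inference.
  def create[K: ClassTag](): String =
    implicitly[ClassTag[K]].runtimeClass.getName

  def main(args: Array[String]): Unit = {
    println(create())         // type argument inferred as Nothing
    println(create[String]()) // type argument pinned explicitly
  }
}
```

The receiver class name it reports for the inferred case is the same `scala.runtime.Nothing$` that appears in the exception below.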
Besides, the fixed issue you mention below only occurs when the
related API is called from Java code.
Thanks
Jerry
From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
Sent: Wednesday, October 08, 2014 10:04 PM
To: user@spark.apache.org
Subject: Error reading from Kafka
Hi, I'm trying to read from Kafka. I was able to do it correctly with
this method.
    def createStream(
        ssc: StreamingContext,
        zkQuorum: String,
        groupId: String,
        topics: Map[String, Int],
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[(String, String)]
But now I have to add some params to the Kafka consumer, so I've
switched to another createStream overload, and I'm getting an error:
14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:2849)
    at java.lang.Class.getConstructor(Class.java:1718)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
This is my code. It seems that createStream returns a
ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (String, String)),
so I think it tries getConstructor(kafka.utils.VerifiableProperties) by
reflection on the Nothing object and doesn't find that constructor.
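That diagnosis matches the stack trace, and the failure can be reproduced with plain reflection (java.util.Properties stands in here for kafka.utils.VerifiableProperties, just to keep the snippet free of Kafka jars): scala.runtime.Nothing$ has no such constructor, so getConstructor throws.

```scala
object ReflectionDemo {
  // Attempt the same reflective constructor lookup the Kafka receiver
  // performs, but with a stand-in parameter type (java.util.Properties
  // is a hypothetical substitute for kafka.utils.VerifiableProperties).
  def lookup(): String =
    try {
      Class.forName("scala.runtime.Nothing$")
        .getConstructor(classOf[java.util.Properties])
      "constructor found"
    } catch {
      case _: NoSuchMethodException => "NoSuchMethodException"
    }

  def main(args: Array[String]): Unit =
    println(lookup())
}
```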
    val topics = config.getString("nessus.kafka.topics")
    val numThreads = config.getInt("nessus.kafka.numThreads")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "my-grp")
    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc,
        kafkaParams,
        topicMap,
        StorageLevel.MEMORY_AND_DISK_SER_2)
I found this issue, https://issues.apache.org/jira/browse/SPARK-2103,
but it has been resolved, and I'm using Spark 1.1.0 and Scala 2.10, so
I don't know what's happening.
Any thoughts?
--
http://www.stratio.com/
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*