Re: Error reading from Kafka

2014-10-09 Thread Antonio Jesus Navarro
Hi Saisai, thank you for your help, all is working OK now.

Cheers!

2014-10-09 2:49 GMT+02:00 Shao, Saisai saisai.s...@intel.com:


-- 
http://www.stratio.com/
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd (https://twitter.com/StratioBD)


RE: Error reading from Kafka

2014-10-08 Thread Shao, Saisai
Hi, I think you have to change the code to specify the type info, like this:

  val kafkaStream: ReceiverInputDStream[(String, String)] =
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topicMap,
      StorageLevel.MEMORY_AND_DISK_SER_2)

You can give it a try; the Kafka unit tests also use this API and it works fine.

Besides, the fixed issue you mentioned below only occurs when the related API is
called from Java code.
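For reference, a fuller sketch of the suggested fix with the imports it needs. The StreamingContext setup, the "my-topic" name, and the batch interval are assumptions for illustration, not from the thread; the kafkaParams and group id are taken from the message below.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    // Assumed local setup; in the thread this context already exists as `ssc`.
    val conf = new SparkConf().setAppName("KafkaStreamSketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "my-grp")
    val topicMap = Map("my-topic" -> 1) // hypothetical topic name

    // The explicit type parameters are the fix: without them, the decoder
    // types are inferred as Nothing and the receiver fails at startup with
    // NoSuchMethodException on scala.runtime.Nothing$.<init>.
    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

    kafkaStream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running it requires a local ZooKeeper/Kafka broker, so it is a sketch rather than a standalone test.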

Thanks
Jerry


From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
Sent: Wednesday, October 08, 2014 10:04 PM
To: user@spark.apache.org
Subject: Error reading from Kafka

Hi, I'm trying to read from Kafka.  I was able to do it correctly with this 
method.

def createStream(
  ssc: StreamingContext,
  zkQuorum: String,
  groupId: String,
  topics: Map[String, Int],
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]


But now I have to add some params to the Kafka consumer, so I've switched to another
createStream method, but I'm getting an error:

14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:2849)
    at java.lang.Class.getConstructor(Class.java:1718)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

This is my code. It seems that createStream returns
ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (String, String)),
so I think it tries getConstructor(kafka.utils.VerifiableProperties) by
reflection on the Nothing class and doesn't find the constructor.
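The inference problem can be reproduced without Spark. Below is a minimal sketch; `createStreamLike` is a hypothetical stand-in whose shape mirrors the generic createStream overload, where the decoder type parameters appear only in implicit ClassTags and nowhere in the value parameters or result type.

```scala
import scala.reflect.ClassTag

object NothingInference {
  // Hypothetical stand-in for the generic createStream overload: the decoder
  // types U and T are used only through ClassTags, so neither the arguments
  // nor the expected result type can pin them down.
  def createStreamLike[K, V, U, T](params: Map[String, String])
      (implicit ut: ClassTag[U], tt: ClassTag[T]): Map[K, V] = {
    println(s"decoders: ${ut.runtimeClass.getName}, ${tt.runtimeClass.getName}")
    Map.empty[K, V]
  }

  def main(args: Array[String]): Unit = {
    // The (String, String) ascription fixes K and V, but U and T still
    // default to Nothing -- matching the NoSuchMethodException above, which
    // names scala.runtime.Nothing$.
    val inferred: Map[String, String] = createStreamLike(Map.empty)

    // Explicit type parameters resolve the decoders correctly.
    val explicit: Map[String, String] =
      createStreamLike[String, String, String, String](Map.empty)
  }
}
```

With the first call the runtime decoder classes print as scala.runtime.Nothing$, which is why Spark's receiver cannot find a matching constructor; the second call pins them to real classes, which is exactly what Saisai's fix does.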

  val topics = config.getString("nessus.kafka.topics")
  val numThreads = config.getInt("nessus.kafka.numThreads")
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val kafkaParams = Map(
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "my-grp")

  val kafkaStream: ReceiverInputDStream[(String, String)] =
    KafkaUtils.createStream(ssc,
      kafkaParams,
      topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)


I found this issue, https://issues.apache.org/jira/browse/SPARK-2103, but it was
solved, and I'm using Spark 1.1.0 and Scala 2.10, so I don't know what's happening.

Any thoughts?

--
http://www.stratio.com/
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd (https://twitter.com/StratioBD)