You were absolutely right, Cody! I have just put a message in the Kafka topic before creating the DirectStream, and now it is working fine!
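
For anyone who hits the same error, the workaround can be sketched roughly as follows. This is only a minimal sketch, not the code from my test: the object name `SeededDirectStream`, the helper `seedTopics`, the `brokers` parameter and the "seed" payload are all made up for illustration. It assumes the Kafka 0.8 Scala producer and the Spark 1.3 `spark-streaming-kafka` artifact are on the classpath and that a broker is reachable.

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical wrapper, named for illustration only.
object SeededDirectStream {

  // Hypothetical helper: sends one placeholder message to each topic so that
  // the broker has partition metadata to return when the stream is created.
  def seedTopics(brokers: String, topics: Set[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", brokers) // e.g. "localhost:9092" (assumed)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    try {
      topics.foreach { topic =>
        // "seed" is an arbitrary throwaway payload.
        producer.send(new KeyedMessage[String, String](topic, "seed"))
      }
    } finally {
      producer.close()
    }
  }

  // Seeds the topics first, then creates the direct stream exactly as in the
  // original snippet from this thread.
  def create(
      ssc: StreamingContext,
      brokers: String,
      topics: Set[String]): InputDStream[(String, String)] = {
    seedTopics(brokers, topics)
    val kafkaParams = Map("metadata.broker.list" -> brokers)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
  }
}
```

If `auto.offset.reset` is set to `smallest` in `kafkaParams`, I would expect the seed message to show up in the stream as well, so downstream code may need to ignore it.
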
Do you think I should open an issue to warn that the Kafka topic must contain at least one message before the DirectStream is created?

Thank you very much! You've just made my day ;)

2015-03-19 23:08 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> Yeah, I wouldn't be shocked if Kafka's metadata apis didn't return results
> for topics that don't have any messages. (sorry about the triple negative,
> but I think you get my meaning).
>
> Try putting a message in the topic and seeing what happens.
>
> On Thu, Mar 19, 2015 at 4:38 PM, Alberto Rodriguez <ardl...@gmail.com>
> wrote:
>
>> Thank you for replying,
>>
>> Ted, I have been debugging, and the getLeaderOffsets method is not
>> appending errors because the findLeaders method, which is called on the
>> first line of getLeaderOffsets, is not returning any leaders.
>>
>> Cody, the topics do not have any messages yet. Could this be the issue?
>>
>> If you guys want to have a look at the code, I've just uploaded it to my
>> GitHub account: big-brother <https://github.com/ardlema/big-brother> (see
>> DirectKafkaWordCountTest.scala).
>>
>> Thank you again!!
>>
>> 2015-03-19 22:13 GMT+01:00 Cody Koeninger <c...@koeninger.org>:
>>
>> > What is the value of your topics variable, and does it correspond to
>> > topics that already exist on the cluster and have messages in them?
>> >
>> > On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> >
>> >> Looking at KafkaCluster#getLeaderOffsets():
>> >>
>> >>   respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
>> >>     if (por.error == ErrorMapping.NoError) {
>> >>       ...
>> >>     } else {
>> >>       errs.append(ErrorMapping.exceptionFor(por.error))
>> >>     }
>> >>
>> >> There should be some error other than "Couldn't find leader offsets for
>> >> Set()"
>> >>
>> >> Can you check again?
>> >>
>> >> Thanks
>> >>
>> >> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez <ardl...@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi all,
>> >> >
>> >> > I am trying to make the new Kafka and Spark Streaming integration work
>> >> > (direct approach "no receivers"
>> >> > <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>).
>> >> > I have created a unit test where I configure and start both ZooKeeper
>> >> > and Kafka.
>> >> >
>> >> > When I try to create the InputDStream using the createDirectStream
>> >> > method of the KafkaUtils class, I get the following error:
>> >> >
>> >> > org.apache.spark.SparkException: Couldn't find leader offsets for Set()
>> >> > org.apache.spark.SparkException: org.apache.spark.SparkException:
>> >> > Couldn't find leader offsets for Set()
>> >> >   at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
>> >> >
>> >> > Following is the code that tries to create the DStream:
>> >> >
>> >> >   val messages: InputDStream[(String, String)] =
>> >> >     KafkaUtils.createDirectStream[String, String, StringDecoder,
>> >> >       StringDecoder](ssc, kafkaParams, topics)
>> >> >
>> >> > Has anyone faced this problem?
>> >> >
>> >> > Thank you in advance.
>> >> >
>> >> > Kind regards,
>> >> >
>> >> > Alberto