I went ahead and created https://issues.apache.org/jira/browse/SPARK-6434
to track this.

On Fri, Mar 20, 2015 at 2:44 AM, Alberto Rodriguez <ardl...@gmail.com> wrote:

> You were absolutely right Cody!! I have just put a message in the kafka
> topic before creating the DirectStream and now it is working fine!
>
> Do you think that I should open an issue to warn that the kafka topic must
> contain at least one message before the DirectStream creation?
>
> Thank you very much! You've just made my day ;)
>
> 2015-03-19 23:08 GMT+01:00 Cody Koeninger <c...@koeninger.org>:
>
> > Yeah, I wouldn't be shocked if Kafka's metadata apis didn't return results
> > for topics that don't have any messages. (sorry about the triple negative,
> > but I think you get my meaning).
> >
> > Try putting a message in the topic and seeing what happens.
> >
> > On Thu, Mar 19, 2015 at 4:38 PM, Alberto Rodriguez <ardl...@gmail.com>
> > wrote:
> >
> >> Thank you for replying,
> >>
> >> Ted, I have been debugging and the getLeaderOffsets method is not
> >> appending errors because the method findLeaders that is called at the
> >> first line of getLeaderOffsets is not returning leaders.
> >>
> >> Cody, the topics do not have any messages yet. Could this be an issue??
> >>
> >> If you guys want to have a look at the code I've just uploaded it to my
> >> github account: big-brother <https://github.com/ardlema/big-brother>
> >> (see DirectKafkaWordCountTest.scala).
> >>
> >> Thank you again!!
> >>
> >> 2015-03-19 22:13 GMT+01:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> > What is the value of your topics variable, and does it correspond to
> >> > topics that already exist on the cluster and have messages in them?
> >> >
> >> > On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >> >
> >> >> Looking at KafkaCluster#getLeaderOffsets():
> >> >>
> >> >>   respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
> >> >>     if (por.error == ErrorMapping.NoError) {
> >> >>       ...
> >> >>     } else {
> >> >>       errs.append(ErrorMapping.exceptionFor(por.error))
> >> >>     }
> >> >>
> >> >> There should be some error other than "Couldn't find leader offsets
> >> >> for Set()"
> >> >>
> >> >> Can you check again ?
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez
> >> >> <ardl...@gmail.com> wrote:
> >> >>
> >> >> > Hi all,
> >> >> >
> >> >> > I am trying to make the new kafka and spark streaming integration
> >> >> > work (direct approach "no receivers"
> >> >> > <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>).
> >> >> > I have created a unit test where I configure and start both
> >> >> > zookeeper and kafka.
> >> >> >
> >> >> > When I try to create the InputDStream using the createDirectStream
> >> >> > method of the KafkaUtils class I am getting the following error:
> >> >> >
> >> >> > org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> >> >> > org.apache.spark.SparkException: org.apache.spark.SparkException:
> >> >> > Couldn't find leader offsets for Set()
> >> >> >   at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
> >> >> >
> >> >> > Following is the code that tries to create the DStream:
> >> >> >
> >> >> > val messages: InputDStream[(String, String)] =
> >> >> >   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
> >> >> >     ssc, kafkaParams, topics)
> >> >> >
> >> >> > Has anyone faced this problem?
> >> >> >
> >> >> > Thank you in advance.
> >> >> >
> >> >> > Kind regards,
> >> >> >
> >> >> > Alberto
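
For anyone finding this thread later, the workaround Alberto describes (putting a message in the topic before creating the DirectStream) could be sketched roughly as follows. This is only a sketch, assuming the old Scala producer API from Kafka 0.8 (kafka.producer.Producer) is on the classpath. The object SeedTopics, the helpers producerProps and seedTopics, the "seed" payload, and the broker address in the usage note are hypothetical names chosen for illustration; none of them come from the thread or from the linked test.

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SeedTopics {
  // Hypothetical helper: minimal configuration for the Kafka 0.8 Scala producer.
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // Ask for a leader acknowledgement so send() does not return before
    // the broker has accepted the message.
    props.put("request.required.acks", "1")
    props
  }

  // Hypothetical helper: sends one placeholder message to every topic so that
  // each topic is non-empty before createDirectStream is called.
  def seedTopics(brokers: String, topics: Set[String]): Unit = {
    val producer =
      new Producer[String, String](new ProducerConfig(producerProps(brokers)))
    try {
      topics.foreach { topic =>
        producer.send(new KeyedMessage[String, String](topic, "seed"))
      }
    } finally {
      producer.close()
    }
  }
}
```

In a test like the one discussed above, one might call SeedTopics.seedTopics("localhost:9092", topics) (broker address assumed) just before KafkaUtils.createDirectStream. Whether this remains necessary depends on how SPARK-6434 is resolved.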