What is the value of your topics variable, and does it correspond to topics
that already exist on the cluster and have messages in them?

On Thu, Mar 19, 2015 at 3:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Looking at KafkaCluster#getLeaderOffsets():
>
>           respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
>             if (por.error == ErrorMapping.NoError) {
> ...
>             } else {
>               errs.append(ErrorMapping.exceptionFor(por.error))
>             }
> There should be some error other than "Couldn't find leader offsets for
> Set()"
>
> Can you check again ?
>
> Thanks
>
> On Thu, Mar 19, 2015 at 12:10 PM, Alberto Rodriguez <ardl...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I am trying to make the new kafka and spark streaming integration work
> > (direct
> > approach "no receivers"
> > <http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>).
> I
> > have created an unit test where I configure and start both zookeeper and
> > kafka.
> >
> > When I try to create the InputDStream using the createDirectStream method
> > of the KafkaUtils class I am getting the following error:
> >
> > org.apache.spark.SparkException:* Couldn't find leader offsets for Set()*
> > org.apache.spark.SparkException: org.apache.spark.SparkException:
> Couldn't
> > find leader offsets for Set()
> > at
> >
> >
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
> >
> > Following is the code that tries to create the DStream:
> >
> > val messages: InputDStream[(String, String)] =
> > KafkaUtils.createDirectStream[String, String, StringDecoder,
> > StringDecoder](
> >         ssc, kafkaParams, topics)
> >
> > Does anyone faced this problem?
> >
> > Thank you in advance.
> >
> > Kind regards,
> >
> > Alberto
> >
>

Reply via email to