[ https://issues.apache.org/jira/browse/SPARK-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481266#comment-14481266 ]

Cody Koeninger commented on SPARK-6431:
---------------------------------------

I think this got mis-diagnosed on the mailing list, sorry for the confusion.

The only way I've been able to reproduce that exception is by trying to start a 
stream for a topic that doesn't exist at all.  Alberto, did you actually run 
kafka-topics.sh --create before starting the job, or in some other way create 
the topic?  Pretty sure what happened here is that your topic didn't exist the 
first time you ran the job.  Your brokers were set to auto-create topics, so it 
did exist the next time you ran the job.  Putting a message into the topic 
didn't have anything to do with it.

Here's why I think that's what happened.  The following console session is an 
example, where the "empty" topic existed prior to starting the console but had no 
messages.  Topic "hasonemessage" existed and had one message in it.  Topic 
"doesntexistyet" didn't exist at the beginning of the console session.

The metadata apis return the same info for existing-but-empty topics as they do 
for topics with messages in them:

scala> kc.getPartitions(Set("empty")).right
res0: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([empty,0], [empty,1])))

scala> kc.getPartitions(Set("hasonemessage")).right
res1: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([hasonemessage,0], [hasonemessage,1])))


Leader offsets are both 0 for the empty topic, as you'd expect:

scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("empty")).right.get)
res5: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([empty,1] -> LeaderOffset(localhost,9094,0), [empty,0] -> LeaderOffset(localhost,9093,0)))

And one of the leader offsets is 1 for the topic with one message:

scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("hasonemessage")).right.get)
res6: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([hasonemessage,0] -> LeaderOffset(localhost,9092,1), [hasonemessage,1] -> LeaderOffset(localhost,9093,0)))
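
Put differently, partition metadata alone can't tell an empty topic from one that has data; only a missing partition set marks a topic that doesn't exist, and only the latest leader offsets show whether anything was written. A minimal Scala sketch of that three-way distinction, fed with the offsets from res5 and res6 above. TopicAndPartition here is a stand-in for kafka.common.TopicAndPartition, and TopicState, TopicStates and classify are hypothetical names, not part of the Spark API:

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition, so the sketch compiles on its own.
final case class TopicAndPartition(topic: String, partition: Int)

sealed trait TopicState
// No partitions came back from the metadata request.
case object Missing extends TopicState
// Partitions exist, and every partition's latest leader offset is 0.
case object ExistsButEmpty extends TopicState
// At least one partition's latest leader offset is above 0, i.e. messages were
// written at some point (retention may since have deleted them).
case object HasBeenWrittenTo extends TopicState

object TopicStates {
  def classify(
      partitions: Set[TopicAndPartition],
      latestOffsets: Map[TopicAndPartition, Long]
  ): TopicState =
    if (partitions.isEmpty) Missing
    // A partition with no offset entry is treated like offset 0 (simplifying assumption).
    else if (partitions.forall(tp => latestOffsets.get(tp).forall(_ == 0L))) ExistsButEmpty
    else HasBeenWrittenTo
}
```

A latest offset above 0 only says messages were written at some point, not that any are still retained, which is why the last state is named the way it is.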


The first time a metadata request is made against the non-existing topic, it 
returns an empty set:

scala> kc.getPartitions(Set("doesntexistyet")).right
res2: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set()))
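
Why would the exception say "Set()" instead of naming a topic? One plausible mechanism, sketched below as a toy model (an assumption, not Spark 1.3's actual source): if the offset lookup only returns success from inside a loop over partition leaders, an empty partition set means the loop body never runs, and the lookup falls through to its error branch with an empty "missing" set. LeaderOffsetLookup, leaderOf and latestOffsetOn are hypothetical names:

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

object LeaderOffsetLookup {
  // Toy model, not Spark's source: success is only returned from inside the
  // per-leader loop, once every requested partition has an offset.
  def latestOffsets(
      requested: Set[TopicAndPartition],
      leaderOf: TopicAndPartition => String,              // hypothetical: leader broker per partition
      latestOffsetOn: (String, TopicAndPartition) => Long // hypothetical: that broker's answer
  ): Either[String, Map[TopicAndPartition, Long]] = {
    // Group partitions by leader; an empty request produces no groups at all.
    val byLeader = requested.groupBy(leaderOf).iterator
    var result = Map.empty[TopicAndPartition, Long]
    while (byLeader.hasNext) {
      val (leader, tps) = byLeader.next()
      result ++= tps.map(tp => tp -> latestOffsetOn(leader, tp))
      if (result.size == requested.size) return Right(result)
    }
    // Reached only if the loop never returned, which is always the case for an
    // empty request: the "missing" set is then empty as well.
    val missing = requested.diff(result.keySet)
    Left(s"Couldn't find leader offsets for $missing")
  }
}
```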


But if your brokers are configured with auto.create.topics.enable set to true, 
that metadata request alone is enough to trigger creation of the topic.  
Requesting it again shows that the topic has been created:

scala> kc.getPartitions(Set("doesntexistyet")).right
res3: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([doesntexistyet,0], [doesntexistyet,1])))
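
The res2/res3 sequence can be modeled with a toy broker (hypothetical, not Kafka's broker code): the metadata response is built from the topics known when the request arrives, and auto-creation happens as a side effect afterwards. The two-partition default is taken from the session above; a real broker takes it from its num.partitions setting and may create the topic asynchronously. ToyBroker and its methods are illustrative names:

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

// Toy broker, not Kafka's code. numPartitions plays the role of the broker's
// num.partitions default; 2 matches the session above.
final class ToyBroker(autoCreateTopicsEnable: Boolean, numPartitions: Int) {
  private var topics = Map.empty[String, Int] // topic name -> partition count

  def createTopic(topic: String): Unit = topics += (topic -> numPartitions)

  def getPartitions(requested: Set[String]): Set[TopicAndPartition] = {
    // The answer reflects the topics that existed when the request arrived...
    val answer = for {
      t <- requested
      n <- topics.get(t).toList
      p <- 0 until n
    } yield TopicAndPartition(t, p)
    // ...and only afterwards are unknown topics created, as a side effect.
    if (autoCreateTopicsEnable) requested.filterNot(topics.contains).foreach(createTopic)
    answer
  }
}
```

Called twice for a fresh topic name with autoCreateTopicsEnable = true, the first call returns an empty set and the second returns both partitions, mirroring res2 and res3 (and a job's first and second runs).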


If you don't think that explains what happened, please let me know if you have 
a way of reproducing that exception against an existing-but-empty topic, 
because I can't.

As far as what to do about this, my instinct is to just improve the error 
handling for the getPartitions call.  If the topic doesn't exist yet, it 
shouldn't be returning an empty set; it should be returning an error.
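
A minimal sketch of that check, assuming it is layered on top of the partition lookup rather than written inside KafkaCluster itself: any requested topic that contributes no partitions becomes an error instead of silently shrinking the result. PartitionCheck, requireAllTopics and the Err alias (standing in for KafkaCluster.Err, whose definition isn't shown here) are hypothetical:

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition.
final case class TopicAndPartition(topic: String, partition: Int)

object PartitionCheck {
  // Hypothetical alias standing in for KafkaCluster.Err.
  type Err = Seq[Throwable]

  // Turns "some requested topics came back with no partitions" into an error,
  // rather than returning a smaller (possibly empty) set.
  def requireAllTopics(
      requested: Set[String],
      found: Set[TopicAndPartition]
  ): Either[Err, Set[TopicAndPartition]] = {
    val missing = requested -- found.map(_.topic)
    if (missing.isEmpty) Right(found)
    else Left(Seq(new IllegalArgumentException(
      s"No partitions found for topic(s) ${missing.toSeq.sorted.mkString(", ")}; do they exist?")))
  }
}
```

Fed the empty result from res2 for "doesntexistyet", this returns a Left naming the topic, so the caller can report the real cause instead of "Couldn't find leader offsets for Set()".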


> Couldn't find leader offsets exception when creating KafkaDirectStream
> ----------------------------------------------------------------------
>
>                 Key: SPARK-6431
>                 URL: https://issues.apache.org/jira/browse/SPARK-6431
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Alberto
>
> When I try to create an InputDStream using the createDirectStream method of 
> the KafkaUtils class and the Kafka topic does not have any messages yet, I am 
> getting the following error:
> org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set()
>       at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
> If I put a message in the topic before creating the DirectStream everything 
> works fine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
