The kafka direct stream doesn't do any explicit caching of which topics
exist.  I haven't looked through the underlying simple consumer code in the
kafka project in detail, but I doubt it does either.

Honestly, I'd recommend not using auto-created topics (it makes it too easy
to pollute your topics if someone fat-fingers something when interacting
with kafka), and instead explicitly creating topics before using them.
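
For example, the kafka-topics.sh script that ships with kafka will create a
topic explicitly (exact path and flags depend on your kafka version; this is
the 0.8.x-era form, and the topic name is just a placeholder):

  bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic your-topic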

If you're trying to create the topic in your spark job right before using
it with the direct stream, I can see how there might be a race condition:
AdminUtils.createTopic only writes the topic registration to ZK, but the
direct stream asks the broker api for metadata, and the broker may not have
picked up the new topic from ZK by the time the stream asks for it.
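
If you do have to create the topic from the job, one workaround is to poll
the broker itself for metadata before calling createDirectStream.  Something
like the following untested sketch - the localhost:9092 address, the
timeouts, and the waitForTopicOnBroker name are all just placeholders for
your setup:

  import scala.collection.JavaConverters._
  import kafka.javaapi.TopicMetadataRequest
  import kafka.javaapi.consumer.SimpleConsumer

  // Ask the broker (not ZK) whether it has partition metadata for the
  // topic yet, retrying with a short sleep.  Returns true once the broker
  // reports the topic with no error and at least one partition.
  def waitForTopicOnBroker(topic: String, maxAttempts: Int = 20): Boolean = {
    val consumer =
      new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "topic-probe")
    try {
      (1 to maxAttempts).exists { _ =>
        val resp =
          consumer.send(new TopicMetadataRequest(java.util.Arrays.asList(topic)))
        val ready = resp.topicsMetadata.asScala.exists { tm =>
          tm.topic == topic && tm.errorCode == 0 && !tm.partitionsMetadata.isEmpty
        }
        if (!ready) Thread.sleep(500)
        ready
      }
    } finally {
      consumer.close()
    }
  }

Once that returns true, the broker the direct stream talks to actually knows
about the topic, so createDirectStream shouldn't hit the missing-metadata
error (at least not for that broker).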

On Thu, Jan 28, 2016 at 6:07 PM, asdf zxcv <benjamin.ha...@gmail.com> wrote:

> Does Spark cache which kafka topics exist? A service incorrectly assumes
> all the relevant topics exist, even if they are empty, causing it to fail.
> Fortunately the service is automatically restarted and by default, kafka
> creates the topic after it is requested.
>
> I'm trying to create the topic if it doesn't exist using
> AdminUtils.createTopic:
>
>       import java.util.Properties
>       import kafka.admin.AdminUtils
>       import kafka.utils.ZKStringSerializer
>       import org.I0Itec.zkclient.ZkClient
>
>       val zkClient = new ZkClient("localhost:2181", 10000, 10000,
> ZKStringSerializer)
>       while (!AdminUtils.topicExists(zkClient, topic)) {
>         AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
>       }
>
> But I still get an "Error getting partition metadata for 'topic-name'.
> Does the topic exist?" when I execute KafkaUtils.createDirectStream.
>
> I've also tried to implement a retry with a wait, so that the retry occurs
> after Kafka has auto-created the requested topic (with
> auto.create.topics.enable = true), but this still doesn't work consistently.
>
> This is a bit frustrating to debug as well, since the topic is successfully
> created about 50% of the time; the other times I get the "Does the topic
> exist?" message. My thinking is that Spark may be caching the list of extant
> kafka topics, ignoring that I've added a new one. Is this the case? Am I
> missing something?
>
>
> Ben
>
