After debugging, I found that KafkaApis.handleConsumerMetadataRequest
creates the consumer offsets topic on the first lookup of offsets topic
metadata, even when auto topic creation is disabled.

That method contains the following call:

// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head

On the first call it returns offsetsTopicMetadata with an empty
partitionsMetadata sequence and errorCode 5.

According to the docs, this means:
LeaderNotAvailable => This error is thrown if we are in the middle of a
leadership election and there is currently no leader for this partition and
hence it is unavailable for writes.

Since leadership election takes some time, especially for the default 50
partitions of the consumer offsets topic, the first consumer metadata
request will always return an error. This looks like a bug, especially
since it is not documented as expected behavior. I'd prefer that the Kafka
broker/controller initialize the consumer offsets topic on startup.

As a workaround, as an init step of my integration test, I will look up the
offsets topic metadata until the lookup succeeds or times out.
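
A rough sketch of that retry loop, in case it is useful to others. It is
deliberately independent of any Kafka client classes: `lookup` is a
hypothetical placeholder for whatever code sends the consumer metadata
request and returns the resulting error code, and the names
AwaitOffsetsTopic, awaitReady, timeoutMs and backoffMs are mine, not
Kafka's. It assumes error code 0 means success and treats every other
code (including the 5 seen above) as "not ready yet, try again".

```scala
import scala.annotation.tailrec

object AwaitOffsetsTopic {
  // Error code 0 means "no error" in the Kafka protocol.
  val NoError: Short = 0

  /** Calls `lookup` until it returns NoError or `timeoutMs` has elapsed.
    *
    * `lookup` is a placeholder (an assumption of this sketch): it should
    * issue the metadata request and return the error code of the response.
    * Returns true if a lookup succeeded before the deadline, false otherwise.
    */
  def awaitReady(lookup: () => Short, timeoutMs: Long, backoffMs: Long = 100L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Boolean =
      if (lookup() == NoError) true
      else if (System.currentTimeMillis() >= deadline) false
      else {
        // Give the leader election some time before asking again.
        Thread.sleep(backoffMs)
        loop()
      }

    loop()
  }
}
```

In a test setup this would be called once before the first offset
commit/fetch, failing the test if it returns false.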

Kind regards,
Stevo Slavic.

On Tue, Oct 6, 2015 at 10:02 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Thanks, Grant, for the quick reply!
>
> I've used the AdminUtils.topicExists("__consumer_offsets") check, and even
> 10 seconds after Kafka broker startup the check fails.
>
> When, on which event, does this internal topic get created? Is there some
> broker config property preventing it from being created? Does one have to
> use the high-level consumer or make some special request (JoinGroup?) using
> the simple consumer API to trigger consumer offsets topic init on the broker?
>
> I'm using the simple consumer API. I assume that exclude.internal.topics,
> offsets.storage, and dual.commit.enabled, however configured, shouldn't
> affect me, since I'm passing OffsetCommitRequest with version id 1.
> Moreover, I do not even reach the point where the commit is done, since the
> lookup of the consumer coordinator throws
> ConsumerCoordinatorNotAvailableException.
>
> Kind regards,
> Stevo Slavic.
>
> On Mon, Oct 5, 2015 at 5:59 PM, Grant Henke <ghe...@cloudera.com> wrote:
>
>> Hi Stevo,
>>
>> There are a couple of options to verify the topic exists:
>>
>>    1. Consume from a topic with "offsets.storage=kafka". If it's not
>>    created already, this should create it.
>>    2. List and describe the topic using the Kafka topics script. Ex:
>>
>> bin/kafka-topics.sh --zookeeper localhost:2181 --list
>>
>> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets
>>
>>
>>    3. Check that the ZNode exists in ZooKeeper. Ex:
>>
>> bin/zookeeper-shell.sh localhost:2181
>> ls /brokers/topics/__consumer_offsets
>>
>> get /brokers/topics/__consumer_offsets
>>
>>
>> Thanks,
>> Grant
>>
>> On Mon, Oct 5, 2015 at 10:44 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>>
>> > Hello Apache Kafka community,
>> >
>> > In my integration tests, with a single 0.8.2.2 broker and a newly created
>> > topic with a single partition, I first confirm through a topic metadata
>> > request that the partition has a lead broker assigned. When I then try to
>> > reset the offset for a given consumer group, I first try to discover the
>> > offset coordinator, and that lookup throws
>> > ConsumerCoordinatorNotAvailableException.
>> >
>> > On
>> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>> > it is documented that the broker returns
>> > ConsumerCoordinatorNotAvailableCode for consumer metadata requests or
>> > offset commit requests if the offsets topic has not yet been created.
>> >
>> > I wonder if this is really the case, that the offsets topic has not been
>> > created. Any tips on how to ensure/verify that the offsets topic exists?
>> >
>> > Kind regards,
>> >
>> > Stevo Slavic.
>> >
>>
>>
>>
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>>
>
>
