Has anyone run into similar problems? I have experienced the same problem
again. This time, when I use the kafka-consumer-groups.sh tool, it says that my
consumer group is either missing or rebalancing. But when I use the --list
method it shows up on the list. So my guess is it is rebalancing somehow.
Again, I have a single consumer group per topic with a single consumer in
that group. Wondering if this causes some edge case. This consumer is up as
of now, so I don't know why it would say it is rebalancing.

On Wed, Mar 9, 2016 at 11:05 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Thanks! That worked. So I see that the host I am not getting
> messages for has a massive lag on the 0th partition (the only one I send
> messages on). The other 19 groups are all caught up which explains why they
> have no issues. The lag is just increasing with time which confirms my
> suspicion that no messages are being sent to it. However the owner of the
> consumer is correctly shown to be the right process
> (consumer-1_/ip_of_my_consumer).
>
> I know that I am calling poll regularly since I record metrics when I make
> the poll call. The fact that Kafka consumer's JMX metrics also show 5
> responses a second probably proves that the poll call is being made. I am
> guessing these poll calls yield empty responses and hence my application
> sees no messages.
>
> Is there some known bug where a single consumer group with a single
> consumer can run into such a problem? What happens if a consumer group has
> a single consumer and it misses its heartbeat? Can it get stuck in limbo
> in such a condition? I am guessing the problem will go away if I restart my
> consumer but I want to try to figure out why it happened and how I can
> prevent it.
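
A side note on the heartbeat question: in the 0.9 consumer, heartbeats are sent
from inside the poll() call itself (there is no background heartbeat thread), so
a consumer that goes longer than session.timeout.ms between poll() calls is
marked dead by the coordinator and its group rebalances even though the process
is alive. A rough sketch of the relevant settings, using plain
java.util.Properties as a stand-in (values mirror the config dump quoted
below in this thread; illustrative only, not the actual consumer construction):

```java
import java.util.Properties;

public class ConsumerTimeoutSketch {
    public static void main(String[] args) {
        // Illustrative sketch only: the timeout-related settings, with the
        // values from the config dump quoted below in this thread.
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");   // coordinator marks the consumer dead after this
        props.put("heartbeat.interval.ms", "3000"); // target cadence for heartbeats sent from poll()
        props.put("enable.auto.commit", "false");   // offsets are committed manually after consuming

        long sessionTimeout = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeatInterval = Long.parseLong(props.getProperty("heartbeat.interval.ms"));

        // Rule of thumb: keep the heartbeat interval well under the session
        // timeout, so several heartbeats can be missed before eviction.
        System.out.println(heartbeatInterval * 3 <= sessionTimeout);
    }
}
```

If the gap between poll() calls ever exceeds the session timeout, the "missing
or rebalancing" output from kafka-consumer-groups.sh is what you would see
while the coordinator waits for the group to stabilize.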
>
> I did another experiment. I started sending data on all 8 partitions
> instead of the 0th partition only. Now I see that the lag for those other 7
> partitions is 0 i.e. they are all caught up. However the 0th partition
> which is still getting some traffic has the consumer offset at the same
> number always, and hence its lag keeps increasing. It must be in some kind of
> limbo that the other partitions are not affected by.
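
The lag observation is consistent with simple arithmetic: per-partition lag is
the log end offset minus the committed consumer offset, so a committed offset
that never moves makes lag grow at exactly the produce rate. An illustrative
sketch with made-up offset numbers (not real values from the cluster):

```java
public class LagSketch {
    public static void main(String[] args) {
        // Hypothetical numbers: the committed offset is stuck while the
        // log end offset keeps advancing as the producer sends messages.
        long committedOffset = 1000;                 // never moves for the stuck partition
        long[] logEndOffsets = {1000, 1500, 2000};   // grows over time

        for (long logEnd : logEndOffsets) {
            // Lag for a partition = log end offset - committed offset.
            long lag = logEnd - committedOffset;
            System.out.println(lag);
        }
    }
}
```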
>
> Thanks,
> Rajiv
>
> On Wed, Mar 9, 2016 at 10:46 PM, Manikumar Reddy <
> manikumar.re...@gmail.com> wrote:
>
>> You need to pass the "--new-consumer" option to the kafka-consumer-groups.sh
>> command to use the new consumer.
>>
>> sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
>>
>>
>> On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian <ra...@signalfx.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > I tried using the kafka-consumer-groups.sh --list command and it says I
>> > have no consumer groups set up at all. Yet I am receiving data on 19 out of
>> > 20 consumer processes (each with their own topic and consumer group).
>> >
>> > Here is my full kafka config as printed when my process started up:
>> >
>> > metric.reporters = []
>> >
>> >         metadata.max.age.ms = 300000
>> >
>> >         value.deserializer = class
>> > sf.org.apache.kafka9.common.serialization.ByteArrayDeserializer
>> >
>> >         group.id = myTopic_consumer
>> >
>> >         partition.assignment.strategy =
>> > [sf.org.apache.kafka9.clients.consumer.RangeAssignor]
>> >
>> >         reconnect.backoff.ms = 50
>> >
>> >         sasl.kerberos.ticket.renew.window.factor = 0.8
>> >
>> >         max.partition.fetch.bytes = 1048576
>> >
>> >         bootstrap.servers = [myBroker1:9092, myBroker2:9092,
>> > myBroker3:9092]
>> >
>> >         retry.backoff.ms = 100
>> >
>> >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >
>> >         sasl.kerberos.service.name = null
>> >
>> >         sasl.kerberos.ticket.renew.jitter = 0.05
>> >
>> >         ssl.keystore.type = JKS
>> >
>> >         ssl.trustmanager.algorithm = PKIX
>> >
>> >         enable.auto.commit = false
>> >
>> >         ssl.key.password = null
>> >
>> >         fetch.max.wait.ms = 1000
>> >
>> >         sasl.kerberos.min.time.before.relogin = 60000
>> >
>> >         connections.max.idle.ms = 540000
>> >
>> >         ssl.truststore.password = null
>> >
>> >         session.timeout.ms = 30000
>> >
>> >         metrics.num.samples = 2
>> >
>> >         client.id =
>> >
>> >         ssl.endpoint.identification.algorithm = null
>> >
>> >         key.deserializer = class sf.disco.kafka.VoidDeserializer
>> >
>> >         ssl.protocol = TLS
>> >
>> >         check.crcs = true
>> >
>> >         request.timeout.ms = 40000
>> >
>> >         ssl.provider = null
>> >
>> >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >
>> >         ssl.keystore.location = null
>> >
>> >         heartbeat.interval.ms = 3000
>> >
>> >         auto.commit.interval.ms = 5000
>> >
>> >         receive.buffer.bytes = 32768
>> >
>> >         ssl.cipher.suites = null
>> >
>> >         ssl.truststore.type = JKS
>> >
>> >         security.protocol = PLAINTEXT
>> >
>> >         ssl.truststore.location = null
>> >
>> >         ssl.keystore.password = null
>> >
>> >         ssl.keymanager.algorithm = SunX509
>> >
>> >         metrics.sample.window.ms = 30000
>> >
>> >         fetch.min.bytes = 256
>> >
>> >         send.buffer.bytes = 131072
>> >
>> >         auto.offset.reset = earliest
>> >
>> > It prints out the group.id field as myTopic_consumer. I was expecting to
>> > get this in the --list command and yet I am not getting it. Is this the
>> > name of the consumer group or am I missing something?
>> >
>> > I use the subscribe call on the consumer and my understanding was that the
>> > subscribe call would do all the work needed to create/join a group. Given I
>> > have a single consumer per group and a single group per topic I'd expect to
>> > see 20 groups (1 for each of my topics). However the --list returns no
>> > groups at all!
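
Since each group id is just the topic name with a "_consumer" suffix (the
config above shows group.id = myTopic_consumer), the full set of group names
--list ought to report can be derived mechanically from the topic list. A
hypothetical sketch with placeholder topic names (the real topic names differ):

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedGroups {
    public static void main(String[] args) {
        // Placeholder topic names standing in for the 20 real topics.
        List<String> topics = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            topics.add("topic" + i);
        }

        // One consumer group per topic, named <topic>_consumer, mirroring
        // how group.id is derived in the consumer setup code.
        List<String> expectedGroups = new ArrayList<>();
        for (String topic : topics) {
            expectedGroups.add(topic + "_consumer");
        }

        System.out.println(expectedGroups.size());
        System.out.println(expectedGroups.get(0));
    }
}
```

Every one of these names should appear in the --list output once the
corresponding consumer has joined its group.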
>> >
>> > Thanks,
>> > Rajiv
>> >
>> > On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > Rajiv,
>> > >
>> > > In the new Java consumer you used, the ZK dependency has been removed,
>> > > and hence you won't see any data from the ZK path.
>> > >
>> > > To check the group metadata you can use the ConsumerGroupCommand, wrapped
>> > > in bin/kafka-consumer-groups.sh.
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > >
>> > > > Don't think I made my questions clear:
>> > > >
>> > > > On a Kafka 0.9.0.1 broker with the 0.9 consumer, how do I tell what my
>> > > > consumer groups are? Can I still get this information in ZK? I don't see
>> > > > anything in the consumers folder, which is alarming to me. This is
>> > > > especially alarming because I do see that 8 partitions are assigned on
>> > > > the consumer (via jmx). I specify the consumer group using:
>> > > >
>> > > > String myConsumerGroupId = myTopic + "_consumer";
>> > > >
>> > > > props.put(
>> > > >     org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
>> > > >     myConsumerGroupId);
>> > > >
>> > > > I am running with this setup on about 20 consumers (each consuming a
>> > > > unique topic), and only one of my consumers is not passing any messages
>> > > > to my application, even though the jmx console says it is receiving 5
>> > > > requests per second. The other 19 seem to be working fine.
>> > > >
>> > > > Each of these 20 topics was created when a message was first sent to it,
>> > > > i.e. it was not provisioned beforehand. Messages are currently only being
>> > > > sent to partition 0 even though there are 8 partitions per topic.
>> > > >
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Rajiv
>> > > >
>> > > > On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > > >
>> > > > > Also forgot to mention that when I consume with the console consumer
>> > > > > I do see data coming through.
>> > > > >
>> > > > > On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > > > >
>> > > > >> I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the
>> > > > >> subscribe feature on the consumer to subscribe to a topic with 8
>> > > > >> partitions.
>> > > > >>
>> > > > >> consumer.subscribe(Arrays.asList(myTopic));
>> > > > >>
>> > > > >> I have a single consumer group for said topic and a single process
>> > > > >> subscribed with 8 partitions.
>> > > > >>
>> > > > >> When I use jmx on the consumer I do see that it has 8 partitions
>> > > > >> assigned to it according to the consumer-coordinator-metrics mbean.
>> > > > >> How can I tell what topic it is listening to? I couldn't find this in
>> > > > >> jmx anywhere. I do see that it is getting 5 responses per second
>> > > > >> according to the consumer-metrics mbean, but I don't see any messages
>> > > > >> in my actual application.
>> > > > >>
>> > > > >> I consume my messages like this:
>> > > > >>
>> > > > >>     public int poll(SubscriptionDataHandler handler, long timeout) {
>> > > > >>         ConsumerRecords<Void, byte[]> records = null;
>> > > > >>         try {
>> > > > >>             records = consumer.poll(timeout);
>> > > > >>         } catch (Exception e) {
>> > > > >>             // Don't see any exceptions here
>> > > > >>             logger.error("Exception polling Kafka", e);
>> > > > >>             return -1;
>> > > > >>         }
>> > > > >>
>> > > > >>         int numBuffers = 0;
>> > > > >>         if (records != null) {
>> > > > >>             for (ConsumerRecord<Void, byte[]> record : records) {
>> > > > >>                 byte[] payload = record.value();
>> > > > >>                 if (payload != null && payload.length > 0) {
>> > > > >>                     ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
>> > > > >>                     try {
>> > > > >>                         handler.handleData(wrappedBuffer);  // This is never called
>> > > > >>                     } catch (Exception e) {
>> > > > >>                         logger.error("Exception consuming buffer", e);
>> > > > >>                     }
>> > > > >>                     numBuffers += 1;  // This is never incremented.
>> > > > >>                 }
>> > > > >>             }
>> > > > >>
>> > > > >>             // Commit only after consuming.
>> > > > >>             consumer.commitAsync(offsetCommitCallback);
>> > > > >>         }
>> > > > >>
>> > > > >>         return numBuffers;
>> > > > >>     }
>> > > > >>
>> > > > >> I also don't see any data in the consumers folder in ZK. In fact it
>> > > > >> is completely empty.
>> > > > >>
>> > > > >> When I use the console-consumer, I do see the console-consumer show up
>> > > > >> in the consumers folder, but none of my actual consumers show up.
>> > > > >>
>> > > > >> I tried looking for jmx data on the servers too and couldn't quite
>> > > > >> figure out where to get it.
>> > > > >>
>> > > > >> I am trying to figure out why the Kafka consumer thinks it is getting
>> > > > >> messages (5 responses/second according to jmx) but I don't get any in
>> > > > >> my application.
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Rajiv
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>
>
