Thanks! That worked. I see that the host that is not getting messages has
a massive lag on the 0th partition (the only one I send messages on). The
other 19 groups are all caught up, which explains why they have no issues.
The lag keeps increasing with time, which confirms my suspicion that no
messages are being sent to it. However, the owner of the consumer is
correctly shown to be the right process.

I know that I am calling poll regularly since I record metrics when I make
the poll call. The fact that Kafka consumer's JMX metrics also show 5
responses a second probably proves that the poll call is being made. I am
guessing these poll calls yield empty responses and hence my application
sees no messages.
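
One way to check the empty-response guess would be to count how many polls
come back with zero records. The sketch below is only illustrative:
PollStats is a hypothetical helper, not part of the Kafka client, and it
assumes the caller passes in the record count after each poll (for example
from records.count()).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the Kafka client): tracks how many
// poll() calls returned records and how many came back empty.
final class PollStats {
    private final AtomicLong totalPolls = new AtomicLong();
    private final AtomicLong emptyPolls = new AtomicLong();
    private final AtomicLong totalRecords = new AtomicLong();

    // Call once after each poll with the number of records it returned.
    void record(int recordCount) {
        totalPolls.incrementAndGet();
        if (recordCount == 0) {
            emptyPolls.incrementAndGet();
        } else {
            totalRecords.addAndGet(recordCount);
        }
    }

    long totalPolls() { return totalPolls.get(); }
    long emptyPolls() { return emptyPolls.get(); }
    long totalRecords() { return totalRecords.get(); }

    // True when at least one poll was made and none returned any records.
    boolean allPollsEmpty() {
        return totalPolls.get() > 0 && emptyPolls.get() == totalPolls.get();
    }

    public static void main(String[] args) {
        PollStats stats = new PollStats();
        // Made-up counts standing in for records.count() after each poll.
        int[] counts = {0, 0, 0};
        for (int c : counts) {
            stats.record(c);
        }
        System.out.println("polls=" + stats.totalPolls()
                + " empty=" + stats.emptyPolls()
                + " allEmpty=" + stats.allPollsEmpty());
    }
}
```

If allPollsEmpty() stays true while the lag keeps growing, the polls are
being made but the fetches for that partition are returning nothing.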

Is there some known bug where a single consumer group with a single
consumer can run into such a problem? What happens if a consumer group has
a single consumer and it misses its heartbeat? Can it get stuck in limbo
in such a condition? I am guessing the problem will go away if I restart my
consumer but I want to try to figure out why it happened and how I can
prevent it.

I did another experiment. I started sending data on all 8 partitions
instead of the 0th partition only. Now I see that the lag for those other 7
partitions is 0, i.e. they are all caught up. However, the 0th partition,
which is still getting some traffic, always has its consumer offset at the
same number, and hence its lag keeps increasing. It must be in some kind of
limbo that the other partitions are not affected by.
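
The pattern described above (log end offset moving, committed offset
frozen) can be told apart from a consumer that is merely slow by comparing
two offset readings taken some time apart. This is a rough sketch under
assumptions: StallCheck and OffsetSample are hypothetical names, and the
numbers are made up rather than taken from my cluster. Lag here is the log
end offset minus the committed offset.

```java
// Hypothetical helper (not part of Kafka): compares two offset samples for
// one partition to tell a stalled consumer from a slow one.
final class StallCheck {

    // One reading of a partition's committed offset and log end offset.
    static final class OffsetSample {
        final long committedOffset;
        final long logEndOffset;

        OffsetSample(long committedOffset, long logEndOffset) {
            this.committedOffset = committedOffset;
            this.logEndOffset = logEndOffset;
        }

        // Lag is how far the committed offset trails the end of the log.
        long lag() {
            return logEndOffset - committedOffset;
        }
    }

    // Stalled: new messages arrived between the two samples, yet the
    // committed offset did not move at all.
    static boolean isStalled(OffsetSample earlier, OffsetSample later) {
        return later.logEndOffset > earlier.logEndOffset
                && later.committedOffset == earlier.committedOffset;
    }

    public static void main(String[] args) {
        // Made-up numbers for illustration only.
        OffsetSample before = new OffsetSample(1000, 5000);
        OffsetSample after = new OffsetSample(1000, 5600);
        System.out.println("lag before=" + before.lag()
                + ", lag after=" + after.lag()
                + ", stalled=" + isStalled(before, after));
    }
}
```

A partition that is only behind would show the committed offset advancing
between samples, so isStalled would return false for it.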


On Wed, Mar 9, 2016 at 10:46 PM, Manikumar Reddy <>

> We need to pass "--new-consumer" property to
> command to use new consumer.
> sh  --bootstrap-server localhost:9092 --list
>  --new-consumer
> On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian <> wrote:
> > Hi Guozhang,
> >
> > I tried using the --list command and it says I have no consumer groups
> > set up at all. Yet I am receiving data on 19 out of 20 consumer
> > processes (each with their own topic and consumer group).
> >
> > Here is my full kafka config as printed when my process started up:
> >
> > metric.reporters = []
> >
> > = 300000
> >
> >         value.deserializer = class
> >
> >
> > = myTopic_consumer
> >
> >         partition.assignment.strategy =
> > []
> >
> > = 50
> >
> >         sasl.kerberos.ticket.renew.window.factor = 0.8
> >
> >         max.partition.fetch.bytes = 1048576
> >
> >         bootstrap.servers = [myBroker1:9092, myBroker2:9092,
> > myBroker3:9092]
> >
> > = 100
> >
> >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >
> > = null
> >
> >         sasl.kerberos.ticket.renew.jitter = 0.05
> >
> >         ssl.keystore.type = JKS
> >
> >         ssl.trustmanager.algorithm = PKIX
> >
> > = false
> >
> >         ssl.key.password = null
> >
> > = 1000
> >
> >         sasl.kerberos.min.time.before.relogin = 60000
> >
> > = 540000
> >
> >         ssl.truststore.password = null
> >
> > = 30000
> >
> >         metrics.num.samples = 2
> >
> > =
> >
> >         ssl.endpoint.identification.algorithm = null
> >
> >         key.deserializer = class sf.disco.kafka.VoidDeserializer
> >
> >         ssl.protocol = TLS
> >
> >         check.crcs = true
> >
> > = 40000
> >
> >         ssl.provider = null
> >
> >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >
> >         ssl.keystore.location = null
> >
> > = 3000
> >
> > = 5000
> >
> >         receive.buffer.bytes = 32768
> >
> >         ssl.cipher.suites = null
> >
> >         ssl.truststore.type = JKS
> >
> >         security.protocol = PLAINTEXT
> >
> >         ssl.truststore.location = null
> >
> >         ssl.keystore.password = null
> >
> >         ssl.keymanager.algorithm = SunX509
> >
> > = 30000
> >
> >         fetch.min.bytes = 256
> >
> >         send.buffer.bytes = 131072
> >
> >         auto.offset.reset = earliest
> >
> > It prints out the field as myTopic_consumer. I was expecting to
> > get this in the --list command and yet I am not getting it. Is this the
> > name of the consumer group or am I missing something?
> >
> > I use the subscribe call on the consumer and my understanding was that
> > the subscribe call would do all the work needed to create/join a group.
> > Given I have a single consumer per group and a single group per topic
> > I'd expect to see 20 groups (1 for each of my topics). However the
> > --list returns no groups at all!
> >
> > Thanks,
> > Rajiv
> >
> > On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang <> wrote:
> >
> > > Rajiv,
> > >
> > > In the new Java consumer you used, the ZK dependency has been removed
> > > and hence you won't see any data from the ZK path.
> > >
> > > To check the group metadata you can use the ConsumerGroupCommand,
> > > wrapped in bin/
> > >
> > > Guozhang
> > >
> > > On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian <> wrote:
> > >
> > > > Don't think I made my questions clear:
> > > >
> > > > On Kafka broker and 0.9 consumer how do I tell what my
> > > > consumer-groups are? Can I still get this information in ZK? I don't
> > > > see anything in the consumers folder which is alarming to me. This is
> > > > especially alarming because I do see that 8 partitions are assigned
> > > > on the consumer (via jmx). I specify the consumer group using:
> > > >
> > > > String myConsumerGroupId = myTopic + "_consumer";
> > > >
> > > > props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
> > > >           myConsumerGroupId);
> > > >
> > > > I am running with this setup on about 20 consumers (each consuming a
> > > > unique topic) and I only see one of my consumers not passing any
> > > > messages to my application even though I see that the jmx console
> > > > says it is receiving 5 requests per second. The other 19 seem to be
> > > > working fine.
> > > >
> > > > Each of these 20 topics was created when a message was sent to it
> > > > i.e. it was not provisioned from before. Messages currently are only
> > > > being sent to partition 0 even though there are 8 partitions per
> > > > topic.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Rajiv
> > > >
> > > > On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <> wrote:
> > > >
> > > > > Also forgot to mention that when I do consume with the console
> > > > > consumer I do see data coming through.
> > > > >
> > > > > On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <> wrote:
> > > > >
> > > > > >> I am running the broker with the 0.9 consumer. I am using the
> > > > > >> subscribe feature on the consumer to subscribe to a topic with 8
> > > > > >> partitions.
> > > > >>
> > > > >> consumer.subscribe(Arrays.asList(myTopic));
> > > > >>
> > > > >> I have a single consumer group for said topic and a single process
> > > > >> subscribed with 8 partitions.
> > > > >>
> > > > > >> When I use jmx on the consumer I do see that it has 8 partitions
> > > > > >> assigned to it according to the consumer-coordinator-metrics
> > > > > >> mbean. How can I tell what topic it is listening to? I couldn't
> > > > > >> find this on jmx anywhere. I do see that it is getting 5
> > > > > >> responses per second according to the consumer-metrics mbean but
> > > > > >> I don't see any in my actual application.
> > > > >>
> > > > >> I consume my messages like this:
> > > > >>
> > > > > >>     public int poll(SubscriptionDataHandler handler, long timeout) {
> > > > > >>         ConsumerRecords<Void, byte[]> records = null;
> > > > > >>         try {
> > > > > >>             records = consumer.poll(timeout);
> > > > > >>         } catch (Exception e) {
> > > > > >>             // Don't see any exceptions here
> > > > > >>             logger.error("Exception polling Kafka", e);
> > > > > >>             return -1;
> > > > > >>         }
> > > > > >>         int numBuffers = 0;
> > > > > >>         if (records != null) {
> > > > > >>             for (ConsumerRecord<Void, byte[]> record : records) {
> > > > > >>                 byte[] payload = record.value();
> > > > > >>                 if (payload != null && payload.length > 0) {
> > > > > >>                     ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
> > > > > >>                     try {
> > > > > >>                         // This is never called
> > > > > >>                         handler.handleData(wrappedBuffer);
> > > > > >>                     } catch (Exception e) {
> > > > > >>                         logger.error("Exception consuming buffer", e);
> > > > > >>                     }
> > > > > >>                     numBuffers += 1;  // This is never incremented.
> > > > > >>                 }
> > > > > >>             }
> > > > > >>             // Commit only after consuming.
> > > > > >>             consumer.commitAsync(offsetCommitCallback);
> > > > > >>         }
> > > > > >>         return numBuffers;
> > > > > >>     }
> > > > > >>
> > > > > >> I also don't see any data in the consumers folder in ZK. In fact
> > > > > >> it is completely empty.
> > > > > >>
> > > > > >> When I use the console-consumer, I do see the console-consumer
> > > > > >> show up in the consumers folder, but none of my actual consumers
> > > > > >> show up.
> > > > >>
> > > > > >> I tried looking for jmx data on the servers too and couldn't
> > > > > >> quite figure out where I can get jmx.
> > > > >>
> > > > > >> I am trying to figure out why the Kafka consumer thinks it is
> > > > > >> getting messages (5 responses/second according to jmx) but I
> > > > > >> don't get any in my application.
> > > > >>
> > > > >> Thanks,
> > > > >> Rajiv
> > > > >>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
