Has anyone run into similar problems? I have experienced the same problem again. This time, when I use the kafka-consumer-groups.sh tool, it says that my consumer group is either missing or rebalancing. But when I use the --list option, the group shows up on the list. So my guess is that it is somehow stuck rebalancing. Again, I have a single consumer group per topic with a single consumer in that group, and I wonder if this triggers some edge case. The consumer is up as of now, so I don't know why it would say it is rebalancing.
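For context on why --describe can report a group as rebalancing while --list still shows it: the group coordinator marks a member dead once its last heartbeat is older than session.timeout.ms, and the group then sits in a rebalancing state until the member rejoins. Below is a minimal illustrative sketch of that liveness arithmetic, using the session.timeout.ms (30000) and heartbeat.interval.ms (3000) values from the config quoted later in this thread; the class and method names are made up for illustration and this is not Kafka's actual coordinator code:

```java
// Illustrative sketch only -- not Kafka's actual coordinator implementation.
// Shows the liveness check that decides when a group member is kicked out
// and the group moves into a rebalance: the member is considered dead once
// its last heartbeat is older than session.timeout.ms.
public class SessionTimeoutSketch {
    static final long SESSION_TIMEOUT_MS = 30_000;   // session.timeout.ms from the config in this thread
    static final long HEARTBEAT_INTERVAL_MS = 3_000; // heartbeat.interval.ms from the config in this thread

    /** True if the member's session should be considered expired at 'nowMs'. */
    static boolean isSessionExpired(long lastHeartbeatMs, long nowMs) {
        return nowMs - lastHeartbeatMs > SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        long lastHeartbeat = 0;
        // A heartbeat exactly ten intervals ago is still within the session timeout.
        System.out.println(isSessionExpired(lastHeartbeat, 10 * HEARTBEAT_INTERVAL_MS)); // false
        // One millisecond past the session timeout expires the session.
        System.out.println(isSessionExpired(lastHeartbeat, SESSION_TIMEOUT_MS + 1));     // true
    }
}
```

With these settings the consumer gets roughly ten heartbeat attempts per session timeout, so a single missed heartbeat alone should not expire the session; only a sustained gap longer than 30 seconds should.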
On Wed, Mar 9, 2016 at 11:05 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Thanks! That worked. I see that the host I am not getting messages for has
> a massive lag on the 0th partition (the only one I send messages on). The
> other 19 groups are all caught up, which explains why they have no issues.
> The lag just keeps increasing with time, which confirms my suspicion that
> no messages are being delivered. However, the owner of the consumer is
> correctly shown to be the right process (consumer-1_/ip_of_my_consumer).
>
> I know that I am calling poll regularly, since I record metrics when I
> make the poll call. The fact that the Kafka consumer's JMX metrics also
> show 5 responses a second probably proves that the poll call is being
> made. I am guessing these poll calls yield empty responses, and hence my
> application sees no messages.
>
> Is there some known bug where a single consumer group with a single
> consumer can run into such a problem? What happens if a consumer group has
> a single consumer and it misses its heartbeat? Can it get stuck in limbo
> in such a condition? I am guessing the problem will go away if I restart
> my consumer, but I want to figure out why it happened and how I can
> prevent it.
>
> I did another experiment: I started sending data on all 8 partitions
> instead of only the 0th partition. Now I see that the lag for those other
> 7 partitions is 0, i.e. they are all caught up. However, the 0th
> partition, which is still getting some traffic, has its consumer offset
> stuck at the same number, and hence its lag keeps increasing. It must be
> in some kind of limbo that the other partitions are not affected by.
>
> Thanks,
> Rajiv
>
> On Wed, Mar 9, 2016 at 10:46 PM, Manikumar Reddy <manikumar.re...@gmail.com> wrote:
>
>> We need to pass the "--new-consumer" flag to kafka-consumer-groups.sh to
>> use the new consumer.
>>
>>     sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
>>
>> On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>> > Hi Guozhang,
>> >
>> > I tried using the kafka-consumer-groups.sh --list command and it says I
>> > have no consumer groups set up at all. Yet I am receiving data on 19
>> > out of 20 consumer processes (each with their own topic and consumer
>> > group).
>> >
>> > Here is my full Kafka config as printed when my process started up:
>> >
>> >     metric.reporters = []
>> >     metadata.max.age.ms = 300000
>> >     value.deserializer = class sf.org.apache.kafka9.common.serialization.ByteArrayDeserializer
>> >     group.id = myTopic_consumer
>> >     partition.assignment.strategy = [sf.org.apache.kafka9.clients.consumer.RangeAssignor]
>> >     reconnect.backoff.ms = 50
>> >     sasl.kerberos.ticket.renew.window.factor = 0.8
>> >     max.partition.fetch.bytes = 1048576
>> >     bootstrap.servers = [myBroker1:9092, myBroker2:9092, myBroker3:9092]
>> >     retry.backoff.ms = 100
>> >     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >     sasl.kerberos.service.name = null
>> >     sasl.kerberos.ticket.renew.jitter = 0.05
>> >     ssl.keystore.type = JKS
>> >     ssl.trustmanager.algorithm = PKIX
>> >     enable.auto.commit = false
>> >     ssl.key.password = null
>> >     fetch.max.wait.ms = 1000
>> >     sasl.kerberos.min.time.before.relogin = 60000
>> >     connections.max.idle.ms = 540000
>> >     ssl.truststore.password = null
>> >     session.timeout.ms = 30000
>> >     metrics.num.samples = 2
>> >     client.id =
>> >     ssl.endpoint.identification.algorithm = null
>> >     key.deserializer = class sf.disco.kafka.VoidDeserializer
>> >     ssl.protocol = TLS
>> >     check.crcs = true
>> >     request.timeout.ms = 40000
>> >     ssl.provider = null
>> >     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >     ssl.keystore.location = null
>> >     heartbeat.interval.ms = 3000
>> >     auto.commit.interval.ms = 5000
>> >     receive.buffer.bytes = 32768
>> >     ssl.cipher.suites = null
>> >     ssl.truststore.type = JKS
>> >     security.protocol = PLAINTEXT
>> >     ssl.truststore.location = null
>> >     ssl.keystore.password = null
>> >     ssl.keymanager.algorithm = SunX509
>> >     metrics.sample.window.ms = 30000
>> >     fetch.min.bytes = 256
>> >     send.buffer.bytes = 131072
>> >     auto.offset.reset = earliest
>> >
>> > It prints out the group.id field as myTopic_consumer. I was expecting
>> > to get this from the --list command, and yet I am not getting it. Is
>> > this the name of the consumer group, or am I missing something?
>> >
>> > I use the subscribe call on the consumer, and my understanding was that
>> > the subscribe call would do all the work needed to create/join a group.
>> > Given that I have a single consumer per group and a single group per
>> > topic, I'd expect to see 20 groups (1 for each of my topics). However,
>> > --list returns no groups at all!
>> >
>> > Thanks,
>> > Rajiv
>> >
>> > On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > Rajiv,
>> > >
>> > > In the new Java consumer you used, the ZK dependency has been
>> > > removed, and hence you won't see any data under the ZK path.
>> > >
>> > > To check the group metadata you can use the ConsumerGroupCommand,
>> > > wrapped in bin/kafka-consumer-groups.sh.
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > >
>> > > > I don't think I made my questions clear:
>> > > >
>> > > > On a 0.9.0.1 broker with the 0.9 consumer, how do I tell what my
>> > > > consumer groups are? Can I still get this information in ZK? I
>> > > > don't see anything in the consumers folder, which is alarming to
>> > > > me.
>> > > > This is especially alarming because I do see that 8 partitions are
>> > > > assigned on the consumer (via JMX). I specify the consumer group
>> > > > using:
>> > > >
>> > > >     String myConsumerGroupId = myTopic + "_consumer";
>> > > >     props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
>> > > >             myConsumerGroupId);
>> > > >
>> > > > I am running with this setup on about 20 consumers (each consuming
>> > > > a unique topic), and only one of them is not passing any messages
>> > > > to my application, even though the JMX console says it is receiving
>> > > > 5 requests per second. The other 19 seem to be working fine.
>> > > >
>> > > > Each of these 20 topics was created when a message was first sent
>> > > > to it, i.e. it was not provisioned beforehand. Messages are
>> > > > currently only being sent to partition 0, even though there are 8
>> > > > partitions per topic.
>> > > >
>> > > > Thanks,
>> > > > Rajiv
>> > > >
>> > > > On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > > >
>> > > > > Also forgot to mention that when I consume with the console
>> > > > > consumer, I do see data coming through.
>> > > > >
>> > > > > On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > > > >
>> > > > >> I am running the 0.9.0.1 broker with the 0.9 consumer. I am
>> > > > >> using the subscribe feature on the consumer to subscribe to a
>> > > > >> topic with 8 partitions:
>> > > > >>
>> > > > >>     consumer.subscribe(Arrays.asList(myTopic));
>> > > > >>
>> > > > >> I have a single consumer group for said topic and a single
>> > > > >> process subscribed, with 8 partitions.
>> > > > >>
>> > > > >> When I use JMX on the consumer, I do see that it has 8
>> > > > >> partitions assigned to it according to the
>> > > > >> consumer-coordinator-metrics mbean.
>> > > > >> How can I tell what topic it is listening to? I couldn't find
>> > > > >> this in JMX anywhere. I do see that it is getting 5 responses
>> > > > >> per second according to the consumer-metrics mbean, but I don't
>> > > > >> see any in my actual application.
>> > > > >>
>> > > > >> I consume my messages like this:
>> > > > >>
>> > > > >>     public int poll(SubscriptionDataHandler handler, long timeout) {
>> > > > >>         ConsumerRecords<Void, byte[]> records = null;
>> > > > >>         try {
>> > > > >>             records = consumer.poll(timeout);
>> > > > >>         } catch (Exception e) {
>> > > > >>             logger.error("Exception polling the Kafka", e); // Don't see any exceptions here
>> > > > >>             return -1;
>> > > > >>         }
>> > > > >>         int numBuffers = 0;
>> > > > >>         if (records != null) {
>> > > > >>             for (ConsumerRecord<Void, byte[]> record : records) {
>> > > > >>                 byte[] payload = record.value();
>> > > > >>                 if (payload != null && payload.length > 0) {
>> > > > >>                     ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
>> > > > >>                     try {
>> > > > >>                         handler.handleData(wrappedBuffer); // This is never called
>> > > > >>                     } catch (Exception e) {
>> > > > >>                         logger.error("Exception consuming buffer", e);
>> > > > >>                     }
>> > > > >>                     numBuffers += 1; // This is never incremented.
>> > > > >>                 }
>> > > > >>             }
>> > > > >>             // Commit only after consuming.
>> > > > >>             consumer.commitAsync(offsetCommitCallback);
>> > > > >>         }
>> > > > >>         return numBuffers;
>> > > > >>     }
>> > > > >>
>> > > > >> I also don't see any data in the consumers folder in ZK. In
>> > > > >> fact, it is completely empty.
>> > > > >> When I use the console consumer, I do see the console consumer
>> > > > >> show up in the consumers folder, but none of my actual consumers
>> > > > >> show up.
>> > > > >>
>> > > > >> I tried looking for JMX data on the brokers too, and couldn't
>> > > > >> quite figure out where I can get it.
>> > > > >>
>> > > > >> I am trying to figure out why the Kafka consumer thinks it is
>> > > > >> getting messages (5 responses/second according to JMX) while I
>> > > > >> don't get any in my application.
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Rajiv
>> > >
>> > > --
>> > > -- Guozhang
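For reference, the poll loop quoted in this thread has a couple of truncated string literals and is missing its final return. Here is a self-contained sketch of the same pattern (wrap each non-empty payload in a ByteBuffer, hand it to a handler, count the buffers), with consumer.poll replaced by a plain List<byte[]> so it compiles and runs without a broker; the Handler interface is a stand-in for the thread's SubscriptionDataHandler:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Self-contained sketch of the consume loop from the thread: wrap each
// non-empty payload in a ByteBuffer, hand it to a handler, and count how
// many buffers were consumed. The real code iterates ConsumerRecords from
// consumer.poll(timeout); a List<byte[]> stands in here so the sketch runs
// without a broker.
public class PollLoopSketch {

    // Stand-in for the SubscriptionDataHandler used in the thread.
    interface Handler {
        void handleData(ByteBuffer buffer);
    }

    static int consume(List<byte[]> payloads, Handler handler) {
        int numBuffers = 0;
        for (byte[] payload : payloads) {
            // Skip null and empty payloads, as the original loop does.
            if (payload != null && payload.length > 0) {
                ByteBuffer wrapped = ByteBuffer.wrap(payload);
                try {
                    handler.handleData(wrapped);
                } catch (Exception e) {
                    System.err.println("Exception consuming buffer: " + e);
                }
                numBuffers += 1;
            }
        }
        // The real loop commits only after consuming:
        // consumer.commitAsync(offsetCommitCallback);
        return numBuffers;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(new byte[]{1, 2}, new byte[0], null, new byte[]{3});
        int n = consume(records, buf -> System.out.println("got " + buf.remaining() + " bytes"));
        System.out.println(n); // 2 non-empty payloads
    }
}
```

Note that a loop like this returning 0 buffers is consistent with poll returning empty ConsumerRecords, which matches the symptom described above: 5 responses/second at the JMX level, but no payloads reaching the application.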