[ https://issues.apache.org/jira/browse/KAFKA-2115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491878#comment-14491878 ]
Gwen Shapira commented on KAFKA-2115: ------------------------------------- I created a small test that uses the new consumer API, so I can reproduce the error and validate my patch. One thing I noticed while running my test is that while my consumer is running, it looks like a new consumer is joining the consumer group every 10 seconds: ``` [2015-04-12 20:34:44,743] INFO [Kafka Coordinator 0]: Registered consumer test-23 for group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:34:44,746] INFO [Kafka Coordinator 0]: Handled join-group from consumer to group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:34:54,649] INFO [Kafka Coordinator 0]: Handled heartbeat of consumer from group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:34:54,652] INFO [Kafka Coordinator 0]: Registered consumer test-24 for group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:34:54,655] INFO [Kafka Coordinator 0]: Handled join-group from consumer to group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:35:04,719] INFO [Kafka Coordinator 0]: Handled heartbeat of consumer from group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:35:04,722] INFO [Kafka Coordinator 0]: Registered consumer test-25 for group test (kafka.coordinator.ConsumerCoordinator) [2015-04-12 20:35:04,725] INFO [Kafka Coordinator 0]: Handled join-group from consumer to group test (kafka.coordinator.ConsumerCoordinator) ``` I don't know whether this is expected behavior, bug in Kafka or bug in my consumer code... perhaps someone more familiar with the new consumer can jump in? Here's my test: ``` public class NewConsumerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe("t1", "topic2"); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } } ``` > Error updating metrics in RequestChannel > ---------------------------------------- > > Key: KAFKA-2115 > URL: https://issues.apache.org/jira/browse/KAFKA-2115 > Project: Kafka > Issue Type: Bug > Reporter: Gwen Shapira > Assignee: Gwen Shapira > Attachments: KAFKA-2115.patch > > > Reported by [~jkreps] on the mailing list: > kafka.common.KafkaException: Wrong request type 12 > > at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55) > > at > > kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85) > > at kafka.network.Processor.write(SocketServer.scala:514) > > at kafka.network.Processor.run(SocketServer.scala:379) > > at java.lang.Thread.run(Thread.java:744) > > [2015-04-12 12:54:52,077] INFO [Kafka Coordinator 0]: Registered consumer > my-group-24 for group my-group (kafka.coordinator.ConsumerCoordinator) > > [2015-04-12 12:54:52,080] INFO [Kafka Coordinator 0]: Handled join-group > from consumer to group my-group (kafka.coordinator.ConsumerCoordinator) > [2015-04-12 12:54:52,081] ERROR Closing socket for /10.0.0.220 because of > error (kafka.network.Processor) > > kafka.common.KafkaException: Wrong request type 11 > > at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55) > > at > > kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85) > > at kafka.network.Processor.write(SocketServer.scala:514) > > at kafka.network.Processor.run(SocketServer.scala:379) > > at java.lang.Thread.run(Thread.java:744) > This a result of KAFKA-2044 - we moved few Requests out > of RequestKeys to the newer ApiKeys, but didn't update the metrics > code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)