[
https://issues.apache.org/jira/browse/KAFKA-2115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491878#comment-14491878
]
Gwen Shapira commented on KAFKA-2115:
-------------------------------------
I created a small test that uses the new consumer API, so I can reproduce the
error and validate my patch.
While running the test, I noticed that a new consumer appears to join the
consumer group every 10 seconds for as long as my consumer is running:
```
[2015-04-12 20:34:44,743] INFO [Kafka Coordinator 0]: Registered consumer
test-23 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:44,746] INFO [Kafka Coordinator 0]: Handled join-group from
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,649] INFO [Kafka Coordinator 0]: Handled heartbeat of
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,652] INFO [Kafka Coordinator 0]: Registered consumer
test-24 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,655] INFO [Kafka Coordinator 0]: Handled join-group from
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,719] INFO [Kafka Coordinator 0]: Handled heartbeat of
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,722] INFO [Kafka Coordinator 0]: Registered consumer
test-25 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,725] INFO [Kafka Coordinator 0]: Handled join-group from
consumer to group test (kafka.coordinator.ConsumerCoordinator)
```
I don't know whether this is expected behavior, a bug in Kafka, or a bug in my
consumer code. Perhaps someone more familiar with the new consumer can jump
in?
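To put a number on the "every 10 seconds" observation, the gaps between the "Registered consumer" lines in the log excerpt above can be computed directly. This is only a sketch: the class and method names (`RejoinIntervals`, `registrationGapsMillis`) are made up for illustration, and it assumes every log line starts with a bracketed timestamp in the format shown above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RejoinIntervals {
    // Timestamp layout seen in the broker log excerpt, e.g. [2015-04-12 20:34:44,743]
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Hypothetical helper: milliseconds between consecutive "Registered consumer" lines.
    static List<Long> registrationGapsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            if (!line.contains("Registered consumer")) {
                continue; // only registrations are of interest here
            }
            // The timestamp sits between the leading '[' and the first ']'.
            LocalDateTime current =
                    LocalDateTime.parse(line.substring(1, line.indexOf(']')), FORMAT);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // The three registration lines from the excerpt, with wrapped lines rejoined.
        List<String> lines = Arrays.asList(
                "[2015-04-12 20:34:44,743] INFO [Kafka Coordinator 0]: Registered consumer test-23 for group test",
                "[2015-04-12 20:34:54,652] INFO [Kafka Coordinator 0]: Registered consumer test-24 for group test",
                "[2015-04-12 20:35:04,722] INFO [Kafka Coordinator 0]: Registered consumer test-25 for group test");
        System.out.println(registrationGapsMillis(lines)); // prints [9909, 10070]
    }
}
```

Both gaps are within about 100 ms of 10 seconds, and each registration follows a heartbeat by about 3 ms.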
Here's my test:
```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props);
        // Subscribe to two topics as a member of group "test".
        consumer.subscribe("t1", "topic2");
        while (true) {
            // Poll with a 100 ms timeout and print every record returned.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
```
> Error updating metrics in RequestChannel
> ----------------------------------------
>
> Key: KAFKA-2115
> URL: https://issues.apache.org/jira/browse/KAFKA-2115
> Project: Kafka
> Issue Type: Bug
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Attachments: KAFKA-2115.patch
>
>
> Reported by [~jkreps] on the mailing list:
> kafka.common.KafkaException: Wrong request type 12
>     at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55)
>     at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85)
>     at kafka.network.Processor.write(SocketServer.scala:514)
>     at kafka.network.Processor.run(SocketServer.scala:379)
>     at java.lang.Thread.run(Thread.java:744)
>
> [2015-04-12 12:54:52,077] INFO [Kafka Coordinator 0]: Registered consumer
> my-group-24 for group my-group (kafka.coordinator.ConsumerCoordinator)
> [2015-04-12 12:54:52,080] INFO [Kafka Coordinator 0]: Handled join-group
> from consumer to group my-group (kafka.coordinator.ConsumerCoordinator)
> [2015-04-12 12:54:52,081] ERROR Closing socket for /10.0.0.220 because of
> error (kafka.network.Processor)
> kafka.common.KafkaException: Wrong request type 11
>     at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55)
>     at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85)
>     at kafka.network.Processor.write(SocketServer.scala:514)
>     at kafka.network.Processor.run(SocketServer.scala:379)
>     at java.lang.Thread.run(Thread.java:744)
>
> This is a result of KAFKA-2044: we moved a few requests out
> of RequestKeys to the newer ApiKeys, but didn't update the metrics
> code.
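
The failure mode described in the issue can be illustrated with a small stand-alone sketch. This is not the broker code and not the attached patch: the class `RequestNameLookup`, both maps, and both methods are hypothetical stand-ins for a name lookup that only knows the older request types, next to one that also consults a newer table. Keys 11 and 12 are mapped to JoinGroup and Heartbeat here, matching the Kafka protocol's API key numbering and the join-group line logged just before the "Wrong request type 11" error.

```java
import java.util.HashMap;
import java.util.Map;

public class RequestNameLookup {
    // Hypothetical stand-in for the older key table (only a couple of entries shown).
    static final Map<Integer, String> LEGACY_NAMES = new HashMap<>();
    // Hypothetical stand-in for the newer table that holds the moved request types.
    static final Map<Integer, String> NEWER_NAMES = new HashMap<>();

    static {
        LEGACY_NAMES.put(0, "Produce");
        LEGACY_NAMES.put(1, "Fetch");
        NEWER_NAMES.put(11, "JoinGroup");
        NEWER_NAMES.put(12, "Heartbeat");
    }

    // Mirrors the reported behavior: any key missing from the old table is rejected.
    static String legacyNameForKey(int key) {
        String name = LEGACY_NAMES.get(key);
        if (name == null) {
            throw new IllegalArgumentException("Wrong request type " + key);
        }
        return name;
    }

    // One possible remedy (an assumption, not the actual fix): consult both tables.
    static String nameForKey(int key) {
        String name = LEGACY_NAMES.get(key);
        if (name == null) {
            name = NEWER_NAMES.get(key);
        }
        if (name == null) {
            throw new IllegalArgumentException("Wrong request type " + key);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(nameForKey(12)); // prints Heartbeat
        try {
            legacyNameForKey(12);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Wrong request type 12
        }
    }
}
```

In the stack traces above the exception is thrown from the metrics update in `Processor.write`, and the one logged with its surrounding context is followed by the socket being closed, so a metrics-only lookup failure ends up dropping the client connection.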