[ 
https://issues.apache.org/jira/browse/KAFKA-2115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491878#comment-14491878
 ] 

Gwen Shapira edited comment on KAFKA-2115 at 4/13/15 3:44 AM:
--------------------------------------------------------------

I created a small test that uses the new consumer API, so I can reproduce the 
error and validate my patch.

One thing I noticed while running my test is that while my consumer is running, 
it looks like a new consumer is joining the consumer group every 10 seconds:

{code}
[2015-04-12 20:34:44,743] INFO [Kafka Coordinator 0]: Registered consumer 
test-23 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:44,746] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,649] INFO [Kafka Coordinator 0]: Handled heartbeat of 
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,652] INFO [Kafka Coordinator 0]: Registered consumer 
test-24 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,655] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,719] INFO [Kafka Coordinator 0]: Handled heartbeat of 
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,722] INFO [Kafka Coordinator 0]: Registered consumer 
test-25 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,725] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
{code}

I don't know whether this is expected behavior, bug in Kafka or bug in my 
consumer code... perhaps someone more familiar with the new consumer can jump 
in?

Here's my test:

{code}
public class NewConsumerDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(props);
        consumer.subscribe("t1", "topic2");
        while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value());
            }
    }
}
{code}


was (Author: gwenshap):
I created a small test that uses the new consumer API, so I can reproduce the 
error and validate my patch.

One thing I noticed while running my test is that while my consumer is running, 
it looks like a new consumer is joining the consumer group every 10 seconds:

```
[2015-04-12 20:34:44,743] INFO [Kafka Coordinator 0]: Registered consumer 
test-23 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:44,746] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,649] INFO [Kafka Coordinator 0]: Handled heartbeat of 
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,652] INFO [Kafka Coordinator 0]: Registered consumer 
test-24 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:34:54,655] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,719] INFO [Kafka Coordinator 0]: Handled heartbeat of 
consumer from group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,722] INFO [Kafka Coordinator 0]: Registered consumer 
test-25 for group test (kafka.coordinator.ConsumerCoordinator)
[2015-04-12 20:35:04,725] INFO [Kafka Coordinator 0]: Handled join-group from 
consumer to group test (kafka.coordinator.ConsumerCoordinator)
```

I don't know whether this is expected behavior, bug in Kafka or bug in my 
consumer code... perhaps someone more familiar with the new consumer can jump 
in?

Here's my test:

```
public class NewConsumerDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(props);
        consumer.subscribe("t1", "topic2");
        while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value());
            }
    }
}
```

> Error updating metrics in RequestChannel
> ----------------------------------------
>
>                 Key: KAFKA-2115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2115
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Gwen Shapira
>            Assignee: Gwen Shapira
>         Attachments: KAFKA-2115.patch
>
>
> Reported by [~jkreps] on the mailing list:
>     kafka.common.KafkaException: Wrong request type 12
>     
>     at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55)
>     
>     at
>     
> kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85)
>     
>     at kafka.network.Processor.write(SocketServer.scala:514)
>     
>     at kafka.network.Processor.run(SocketServer.scala:379)
>     
>     at java.lang.Thread.run(Thread.java:744)
>     
>     [2015-04-12 12:54:52,077] INFO [Kafka Coordinator 0]: Registered consumer
> my-group-24 for group my-group (kafka.coordinator.ConsumerCoordinator)
>     
>     [2015-04-12 12:54:52,080] INFO [Kafka Coordinator 0]: Handled join-group
>     from consumer  to group my-group (kafka.coordinator.ConsumerCoordinator)
>     [2015-04-12 12:54:52,081] ERROR Closing socket for /10.0.0.220 because of
>     error (kafka.network.Processor)
>     
>     kafka.common.KafkaException: Wrong request type 11
>     
>     at kafka.api.RequestKeys$.nameForKey(RequestKeys.scala:55)
>     
>     at
>     
> kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:85)
>     
>     at kafka.network.Processor.write(SocketServer.scala:514)
>     
>     at kafka.network.Processor.run(SocketServer.scala:379)
>     
>      at java.lang.Thread.run(Thread.java:744)
> This a result of KAFKA-2044 - we moved few Requests out
> of RequestKeys to the newer ApiKeys, but didn't update the metrics
> code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to