[ https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15105037#comment-15105037 ]

Federico Fissore edited comment on KAFKA-2985 at 1/18/16 10:00 AM:
-------------------------------------------------------------------

I too am experiencing the same behaviour. I have a short test (below): 1
producer, 1 consumer, 2 brokers. The first run is fast, as expected; the second
run gets stuck at the first {code}poll(0){code}.
At the moment I'm not able to test the latest from branch 0.9.0. A nightly
build of Kafka would help a lot, because then I'd just have to change my
Dockerfile URLs and I'd be up & running (related to KAFKA-2380).

{code}
public class KafkaNewConsumerAPITest {

   private KafkaProducer<String, Object> producer;
   private KafkaConsumer<String, Object> consumer;

   @Before
   public void setUp() throws Exception {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMapSerializer.class.getName());
      props.put(ProducerConfig.LINGER_MS_CONFIG, 0);

      producer = new KafkaProducer<>(props);

      props = new Properties();
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMapDeserializer.class.getName());
      props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      consumer = new KafkaConsumer<>(props);
   }

   @Test
   public void shouldProduceAndConsumeOneMessage() throws Exception {
      String topic = "_test_topic_";

      consumer.subscribe(Collections.singletonList(topic));

      System.out.println("polling");
      ConsumerRecords<String, Object> records = consumer.poll(0);
      System.out.println("polled");
      for (ConsumerRecord<String, Object> record : records) {
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
      consumer.commitSync();

      System.out.println("producing");
      Map<String, Object> hello = new MapBuilder().put("name", "world").put("random", "" + Math.random()).build();
      ProducerRecord<String, Object> record = new ProducerRecord<>(topic, "hello", hello);

      producer.send(record, (recordMetadata, e) -> {
         assertNull(e);
         assertTrue(recordMetadata.offset() >= 0);
         assertTrue(recordMetadata.partition() >= 0);
         assertEquals(topic, recordMetadata.topic());
      });

      producer.close();

      System.out.println("polling");
      records = consumer.poll(2000);
      System.out.println("polled");
      assertFalse(records.isEmpty());
      Iterator<ConsumerRecord<String, Object>> iterator = records.iterator();

      ConsumerRecord<String, Object> consumedRecord = iterator.next();
      System.out.printf("offset = %d, key = %s, value = %s%n", consumedRecord.offset(), consumedRecord.key(), consumedRecord.value());
      assertEquals("hello", consumedRecord.key());
      Map<String, Object> value = (Map<String, Object>) consumedRecord.value();
      assertEquals("world", value.get("name"));
      assertEquals(hello.get("random"), value.get("random"));

      consumer.commitSync();
   }
}
{code}
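Since the hang is at the first {code}poll(0){code}, before the group join has completed, one workaround on 0.9 is to poll in a bounded loop until the consumer actually has an assignment, instead of trusting a single call. The retry shape is sketched below with a hypothetical {code}waitUntil{code} helper (not part of the Kafka API); in the real test the condition would be {code}!consumer.assignment().isEmpty(){code} and the tick would be {code}consumer.poll(100){code}.

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {

   // Hypothetical helper: repeatedly run `tick` (e.g. consumer.poll(100))
   // until `condition` holds (e.g. !consumer.assignment().isEmpty()) or
   // `timeoutMs` elapses. Returns whether the condition was met in time.
   static boolean waitUntil(BooleanSupplier condition, Runnable tick, long timeoutMs) {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (!condition.getAsBoolean()) {
         if (System.currentTimeMillis() >= deadline) {
            return false;
         }
         tick.run();
      }
      return true;
   }

   public static void main(String[] args) {
      // Simulate a group join that succeeds after three "polls".
      int[] polls = {0};
      boolean joined = waitUntil(() -> polls[0] >= 3, () -> polls[0]++, 5_000);
      System.out.println(joined ? "assigned" : "timed out");
      // prints "assigned"
   }
}
```

This at least turns the infinite hang into a test failure with a clear timeout instead of a stuck build.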



> Consumer group stuck in rebalancing state
> -----------------------------------------
>
>                 Key: KAFKA-2985
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2985
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>         Environment: Kafka 0.9.0.0.
> Kafka Java consumer 0.9.0.0
> 2 Java producers.
> 3 Java consumers using the new consumer API.
> 2 kafka brokers.
>            Reporter: Jens Rantil
>            Assignee: Jason Gustafson
>
> We've been doing some load testing on Kafka. _After_ the load test, when all
> our consumers were done consuming and essentially just polling periodically
> without getting any records, we have now twice seen Kafka become stuck in
> consumer group rebalancing.
> The brokers list the consumer group (named "default"), but I can't query the 
> offsets:
> {noformat}
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh 
> --new-consumer --bootstrap-server localhost:9092 --list
> default
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh 
> --new-consumer --bootstrap-server localhost:9092 --describe --group 
> default|sort
> Consumer group `default` does not exist or is rebalancing.
> {noformat}
> Retrying the offsets query for 15 minutes or so still reported that the group
> was rebalancing. After restarting our first broker, the group immediately
> started rebalancing. That broker was logging this before the restart:
> {noformat}
> [2015-12-12 13:09:48,517] INFO [Group Metadata Manager on Broker 0]: Removed 
> 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2015-12-12 13:10:16,139] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,141] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 16 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,575] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,141] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 17 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,143] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 17 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,314] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 17 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,144] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 18 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,145] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 18 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,340] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 18 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,146] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 19 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,148] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 19 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,238] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 19 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,148] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 20 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,149] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 20 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,360] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 20 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,150] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 21 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,152] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 21 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,217] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 21 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:16:10,152] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 22 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:16:10,154] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 22 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:16:10,339] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 22 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:17:09,155] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 23 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:17:09,157] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 23 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:17:09,262] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 23 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:18:08,157] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 24 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:18:08,159] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 24 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:18:08,333] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 24 (kafka.coordinator.GroupCoordinator)
> {noformat}
> Our consumers were logging:
> {noformat}
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the 
> coordinator 2147483647 dead.
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Error 
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Auto offset 
> commit failed: Commit cannot be completed due to group rebalance
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the 
> coordinator 2147483647 dead.
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Error 
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Auto offset 
> commit failed: Commit cannot be completed due to group rebalance
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Error 
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Auto offset 
> commit failed:
> Dec 12 13:09:17 X.X.X.110 system[27782]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Attempt to 
> join group default failed due to unknown member id, resetting and retrying.
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Error 
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Auto offset 
> commit failed:
> Dec 12 13:09:17 X.X.X.144 system[9915]: [KafkaTaskExecutorConsumer] 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Attempt to 
> join group default failed due to unknown member id, resetting and retrying.
> {noformat}
> I understand that the broker might start rebalancing if my consumers haven't
> reported a heartbeat within the session timeout. That might well have
> happened during my load test. However, the issue here is that the
> rebalancing doesn't stabilize/finish after the load test is done.
> Let me know if I can be of any assistance to track this down.
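The session-timeout interaction described above maps to two 0.9 consumer settings, shown here as an illustrative config fragment (the values are made up, not the reporter's actual configuration). Note that in the 0.9 client heartbeats are only sent from within {code}poll(){code}, so long gaps between polls during a load test can exceed the session timeout and produce the UNKNOWN_MEMBER_ID errors seen in the consumer logs.

```java
Properties props = new Properties();
// How long the coordinator waits without a heartbeat before evicting
// the member and triggering a group rebalance.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// Target interval between heartbeats; must be well below the session timeout.
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
```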



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
