[ https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-2686.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.0.0

Issue resolved by pull request 352
[https://github.com/apache/kafka/pull/352]

> unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2686
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: The Data Lorax
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0.0
>
>
> The code snippet below demonstrates the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state in which
> poll() always returns empty record sets, even if new topic-partitions that have
> messages pending have since been assigned. This is because unsubscribe() sets
> SubscriptionState.needsPartitionAssignment to true, and assign() does not clear
> this flag. The flag is cleared only when the consumer handles the response to a
> JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe(); // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100); // <- Always returns empty record set.
> {code}
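
For readers who want to see the flag interaction from the report in isolation, here is a minimal, self-contained sketch. It is a toy model and not Kafka code: the class FlagModel, its methods, and the assignClearsFlag switch are hypothetical names introduced for illustration, and only the flag name needsPartitionAssignment comes from the issue text. Having assign() clear the flag is shown as one possible remedy; it is an assumption and not necessarily what pull request 352 does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the behaviour described in the report.
// This is NOT the real Kafka SubscriptionState or KafkaConsumer.
class FlagModel {
    // Flag name taken from the issue text; everything else is illustrative.
    private boolean needsPartitionAssignment = false;
    private final List<Integer> assigned = new ArrayList<>();
    // Assumption: toggles the "assign() clears the flag" remedy on or off.
    private final boolean assignClearsFlag;

    FlagModel(boolean assignClearsFlag) {
        this.assignClearsFlag = assignClearsFlag;
    }

    // Manual assignment: replaces the assigned partitions.
    void assign(List<Integer> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        if (assignClearsFlag) {
            needsPartitionAssignment = false;
        }
    }

    // As reported: unsubscribe() sets the flag to true.
    void unsubscribe() {
        assigned.clear();
        needsPartitionAssignment = true;
    }

    // Stand-in for poll(): yields nothing while the flag is set,
    // otherwise the partitions that would be fetched from.
    List<Integer> poll() {
        if (needsPartitionAssignment) {
            return Collections.emptyList();
        }
        return new ArrayList<>(assigned);
    }

    // Replays the sequence from the report: assign, poll, unsubscribe, assign, poll.
    static List<Integer> replay(boolean assignClearsFlag) {
        FlagModel consumer = new FlagModel(assignClearsFlag);
        consumer.assign(Collections.singletonList(1));
        consumer.poll();
        consumer.unsubscribe();
        consumer.assign(Collections.singletonList(2));
        return consumer.poll();
    }

    public static void main(String[] args) {
        System.out.println("flag never cleared by assign(): " + replay(false));
        System.out.println("flag cleared by assign():       " + replay(true));
    }
}
```

In this model, replay(false) ends with an empty result even though partition 2 was assigned, which mirrors the reported symptom, while replay(true) returns partition 2.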