[ https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14969669#comment-14969669 ]
ASF GitHub Bot commented on KAFKA-2686:
---------------------------------------

GitHub user guozhangwang opened a pull request:

    https://github.com/apache/kafka/pull/352

    KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka K2686

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #352

----
commit ecc94205c0731ecf25737307aac2ffda20fc1a14
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2015-10-22T19:05:50Z

    v1

----

> unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2686
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: The Data Lorax
>            Assignee: Guozhang Wang
>
> The code snippet below demonstrates the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state in
> which poll() always returns empty record sets, even if new
> topic-partitions that have messages pending have been assigned. This is
> because unsubscribe() sets SubscriptionState.needsPartitionAssignment to
> true, and assign() does not clear this flag. The only thing that clears
> this flag is the consumer handling the response to a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe(); // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100); // <- Always returns empty record set.
> {code}
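
The flag handling described in the issue can be sketched with a small self-contained model. This is not the actual Kafka SubscriptionState class: the class SubscriptionStateSketch, the nested State type, the resetOnAssign switch, and the canFetch() and fetchableAfterReassign() helpers are all hypothetical names introduced for illustration. Only the needsPartitionAssignment flag and the assign()/unsubscribe() calls come from the report, and the "reset in assign()" behavior is inferred from the pull request title rather than copied from the patch.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SubscriptionStateSketch {

    /** Simplified, hypothetical stand-in for the consumer's subscription bookkeeping. */
    static class State {
        private final Set<Integer> assigned = new HashSet<>();
        private boolean needsPartitionAssignment = false;
        // Hypothetical switch: false models the reported behavior,
        // true models resetting the flag in assign().
        private final boolean resetOnAssign;

        State(boolean resetOnAssign) {
            this.resetOnAssign = resetOnAssign;
        }

        /** Manual assignment of partitions (modeled here as plain partition numbers). */
        void assign(Set<Integer> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
            if (resetOnAssign) {
                // Manual assignment does not wait for a group join, so clear the flag.
                needsPartitionAssignment = false;
            }
        }

        /** Drops the current assignment and marks the state as awaiting assignment. */
        void unsubscribe() {
            assigned.clear();
            needsPartitionAssignment = true;
        }

        /** Assumption of this model: fetching is skipped while the flag is set. */
        boolean canFetch() {
            return !needsPartitionAssignment && !assigned.isEmpty();
        }
    }

    /** Replays assign(1), unsubscribe(), assign(2) and reports whether fetching is possible. */
    static boolean fetchableAfterReassign(boolean resetOnAssign) {
        State state = new State(resetOnAssign);
        state.assign(Collections.singleton(1));
        state.unsubscribe();
        state.assign(Collections.singleton(2));
        return state.canFetch();
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + fetchableAfterReassign(false));
        System.out.println("with reset:    " + fetchableAfterReassign(true));
    }
}
```

In this model the sequence from the report leaves the state unable to fetch when assign() does not touch the flag, and able to fetch once assign() clears it. The real consumer involves more state (group coordination, fetch positions) than this sketch covers.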