[
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14969669#comment-14969669
]
ASF GitHub Bot commented on KAFKA-2686:
---------------------------------------
GitHub user guozhangwang opened a pull request:
https://github.com/apache/kafka/pull/352
KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/guozhangwang/kafka K2686
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/352.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #352
----
commit ecc94205c0731ecf25737307aac2ffda20fc1a14
Author: Guozhang Wang <[email protected]>
Date: 2015-10-22T19:05:50Z
v1
----
> unsubscribe() call leaves KafkaConsumer in invalid state for manual
> topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-2686
> URL: https://issues.apache.org/jira/browse/KAFKA-2686
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.0
> Reporter: The Data Lorax
> Assignee: Guozhang Wang
>
> The bellow code snippet demonstrated the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state that
> means poll() will always return empty record sets, even if new
> topic-partitions have been assigned that have messages pending. This is
> because unsubscribe() sets SubscriptionState.needsPartitionAssignment to
> true, and assign() does not clear this flag. The only thing that clears this
> flag is when the consumer handles the response from a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100);// <- Works,
> returning records
> consumer.unsubscribe(); // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100);// <- Always returns empty record set.
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)