[ 
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-2686.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.0.0

Issue resolved by pull request 352
[https://github.com/apache/kafka/pull/352]

> unsubscribe() call leaves KafkaConsumer in invalid state for manual 
> topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2686
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: The Data Lorax
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0.0
>
>
> The code snippet below demonstrates the problem.
> After unsubscribe() is called, the KafkaConsumer is left in a state in which 
> poll() always returns empty record sets, even if new topic-partitions with 
> pending messages have since been assigned. This is because unsubscribe() sets 
> SubscriptionState.needsPartitionAssignment to true, and assign() does not 
> clear this flag. The flag is cleared only when the consumer handles the 
> response to a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe();   // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100);// <- Always returns empty record set.
> {code}
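
The flag handling described in the report can be modelled without a broker. The sketch below is a toy illustration only: ToyConsumer, its integer partitions, its constructor switch, and the replay helper are hypothetical names invented for this example and are not Kafka classes. It assumes that poll() returns nothing while needsPartitionAssignment is set, as the report describes. The variant in which assign() clears the flag is one plausible remedy, shown for contrast, and is not necessarily what pull request 352 does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: a hypothetical stand-in for the consumer, not Kafka's KafkaConsumer.
class ToyConsumer {
    // false models the reported behaviour; true models a variant where assign() clears the flag.
    private final boolean assignClearsFlag;
    private boolean needsPartitionAssignment = false;
    private final List<Integer> assigned = new ArrayList<>();

    ToyConsumer(boolean assignClearsFlag) {
        this.assignClearsFlag = assignClearsFlag;
    }

    // Manual assignment: replaces the partition list and optionally clears the flag.
    void assign(List<Integer> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        if (assignClearsFlag) {
            needsPartitionAssignment = false;
        }
    }

    // Drops the assignment and marks the consumer as waiting for a group assignment.
    void unsubscribe() {
        assigned.clear();
        needsPartitionAssignment = true;
    }

    // Returns one fake record per assigned partition, or nothing while the flag is set.
    List<String> poll() {
        if (needsPartitionAssignment) {
            return Collections.emptyList();
        }
        List<String> records = new ArrayList<>();
        for (int partition : assigned) {
            records.add("record-from-partition-" + partition);
        }
        return records;
    }
}

class SubscriptionFlagDemo {
    // Replays the call sequence from the report and returns the result of the final poll().
    static List<String> replay(ToyConsumer consumer) {
        consumer.assign(Collections.singletonList(1));
        consumer.poll();
        consumer.unsubscribe();
        consumer.assign(Collections.singletonList(2));
        return consumer.poll();
    }

    public static void main(String[] args) {
        System.out.println("reported behaviour: " + replay(new ToyConsumer(false)));
        System.out.println("flag cleared in assign(): " + replay(new ToyConsumer(true)));
    }
}
```

With the switch set to false, the final poll() in the replayed sequence comes back empty even though partition 2 is assigned, which matches the symptom in the report. With the switch set to true, the same sequence returns the record for partition 2.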



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
