[ 
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14969669#comment-14969669
 ] 

ASF GitHub Bot commented on KAFKA-2686:
---------------------------------------

GitHub user guozhangwang opened a pull request:

    https://github.com/apache/kafka/pull/352

    KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka K2686

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #352
    
----
commit ecc94205c0731ecf25737307aac2ffda20fc1a14
Author: Guozhang Wang <wangg...@gmail.com>
Date:   2015-10-22T19:05:50Z

    v1

----


> unsubscribe() call leaves KafkaConsumer in invalid state for manual 
> topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2686
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: The Data Lorax
>            Assignee: Guozhang Wang
>
> The code snippet below demonstrates the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state in 
> which poll() always returns empty record sets, even if new 
> topic-partitions have been assigned that have messages pending.  This is 
> because unsubscribe() sets SubscriptionState.needsPartitionAssignment to 
> true, and assign() does not clear this flag. The flag is cleared only when 
> the consumer handles the response from a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe();   // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100);// <- Always returns empty record set.
> {code}
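
For readers following the thread, the flag handling described in the report can be modelled without a broker. The sketch below is a simplified, hypothetical stand-in and not the actual SubscriptionState class: the class name, the resetFlagOnAssign switch, and the canFetch() helper are assumptions made for illustration. Partitions are plain strings, and the switch only toggles between the reported behaviour and the reset that the pull request title proposes.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the flag handling described in the report.
// This is NOT the real org.apache.kafka SubscriptionState class.
final class SubscriptionStateSketch {
    private final Set<String> assignedPartitions = new HashSet<>();
    // Assumption for illustration: true models resetting the flag in assign().
    private final boolean resetFlagOnAssign;
    private boolean needsPartitionAssignment = false;

    SubscriptionStateSketch(boolean resetFlagOnAssign) {
        this.resetFlagOnAssign = resetFlagOnAssign;
    }

    // Manual assignment, standing in for consumer.assign(...).
    void assign(Collection<String> partitions) {
        assignedPartitions.clear();
        assignedPartitions.addAll(partitions);
        if (resetFlagOnAssign) {
            needsPartitionAssignment = false;
        }
    }

    // Standing in for consumer.unsubscribe(): sets the flag, as the report describes.
    void unsubscribe() {
        assignedPartitions.clear();
        needsPartitionAssignment = true;
    }

    // Hypothetical helper: models whether a poll() could return records at all.
    boolean canFetch() {
        return !needsPartitionAssignment && !assignedPartitions.isEmpty();
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            SubscriptionStateSketch state = new SubscriptionStateSketch(reset);
            state.assign(Collections.singletonList("topic-1"));
            state.unsubscribe();
            state.assign(Collections.singletonList("topic-2"));
            System.out.println("resetFlagOnAssign=" + reset
                    + " -> canFetch=" + state.canFetch());
        }
    }
}
```

Without the reset, the second assign() leaves the flag set and canFetch() stays false, which mirrors the empty poll() results in the report. With the reset, canFetch() is true again after the second assign().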



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
