[ https://issues.apache.org/jira/browse/KAFKA-9266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-9266.
----------------------------------
    Fix Version/s: 2.3.1
       Resolution: Fixed

> KafkaConsumer manual assignment does not reset group assignment
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9266
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.3.0
>            Reporter: G G
>            Priority: Major
>             Fix For: 2.3.1
>
>
> When using the manual assignment API, SubscriptionState keeps topics in its 
> groupSubscription member even after they are no longer assigned.
> The following code shows the unexpected behavior:
> {code:java}
>     TopicPartition tp1 = new TopicPartition("a", 0);
>     TopicPartition tp2 = new TopicPartition("b", 0);
>     LogContext logContext = new LogContext();
>     SubscriptionState state = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
>     state.assignFromUser(ImmutableSet.of(tp1, tp2));
>     state.unsubscribe();
>     state.assignFromUser(ImmutableSet.of(tp1));
>     assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
>     
>     state.assignFromUser(ImmutableSet.of(tp1, tp2));
>     state.assignFromUser(ImmutableSet.of(tp1));
>     assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: expected [a] but was [a, b]
> {code}
> The problem seems to be that within SubscriptionState.changeSubscription() 
> the groupSubscription only grows and is never trimmed if the assignment is 
> manual:
> {code}
>     private boolean changeSubscription(Set<String> topicsToSubscribe) {
>         ...
>         groupSubscription = new HashSet<>(groupSubscription);
>         groupSubscription.addAll(topicsToSubscribe);
>         ....
>     }
> {code}
> This behavior in turn leads to METADATA requests by the client with 
> partitions which are actually no longer assigned:
> {code}
> KafkaConsumer consumer;
> consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
> // This poll causes a MetadataRequest for topic1 and topic2 to be sent to the broker
> consumer.poll();
> consumer.assign(ImmutableList.of(topicPartition1));
> // This poll AGAIN causes a MetadataRequest for topic1 and topic2 instead of only topic1
> consumer.poll();
> {code}
> This in turn causes the deletion of topicPartition2 to fail. The 
> workaround is to call consumer.unsubscribe() before the second 
> consumer.assign().
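
The grow-only behavior described in the report can be reproduced outside of Kafka with a small stand-alone model. The sketch below is not the Kafka source and not the patch that resolved this issue; the class GroupSubscriptionSketch, the resetOnManualAssign flag, and the setOf helper are hypothetical names introduced only for illustration. It assumes that one plausible remedy is to replace the tracked topic set on each manual assignment instead of adding to it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for SubscriptionState; this is not Kafka code.
class GroupSubscriptionSketch {
    private Set<String> groupSubscription = Collections.emptySet();
    // Hypothetical switch: false models the reported behavior, true models a reset.
    private final boolean resetOnManualAssign;

    GroupSubscriptionSketch(boolean resetOnManualAssign) {
        this.resetOnManualAssign = resetOnManualAssign;
    }

    // Models a manual assignment, reduced to the topic names of the assigned partitions.
    void assignFromUser(Set<String> topics) {
        if (resetOnManualAssign) {
            // Assumed remedy: forget topics that are no longer assigned.
            groupSubscription = new HashSet<>(topics);
        } else {
            // Reported behavior: the set only grows and is never trimmed.
            Set<String> grown = new HashSet<>(groupSubscription);
            grown.addAll(topics);
            groupSubscription = grown;
        }
    }

    Set<String> groupSubscription() {
        return Collections.unmodifiableSet(groupSubscription);
    }

    // Small helper to build a topic set from literals.
    static Set<String> setOf(String... topics) {
        return new HashSet<>(Arrays.asList(topics));
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            GroupSubscriptionSketch state = new GroupSubscriptionSketch(reset);
            state.assignFromUser(setOf("a", "b"));
            state.assignFromUser(setOf("a"));
            // TreeSet gives a stable print order.
            System.out.println("reset=" + reset + " -> " + new TreeSet<>(state.groupSubscription()));
        }
    }
}
```

With the flag off, the model still reports topic "b" after the second assignment, which matches the failing assertion in the report; with the flag on, only "a" remains.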



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
