[ https://issues.apache.org/jira/browse/KAFKA-9266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990827#comment-16990827 ]
daile commented on KAFKA-9266: ------------------------------ It looks like it has been fixed in 2.3.1 > KafkaConsumer manual assignment does not reset group assignment > --------------------------------------------------------------- > > Key: KAFKA-9266 > URL: https://issues.apache.org/jira/browse/KAFKA-9266 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.3.0 > Reporter: G G > Priority: Major > > When using the manual assignment API, SubscriptionState still remembers group > subscriptions in its groupSubscription member of topics to which it is no > longer subscribed. > See the following code which shows the unexpected behavior: > {code:java} > TopicPartition tp1 = new TopicPartition("a", 0); > TopicPartition tp2 = new TopicPartition("b", 0); > LogContext logContext = new LogContext(); > SubscriptionState state = new SubscriptionState(logContext, > OffsetResetStrategy.NONE); > state.assignFromUser(ImmutableSet.of(tp1, tp2)); > state.unsubscribe(); > state.assignFromUser(ImmutableSet.of(tp1)); > assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds > > state.assignFromUser(ImmutableSet.of(tp1, tp2)); > state.assignFromUser(ImmutableSet.of(tp1)); > assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: > Expected [a] but was [a, b] > {code} > The problem seems to be that within SubscriptionState.changeSubscription() > the groupSubscription only grows and is never trimmed if the assignment is > manual: > {code} > private boolean changeSubscription(Set<String> topicsToSubscribe) { > ... > groupSubscription = new HashSet<>(groupSubscription); > groupSubscription.addAll(topicsToSubscribe); > .... > } > {code} > This behavior in turn leads to METADATA requests by the client with > partitions which are actually no longer assigned: > {code} > KafkaConsumer consumer; > consumer.assign(ImmutableList.of(topicPartition1, topicPartition2)); > consumer.poll(); // This will cause a MetadataRequest to be sent to the > broker with topic1 and topic2 > consumer.assign(ImmutableList.of(topicPartition1)); > consumer.poll(); // This will AGAIN cause a MetadataRequest for topic1 and > topic2 instead of only topic1 > {code} > And this in turn causes the deletion of the topicPartion2 to fail. The > workaround is to do a consumer.unassign(); before the second > consumer.assign(); -- This message was sent by Atlassian Jira (v8.3.4#803005)