showuon commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665328864
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // If the current member's generation is higher, all the previously owned partitions are invalid
                 if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
                     membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+                    allPreviousPartitionsToOwner.clear();
+                    partitionsWithMultiplePreviousOwners.clear();
+                    for (String droppedOutConsumer : membersWithOldGeneration) {
+                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                    }

Review comment:
Because we now clear them earlier, `membersWithOldGeneration` is no longer necessary. We can just iterate over `membersOfCurrentHighestGeneration` here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));

         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();

             List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"

Review comment:
nit: add a space after "multiple", i.e.
`despite being claimed by multiple[ ]`
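
To illustrate the first review comment, here is a minimal sketch of why iterating `membersOfCurrentHighestGeneration` is enough once stale ownership is cleared at the moment a higher generation is seen. It is not the code from the PR: the class `GenerationResetSketch`, the method `validOwnedPartitions`, and the use of plain strings for partitions and plain integers for generations are all assumptions made for illustration, and the real `AbstractStickyAssignor` tracks more state than this.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the generation check discussed in the review.
// Partitions are plain strings here; the real assignor uses TopicPartition.
class GenerationResetSketch {

    // Walks the members in iteration order and returns the partitions each one
    // may still claim. Only members of the highest generation keep their claims.
    static Map<String, List<String>> validOwnedPartitions(
            Map<String, Integer> generations,
            Map<String, List<String>> claimed) {
        Map<String, List<String>> consumerToOwnedPartitions = new LinkedHashMap<>();
        List<String> membersOfCurrentHighestGeneration = new ArrayList<>();
        int maxGeneration = -1;

        for (Map.Entry<String, Integer> entry : generations.entrySet()) {
            String consumer = entry.getKey();
            int generation = entry.getValue();
            List<String> owned = new ArrayList<>();
            consumerToOwnedPartitions.put(consumer, owned);

            if (generation > maxGeneration) {
                // Members of any older generation were cleared (or never filled)
                // already, so only the previous highest generation needs clearing.
                for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
                    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
                }
                membersOfCurrentHighestGeneration.clear();
                maxGeneration = generation;
            }
            if (generation == maxGeneration) {
                owned.addAll(claimed.getOrDefault(consumer, Collections.emptyList()));
                membersOfCurrentHighestGeneration.add(consumer);
            }
            // A member with a lower generation keeps an empty list.
        }
        return consumerToOwnedPartitions;
    }

    public static void main(String[] args) {
        Map<String, Integer> generations = new LinkedHashMap<>();
        generations.put("a", 1);
        generations.put("b", 2);
        Map<String, List<String>> claimed = new LinkedHashMap<>();
        claimed.put("a", Collections.singletonList("t-0"));
        claimed.put("b", Collections.singletonList("t-1"));
        System.out.println(validOwnedPartitions(generations, claimed));
    }
}
```

Because the clearing happens eagerly, no separate running list of older-generation members has to be kept in this sketch; whether the same holds for every use of `membersWithOldGeneration` in the actual class would need to be checked against the full file.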