showuon commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665328864



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
                 // If the current member's generation is higher, all the 
previously owned partitions are invalid
                 if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
                     
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+                    allPreviousPartitionsToOwner.clear();
+                    partitionsWithMultiplePreviousOwners.clear();
+                    for (String droppedOutConsumer : membersWithOldGeneration) 
{
+                        
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                    }

Review comment:
       Because we cleared them earlier now, the `membersWithOldGeneration` is 
not necessary any more. We can just iterate `membersOfCurrentHighestGeneration` 
here.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
             
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of 
partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
 
+            for (TopicPartition doublyClaimedPartition : 
partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.warn("Found partition {} still claimed as owned by 
consumer {}, despite being claimed by multiple"

Review comment:
       nit: add a space after "multiple". i.e. `despite being claimed by 
multiple[ ]`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to