ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432727979



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, 
Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
                                                     Map<String, Subscription> 
subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new 
HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, 
subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set 
of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, 
consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> 
subscriptions,
+                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first 
subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == 
subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or 
it is not present while
+            // other generations are, consider it as having lost its owned 
partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && 
memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());

Review comment:
       ack




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to