showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r638654728
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ########## @@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); - balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, - consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired); + balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions, + consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, + partitionsPerTopic, totalPartitionsCount); + + if (log.isDebugEnabled()) { + log.debug("final assignment: {}", currentAssignment); + } + return currentAssignment; } + /** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions. 
+ * This is used in generalAssign method + * + * We loop the sortedPartition, and compare the ith element in sortedAssignedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedAssignedPartitions + * + * @param sortedAllPartitions: sorted all partitions + * @param sortedAssignedPartitions: sorted partitions, all are included in the sortedPartitions + * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to it + * @return partitions that aren't assigned to any current consumer + */ + private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions, + List<TopicPartition> sortedAssignedPartitions, + Map<String, List<String>> topic2AllPotentialConsumers) { + if (sortedAssignedPartitions.isEmpty()) { + return sortedAllPartitions; + } + + List<TopicPartition> unassignedPartitions = new ArrayList<>(); + + Collections.sort(sortedAssignedPartitions, new PartitionComparator(topic2AllPotentialConsumers)); + + boolean shouldAddDirectly = false; + Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator(); + TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next(); + + for (TopicPartition topicPartition : sortedAllPartitions) { + if (shouldAddDirectly || !nextAssignedPartition.equals(topicPartition)) { + unassignedPartitions.add(topicPartition); + } else { + // this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition + if (sortedAssignedPartitionsIter.hasNext()) { + nextAssignedPartition = sortedAssignedPartitionsIter.next(); + } else { + // add the remaining directly since there is no more sortedAssignedPartitions + shouldAddDirectly = true; + } + } + } + return unassignedPartitions; + } + + /** + * get the unassigned partition list by computing the difference set of all sorted partitions + * and 
sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions. + * This is used in constrainedAssign method + * + * To compute the difference set, we use two pointers technique here: + * + * We loop through the all sorted topics, and then iterate all partitions the topic has, + * compared with the ith element in sortedAssignedPartitions(i starts from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedAssignedPartitions + * + * @param totalPartitionsCount all partitions counts in this assignment + * @param partitionsPerTopic the number of partitions for each subscribed topic. + * @param sortedAssignedPartitions sorted partitions, all are included in the sortedPartitions + * @return the partitions not yet assigned to any consumers + */ + private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount, + Map<String, Integer> partitionsPerTopic, + List<TopicPartition> sortedAssignedPartitions) { + List<String> sortedAllTopics = new ArrayList<>(partitionsPerTopic.keySet()); + // sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0 + Collections.sort(sortedAllTopics); + + if (sortedAssignedPartitions.isEmpty()) { + // no assigned partitions means all partitions are unassigned partitions + return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount); + } + + List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size()); + + Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)); + + boolean shouldAddDirectly = false; + Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator(); + TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next(); + + for (String topic : sortedAllTopics) { + 
int partitionCount = partitionsPerTopic.get(topic); + for (int i = 0; i < partitionCount; i++) { + if (shouldAddDirectly || !(nextAssignedPartition.topic().equals(topic) && nextAssignedPartition.partition() == i)) { + unassignedPartitions.add(new TopicPartition(topic, i)); + } else { + // this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition + if (sortedAssignedPartitionsIter.hasNext()) { + nextAssignedPartition = sortedAssignedPartitionsIter.next(); + } else { + // add the remaining directly since there is no more sortedAssignedPartitions + shouldAddDirectly = true; + } + } + } + } + + return unassignedPartitions; + } + + /** + * update the prevAssignment with the partitions, consumer and generation in parameters + * + * @param partitions: The partitions to be updated the prevAssignement + * @param consumer: The consumer Id + * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation + * @param generation: The generation of this assignment (partitions) + */ + private void updatePrevAssignment(Map<TopicPartition, ConsumerGenerationPair> prevAssignment, + List<TopicPartition> partitions, + String consumer, + int generation) { + for (TopicPartition partition: partitions) { + if (prevAssignment.containsKey(partition)) { + // only keep the latest previous assignment + if (generation > prevAssignment.get(partition).generation) { + prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation)); + } + } else { + prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation)); + } + } + } + + /** + * filling in the prevAssignment from the subscriptions. 
+ * + * @param subscriptions: Map from the member id to their respective topic subscription + * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation + */ private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions, - Map<String, List<TopicPartition>> currentAssignment, Map<TopicPartition, ConsumerGenerationPair> prevAssignment) { // we need to process subscriptions' user data with each consumer's reported generation in mind // higher generations overwrite lower generations in case of a conflict // note that a conflict could exists only if user data is for different generations - // for each partition we create a sorted map of its consumers by generation - Map<TopicPartition, TreeMap<Integer, String>> sortedPartitionConsumersByGeneration = new HashMap<>(); for (Map.Entry<String, Subscription> subscriptionEntry: subscriptions.entrySet()) { String consumer = subscriptionEntry.getKey(); - MemberData memberData = memberData(subscriptionEntry.getValue()); - - for (TopicPartition partition: memberData.partitions) { - if (sortedPartitionConsumersByGeneration.containsKey(partition)) { - Map<Integer, String> consumers = sortedPartitionConsumersByGeneration.get(partition); - if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) { - // same partition is assigned to two consumers during the same rebalance. 
- // log a warning and skip this record - log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.", - partition, memberData.generation); - } else - consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer); - } else { - TreeMap<Integer, String> sortedConsumers = new TreeMap<>(); - sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer); - sortedPartitionConsumersByGeneration.put(partition, sortedConsumers); - } + Subscription subscription = subscriptionEntry.getValue(); + if (subscription.userData() != null) { + // since this is our 2nd time to deserialize memberData, rewind userData is necessary + subscription.userData().rewind(); } Review comment: This is actually a bug that has existed since `constrainedAssign` was implemented. Since then, we call `allSubscriptionsEqual` to decide whether to use `constrainedAssign` or `generalAssign`. `allSubscriptionsEqual` not only checks whether the subscriptions are equal, but also deserializes the user data. Once the user data has been deserialized, the position of `userData` (a ByteBuffer) is left at the end of the buffer, so we have to rewind it here before deserializing it a second time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org