KAFKA-2273; Sticky partition assignment strategy (KIP-54)

This PR implements a new partition assignment strategy called "sticky". Its purpose is to balance partitions across consumers in a way that minimizes moving partitions around, or, in other words, preserves existing partition assignments as much as possible.
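The sticky assignor trades off two quantities: balance, and the number of existing assignments it preserves. Both can be checked by hand against Example 2 in the class Javadoc of the patch. The following is a minimal standalone sketch, not part of this commit. The class name `StickinessDemo` and its helpers are hypothetical. Partitions are plain strings such as `"t1p0"` rather than Kafka's `TopicPartition`, and the sketch assumes Java 9+ for `Map.of`/`List.of`. `balanceScore` follows the same pairwise sum that `getBalanceScore()` uses in the patch. `preservedCount` counts the partitions that stay with the same consumer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper class, not part of the StickyAssignor patch. Partitions are plain
// strings (e.g. "t1p0") instead of Kafka's TopicPartition to avoid a Kafka dependency.
public class StickinessDemo {

    // Sum of |size(a) - size(b)| over every unordered pair of consumers, following the
    // same idea as getBalanceScore(): 0 means all consumers hold the same number of partitions.
    static int balanceScore(Map<String, List<String>> assignment) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> partitions : assignment.values())
            sizes.add(partitions.size());
        int score = 0;
        for (int i = 0; i < sizes.size(); i++)
            for (int j = i + 1; j < sizes.size(); j++)
                score += Math.abs(sizes.get(i) - sizes.get(j));
        return score;
    }

    // Number of partitions owned by the same consumer before and after a reassignment.
    static int preservedCount(Map<String, List<String>> before, Map<String, List<String>> after) {
        int preserved = 0;
        for (Map.Entry<String, List<String>> entry : after.entrySet()) {
            List<String> previous = before.get(entry.getKey());
            if (previous == null)
                continue; // a consumer that just joined cannot keep anything
            Set<String> previousSet = new HashSet<>(previous);
            for (String partition : entry.getValue())
                if (previousSet.contains(partition))
                    preserved++;
        }
        return preserved;
    }

    public static void main(String[] args) {
        // Example 2: assignments before and after consumer C0 is removed.
        Map<String, List<String>> stickyBefore = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0", "t1p1"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> stickyAfter = Map.of(
                "C1", List.of("t1p0", "t1p1", "t0p0"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> roundRobinBefore = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0"),
                "C2", List.of("t1p1", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> roundRobinAfter = Map.of(
                "C1", List.of("t0p0", "t1p1"),
                "C2", List.of("t1p0", "t2p0", "t2p1", "t2p2"));

        // prints "sticky: preserved=5, score=0"
        System.out.println("sticky: preserved=" + preservedCount(stickyBefore, stickyAfter)
                + ", score=" + balanceScore(stickyAfter));
        // prints "round robin: preserved=3, score=2"
        System.out.println("round robin: preserved=" + preservedCount(roundRobinBefore, roundRobinAfter)
                + ", score=" + balanceScore(roundRobinAfter));
    }
}
```

The counts of 5 and 3 match the "preserves 5 / preserves 3 partition assignments" figures stated in the Javadoc. The scores show that, in this example, the sticky result is also the more balanced one.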
This patch is co-authored with rajinisivaram and edoardocomar.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1020 from vahidhashemian/KAFKA-2273

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1abf177
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1abf177
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1abf177

Branch: refs/heads/trunk
Commit: e1abf17708918b82d3974ea028a4d74e3892fa0f
Parents: 9815e18
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Wed May 17 20:13:19 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 20:15:17 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/StickyAssignor.java  | 933 +++++++++++++++++++
 .../internals/AbstractPartitionAssignor.java    |   8 +-
 .../consumer/internals/ConsumerProtocol.java    |   1 -
 .../consumer/internals/PartitionAssignor.java   |   2 +-
 .../org/apache/kafka/common/TopicPartition.java |   1 -
 .../clients/consumer/StickyAssignorTest.java    | 689 ++++++++++++++
 .../kafka/api/PlaintextConsumerTest.scala       |  58 +-
 7 files changed, 1685 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
new file mode 100644
index 0000000..58e5915
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -0,0 +1,933 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
+ * - the numbers of topic partitions assigned to consumers differ by at most one; or
+ * - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
+ * Second, it preserves as many existing assignments as possible when a reassignment occurs. This helps save some of the
+ * overhead processing when topic partitions move from one consumer to another.
+ *
+ * Starting fresh, it would work by distributing the partitions over consumers as evenly as possible. Even though this may sound similar to
+ * how the round robin assignor works, the second example below shows that it is not.
+ * During a reassignment it would perform the reassignment in such a way that in the new assignment
+ * 1. topic partitions are still distributed as evenly as possible, and
+ * 2. topic partitions stay with their previously assigned consumers as much as possible.
+ * Of course, the first goal above takes precedence over the second one.
+ *
+ * <b>Example 1.</b> Suppose there are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * four topics <code>t0</code>, <code>t1</code>, <code>t2</code>, <code>t3</code>, and each topic has 2 partitions,
+ * resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
+ * <code>t2p1</code>, <code>t3p0</code>, <code>t3p1</code>. Each consumer is subscribed to all four topics.
+ *
+ * The assignment with both sticky and round robin assignors will be:
+ * <ul>
+ * <li><code>C0: [t0p0, t1p1, t3p0]</code></li>
+ * <li><code>C1: [t0p1, t2p0, t3p1]</code></li>
+ * <li><code>C2: [t1p0, t2p1]</code></li>
+ * </ul>
+ *
+ * Now, let's assume <code>C1</code> is removed and a reassignment is about to happen. The round robin assignor would produce:
+ * <ul>
+ * <li><code>C0: [t0p0, t1p0, t2p0, t3p0]</code></li>
+ * <li><code>C2: [t0p1, t1p1, t2p1, t3p1]</code></li>
+ * </ul>
+ *
+ * while the sticky assignor would result in:
+ * <ul>
+ * <li><code>C0 [t0p0, t1p1, t3p0, t2p0]</code></li>
+ * <li><code>C2 [t1p0, t2p1, t0p1, t3p1]</code></li>
+ * </ul>
+ * preserving all the previous assignments (unlike the round robin assignor).
+ *
+ * <b>Example 2.</b> There are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>, with 1, 2, and 3 partitions respectively.
+ * Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
+ * <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to
+ * <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
+ *
+ * The round robin assignor would come up with the following assignment:
+ * <ul>
+ * <li><code>C0 [t0p0]</code></li>
+ * <li><code>C1 [t1p0]</code></li>
+ * <li><code>C2 [t1p1, t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * which is not as balanced as the assignment suggested by the sticky assignor:
+ * <ul>
+ * <li><code>C0 [t0p0]</code></li>
+ * <li><code>C1 [t1p0, t1p1]</code></li>
+ * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * Now, if consumer <code>C0</code> is removed, these two assignors would produce the following assignments.
+ * Round Robin (preserves 3 partition assignments):
+ * <ul>
+ * <li><code>C1 [t0p0, t1p1]</code></li>
+ * <li><code>C2 [t1p0, t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * Sticky (preserves 5 partition assignments):
+ * <ul>
+ * <li><code>C1 [t1p0, t1p1, t0p0]</code></li>
+ * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * <h3>Impact on <code>ConsumerRebalanceListener</code></h3>
+ * The sticky assignment strategy can provide some optimization to those consumers that have some partition cleanup code
+ * in their <code>onPartitionsRevoked()</code> callback listeners. The cleanup code is placed in that callback listener
+ * because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it
+ * is using the range or round robin assignor. The listener code would look like this:
+ * <pre>{@code
+ * class TheOldRebalanceListener implements ConsumerRebalanceListener {
+ *
+ * public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ * for (TopicPartition partition: partitions) {
+ * commitOffsets(partition);
+ * cleanupState(partition);
+ * }
+ * }
+ *
+ * public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ * for (TopicPartition partition: partitions) {
+ * initializeState(partition);
+ * initializeOffset(partition);
+ * }
+ * }
+ * }
+ * }</pre>
+ *
+ * As mentioned above, one advantage of the sticky assignor is that, in general, it reduces the number of partitions that
+ * actually move from one consumer to another during a reassignment. Therefore, it allows consumers to do their cleanup
+ * more efficiently. Of course, they can still perform the partition cleanup in the <code>onPartitionsRevoked()</code>
+ * listener, but they can be more efficient and make a note of their partitions before and after the rebalance, and do the
+ * cleanup after the rebalance only on the partitions they have lost (which are normally few). The code snippet below
+ * clarifies this point:
+ * <pre>{@code
+ * class TheNewRebalanceListener implements ConsumerRebalanceListener {
+ * Collection<TopicPartition> lastAssignment = Collections.emptyList();
+ *
+ * public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ * for (TopicPartition partition: partitions)
+ * commitOffsets(partition);
+ * }
+ *
+ * public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+ * for (TopicPartition partition: difference(lastAssignment, assignment))
+ * cleanupState(partition);
+ *
+ * for (TopicPartition partition: difference(assignment, lastAssignment))
+ * initializeState(partition);
+ *
+ * for (TopicPartition partition: assignment)
+ * initializeOffset(partition);
+ *
+ * this.lastAssignment = assignment;
+ * }
+ * }
+ * }</pre>
+ *
+ * Any consumer that uses sticky assignment can leverage this listener like this:
+ * <code>consumer.subscribe(topics, new TheNewRebalanceListener());</code>
+ *
+ */
+public class StickyAssignor extends AbstractPartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(StickyAssignor.class);
+
+    // these schemas are used for preserving consumer's previously assigned partitions
+    // list and sending it as user data to the leader during a rebalance
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final Schema TOPIC_ASSIGNMENT = new Schema(
+            new Field(TOPIC_KEY_NAME, Type.STRING),
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
+    private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
+            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));
+
+    Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+    private List<TopicPartition> memberAssignment = null;
+    private PartitionMovements partitionMovements;
+
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        partitionMovements = new PartitionMovements();
+
+        prepopulateCurrentAssignments();
+        // make a deep copy of currentAssignment
+        Map<String, List<TopicPartition>> oldAssignment = deepCopy(currentAssignment);
+
+        // a mapping of all topic partitions to all consumers that can be assigned to them
+        final HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
+        // a mapping of all consumers to all potential topic partitions that can be assigned to them
+        final HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
+
+        // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops
+        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
+            for (int i = 0; i < entry.getValue(); ++i)
+                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>());
+        }
+
+        for (Entry<String, List<String>> entry: subscriptions.entrySet()) {
+            String consumer = entry.getKey();
+            consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
+            for (String topic: entry.getValue()) {
+                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
+                    TopicPartition topicPartition = new TopicPartition(topic, i);
+                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
+                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
+                }
+            }
+
+            // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
+            if (!currentAssignment.containsKey(consumer))
+                currentAssignment.put(consumer, new ArrayList<TopicPartition>());
+        }
+
+        // a mapping of partition to current consumer
+        HashMap<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
+        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
+            for (TopicPartition topicPartition: entry.getValue())
+                currentPartitionConsumer.put(topicPartition, entry.getKey());
+
+        List<TopicPartition> sortedPartitions = sortPartitions(oldAssignment.isEmpty(), partition2AllPotentialConsumers, consumer2AllPotentialPartitions);
+
+        // all partitions that need to be assigned (initially set to all partitions but adjusted in the following loop)
+        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
+        for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<String, List<TopicPartition>> entry = it.next();
+            if (!subscriptions.containsKey(entry.getKey())) {
+                // if a consumer that existed before (and had some partition assignments) is now removed, remove it from currentAssignment
+                for (TopicPartition topicPartition: entry.getValue())
+                    currentPartitionConsumer.remove(topicPartition);
+                it.remove();
+            } else {
+                // otherwise (the consumer still exists)
+                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
+                    TopicPartition partition = partitionIter.next();
+                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
+                        // if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer
+                        partitionIter.remove();
+                        currentPartitionConsumer.remove(partition);
+                    } else if (!subscriptions.get(entry.getKey()).contains(partition.topic())) {
+                        // if this partition cannot remain assigned to its current consumer because the consumer
+                        // is no longer subscribed to its topic, remove it from currentAssignment of the consumer
+                        partitionIter.remove();
+                    } else
+                        // otherwise, remove the topic partition from those that need to be assigned only if
+                        // its current consumer is still subscribed to its topic (because it is already assigned
+                        // and we would want to preserve that assignment as much as possible)
+                        unassignedPartitions.remove(partition);
+                }
+            }
+        }
+        // at this point we have preserved all valid topic partition to consumer assignments and removed
+        // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
+        // to consumers so that the topic partition assignments are as balanced as possible.
+
+        // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
+        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
+
+        balance(sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions,
+                partition2AllPotentialConsumers, oldAssignment, currentPartitionConsumer);
+        return currentAssignment;
+    }
+
+    private void prepopulateCurrentAssignments() {
+        Map<String, Subscription> subscriptions = getSubscriptions();
+        if (subscriptions == null)
+            return;
+
+        currentAssignment.clear();
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            ByteBuffer userData = subscriptionEntry.getValue().userData();
+            if (userData != null && userData.hasRemaining())
+                currentAssignment.put(subscriptionEntry.getKey(), deserializeTopicPartitionAssignment(userData));
+        }
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment) {
+        memberAssignment = assignment.partitions();
+    }
+
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        if (memberAssignment == null)
+            return new Subscription(new ArrayList<>(topics));
+
+        return new Subscription(new ArrayList<>(topics), serializeTopicPartitionAssignment(memberAssignment));
+    }
+
+    @Override
+    public String name() {
+        return "sticky";
+    }
+
+    /**
+     * Determine whether the current assignment is balanced.
+     *
+     * @param sortedCurrentSubscriptions an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+     * @param allSubscriptions a mapping of all consumers to all potential topic partitions that can be assigned to them
+     * @return true if the current assignment is balanced; false otherwise
+     */
+    private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions, Map<String, List<TopicPartition>> allSubscriptions) {
+        int min = currentAssignment.get(sortedCurrentSubscriptions.first()).size();
+        int max = currentAssignment.get(sortedCurrentSubscriptions.last()).size();
+        if (min >= max - 1)
+            // if the minimum and maximum numbers of partitions assigned to consumers differ by at most one, return true
+            return true;
+
+        // create a mapping from partitions to the consumer assigned to them
+        final HashMap<TopicPartition, String> allPartitions = new HashMap<>();
+        Set<Entry<String, List<TopicPartition>>> assignments = currentAssignment.entrySet();
+        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
+            List<TopicPartition> topicPartitions = entry.getValue();
+            for (TopicPartition topicPartition: topicPartitions) {
+                if (allPartitions.containsKey(topicPartition))
+                    log.error(topicPartition + " is assigned to more than one consumer.");
+                allPartitions.put(topicPartition, entry.getKey());
+            }
+        }
+
+        // for each consumer that does not have all the topic partitions it can get, make sure none of the topic partitions it
+        // could have but did not get can be moved to it (if any could, the current assignment is not balanced)
+        for (String consumer: sortedCurrentSubscriptions) {
+            List<TopicPartition> consumerPartitions = currentAssignment.get(consumer);
+            int consumerPartitionCount = consumerPartitions.size();
+
+            // skip if this consumer already has all the topic partitions it can get
+            if (consumerPartitionCount == allSubscriptions.get(consumer).size())
+                continue;
+
+            // otherwise make sure it cannot get any more
+            List<TopicPartition> potentialTopicPartitions = allSubscriptions.get(consumer);
+            for (TopicPartition topicPartition: potentialTopicPartitions) {
+                if (!currentAssignment.get(consumer).contains(topicPartition)) {
+                    String otherConsumer = allPartitions.get(topicPartition);
+                    int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
+                    if (consumerPartitionCount < otherConsumerPartitionCount) {
+                        log.debug(topicPartition + " can be moved from consumer " + otherConsumer + " to consumer " + consumer + " for a more balanced assignment.");
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the balance score of the given assignment, computed as the sum of the absolute differences in the number of
+     * assigned partitions over all consumer pairs. A perfectly balanced assignment (with all consumers getting the same
+     * number of partitions) has a balance score of 0. A lower balance score indicates a more balanced assignment.
+     */
+    private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
+        int score = 0;
+
+        Map<String, Integer> consumer2AssignmentSize = new HashMap<>();
+        for (Entry<String, List<TopicPartition>> entry: assignment.entrySet())
+            consumer2AssignmentSize.put(entry.getKey(), entry.getValue().size());
+
+        Iterator<Entry<String, Integer>> it = consumer2AssignmentSize.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, Integer> entry = it.next();
+            int consumerAssignmentSize = entry.getValue();
+            it.remove();
+            for (Entry<String, Integer> otherEntry: consumer2AssignmentSize.entrySet())
+                score += Math.abs(consumerAssignmentSize - otherEntry.getValue());
+        }
+
+        return score;
+    }
+
+    /**
+     * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
+     * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
+     *
+     * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one
+     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
+     * @return sorted list of valid partitions
+     */
+    private List<TopicPartition> sortPartitions(boolean isFreshAssignment,
+                                                HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                                HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        List<TopicPartition> sortedPartitions = new ArrayList<>();
+
+        if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
+            // if this is a reassignment and the subscriptions are identical (all consumers can consume from all topics),
+            // then we just need to list partitions in a round robin fashion (from consumers with the
+            // most assigned partitions to those with the fewest)
+            Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
+            for (Entry<String, List<TopicPartition>> entry: assignments.entrySet()) {
+                List<TopicPartition> toRemove = new ArrayList<>();
+                for (TopicPartition partition: entry.getValue())
+                    if (!partition2AllPotentialConsumers.keySet().contains(partition))
+                        toRemove.add(partition);
+                for (TopicPartition partition: toRemove)
+                    entry.getValue().remove(partition);
+            }
+            TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
+            sortedConsumers.addAll(assignments.keySet());
+
+            while (!sortedConsumers.isEmpty()) {
+                String consumer = sortedConsumers.pollLast();
+                List<TopicPartition> remainingPartitions = assignments.get(consumer);
+                if (!remainingPartitions.isEmpty()) {
+                    sortedPartitions.add(remainingPartitions.remove(0));
+                    sortedConsumers.add(consumer);
+                }
+            }
+
+            for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
+                if (!sortedPartitions.contains(partition))
+                    sortedPartitions.add(partition);
+            }
+
+        } else {
+            // an ascending sorted set of topic partitions based on how many consumers can potentially use them
+            TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
+            sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());
+
+            while (!sortedAllPartitions.isEmpty())
+                sortedPartitions.add(sortedAllPartitions.pollFirst());
+        }
+
+        return sortedPartitions;
+    }
+
+    /**
+     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
+     * @return true if all partitions have the same potential consumers and all consumers have the same
+     * potential partitions they can consume from; false otherwise
+     */
+    private boolean areSubscriptionsIdentical(HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                              HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        if (!hasIdenticalListElements(partition2AllPotentialConsumers.values()))
+            return false;
+
+        if (!hasIdenticalListElements(consumer2AllPotentialPartitions.values()))
+            return false;
+
+        return true;
+    }
+
+    /**
+     * @param col a collection of elements of type list
+     * @return true if all lists in the collection have the same members; false otherwise
+     */
+    private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
+        Iterator<List<T>> it = col.iterator();
+        List<T> cur = it.next();
+        while (it.hasNext()) {
+            List<T> next = it.next();
+            if (!(cur.containsAll(next) && next.containsAll(cur)))
+                return false;
+            cur = next;
+        }
+        return true;
+    }
+
+    /**
+     * @return the consumer to which the given partition is assigned. The assignment should improve the overall balance
+     * of the partition assignments to consumers.
+     */
+    private String assignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions,
+                                   HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, String> currentPartitionConsumer) {
+        for (String consumer: sortedCurrentSubscriptions) {
+            if (consumer2AllPotentialPartitions.get(consumer).contains(partition)) {
+                sortedCurrentSubscriptions.remove(consumer);
+                currentAssignment.get(consumer).add(partition);
+                currentPartitionConsumer.put(partition, consumer);
+                sortedCurrentSubscriptions.add(consumer);
+                return consumer;
+            }
+        }
+        return null;
+    }
+
+    private boolean canParticipateInReassignment(TopicPartition partition, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+        // if a partition has two or more potential consumers it is subject to reassignment.
+        return partition2AllPotentialConsumers.get(partition).size() >= 2;
+    }
+
+    private boolean canParticipateInReassignment(String consumer,
+                                                 HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions,
+                                                 HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+        List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
+        int currentAssignmentSize = currentPartitions.size();
+        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
+        if (currentAssignmentSize > maxAssignmentSize)
+            log.error("The consumer " + consumer + " is assigned more partitions than the maximum possible.");
+
+        if (currentAssignmentSize < maxAssignmentSize)
+            // if a consumer is not assigned all its potential partitions it is subject to reassignment
+            return true;
+
+        for (TopicPartition partition: currentPartitions)
+            // if any of the partitions assigned to a consumer is subject to reassignment the consumer itself
+            // is subject to reassignment
+            if (canParticipateInReassignment(partition, partition2AllPotentialConsumers))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Balance the current assignment using the data structures created in the assign(...) method above.
+     */
+    private void balance(List<TopicPartition> sortedPartitions, List<TopicPartition> unassignedPartitions, TreeSet<String> sortedCurrentSubscriptions,
+                         HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                         Map<String, List<TopicPartition>> oldAssignment, HashMap<TopicPartition, String> currentPartitionConsumer) {
+        boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
+        boolean reassignmentPerformed = false;
+
+        // assign all unassigned partitions
+        for (TopicPartition partition: unassignedPartitions) {
+            // skip if there is no potential consumer for the partition
+            if (partition2AllPotentialConsumers.get(partition).isEmpty())
+                continue;
+
+            assignPartition(partition, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, currentPartitionConsumer);
+        }
+
+        // narrow down the reassignment scope to only those partitions that can actually be reassigned
+        Set<TopicPartition> fixedPartitions = new HashSet<>();
+        for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
+            if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
+                fixedPartitions.add(partition);
+        sortedPartitions.removeAll(fixedPartitions);
+
+        // narrow down the reassignment scope to only those consumers that are subject to reassignment
+        Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
+        for (String consumer: consumer2AllPotentialPartitions.keySet())
+            if (!canParticipateInReassignment(consumer, consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
+                sortedCurrentSubscriptions.remove(consumer);
+                fixedAssignments.put(consumer, currentAssignment.remove(consumer));
+            }
+
+        // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
+        Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
+        HashMap<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);
+
+        reassignmentPerformed = performReassignments(sortedPartitions, sortedCurrentSubscriptions,
+                consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
+
+        // if we are not preserving existing assignments and we have made changes to the current assignment
+        // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
+        if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
+            deepCopy(preBalanceAssignment, currentAssignment);
+            currentPartitionConsumer.clear();
+            currentPartitionConsumer.putAll(preBalancePartitionConsumers);
+        }
+
+        // add the fixed assignments (those that could not change) back
+        for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
+            String consumer = entry.getKey();
+            currentAssignment.put(consumer, entry.getValue());
+            sortedCurrentSubscriptions.add(consumer);
+        }
+
+        fixedAssignments.clear();
+    }
+
+    private boolean performReassignments(List<TopicPartition> reassignablePartitions, TreeSet<String> sortedCurrentSubscriptions,
+                                         HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions,
+                                         HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                         HashMap<TopicPartition, String> currentPartitionConsumer) {
+        boolean reassignmentPerformed = false;
+        boolean modified;
+
+        // repeat reassignment until no partition can be moved to improve the balance
+        do {
+            modified = false;
+            // reassign all reassignable partitions, as needed, starting from the partition with the fewest potential consumers,
+            // until the full list is processed or a balance is achieved
+            Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
+            while (partitionIterator.hasNext() && !isBalanced(sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
+                TopicPartition partition = partitionIterator.next();
+
+                // the partition must have at least two potential consumers
+                if (partition2AllPotentialConsumers.get(partition).size() <= 1)
+                    log.error("Expected more than one potential consumer for partition '" + partition + "'");
+
+                // the partition must have a current consumer
+                String consumer = currentPartitionConsumer.get(partition);
+                if (consumer == null)
+                    log.error("Expected partition '" + partition + "' to be assigned to a consumer");
+
+                // check if a better-suited consumer exists for the partition; if so, reassign it
+                for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
+                    if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
+                        reassignPartition(partition, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions);
+                        reassignmentPerformed = true;
+                        modified = true;
+                        break;
+                    }
+                }
+            }
+        } while (modified);
+
+        return reassignmentPerformed;
+    }
+
+    private void reassignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions,
+                                   HashMap<TopicPartition, String> currentPartitionConsumer,
+                                   HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        String consumer = currentPartitionConsumer.get(partition);
+
+        // find the new consumer
+        String newConsumer = null;
+        for (String anotherConsumer: sortedCurrentSubscriptions) {
+            if (consumer2AllPotentialPartitions.get(anotherConsumer).contains(partition)) {
+                newConsumer = anotherConsumer;
+                break;
+            }
+        }
+
+        assert newConsumer != null;
+
+        // find the correct partition movement considering the stickiness requirement
+        TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer);
+        processPartitionMovement(partitionToBeMoved, newConsumer, sortedCurrentSubscriptions, currentPartitionConsumer);
+
+        return;
+    }
+
private void processPartitionMovement(TopicPartition partition, String newConsumer, + TreeSet<String> sortedCurrentSubscriptions, + HashMap<TopicPartition, String> currentPartitionConsumer) { + String oldConsumer = currentPartitionConsumer.get(partition); + + sortedCurrentSubscriptions.remove(oldConsumer); + sortedCurrentSubscriptions.remove(newConsumer); + + partitionMovements.movePartition(partition, oldConsumer, newConsumer); + + currentAssignment.get(oldConsumer).remove(partition); + currentAssignment.get(newConsumer).add(partition); + currentPartitionConsumer.put(partition, newConsumer); + sortedCurrentSubscriptions.add(newConsumer); + sortedCurrentSubscriptions.add(oldConsumer); + } + + boolean isSticky() { + return partitionMovements.isSticky(); + } + + private static ByteBuffer serializeTopicPartitionAssignment(List<TopicPartition> partitions) { + Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA); + List<Struct> topicAssignments = new ArrayList<>(); + for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupDataByTopic(partitions).entrySet()) { + Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); + topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); + topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); + topicAssignments.add(topicAssignment); + } + struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); + ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA.sizeOf(struct)); + STICKY_ASSIGNOR_USER_DATA.write(buffer, struct); + buffer.flip(); + return buffer; + } + + private static List<TopicPartition> deserializeTopicPartitionAssignment(ByteBuffer buffer) { + Struct struct = STICKY_ASSIGNOR_USER_DATA.read(buffer); + List<TopicPartition> partitions = new ArrayList<>(); + for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) { + Struct assignment = (Struct) structObj; + String topic = assignment.getString(TOPIC_KEY_NAME); + for (Object partitionObj : 
assignment.getArray(PARTITIONS_KEY_NAME)) { + Integer partition = (Integer) partitionObj; + partitions.add(new TopicPartition(topic, partition)); + } + } + return partitions; + } + + private void deepCopy(Map<String, List<TopicPartition>> source, Map<String, List<TopicPartition>> dest) { + dest.clear(); + for (Entry<String, List<TopicPartition>> entry: source.entrySet()) + dest.put(entry.getKey(), new ArrayList<>(entry.getValue())); + } + + private Map<String, List<TopicPartition>> deepCopy(Map<String, List<TopicPartition>> assignment) { + Map<String, List<TopicPartition>> copy = new HashMap<>(); + deepCopy(assignment, copy); + return copy; + } + + private static class PartitionComparator implements Comparator<TopicPartition>, Serializable { + private static final long serialVersionUID = 1L; + private Map<TopicPartition, List<String>> map; + + PartitionComparator(Map<TopicPartition, List<String>> map) { + this.map = map; + } + + @Override + public int compare(TopicPartition o1, TopicPartition o2) { + int ret = map.get(o1).size() - map.get(o2).size(); + if (ret == 0) { + ret = o1.topic().compareTo(o2.topic()); + if (ret == 0) + ret = o1.partition() - o2.partition(); + } + return ret; + } + } + + private static class SubscriptionComparator implements Comparator<String>, Serializable { + private static final long serialVersionUID = 1L; + private Map<String, List<TopicPartition>> map; + + SubscriptionComparator(Map<String, List<TopicPartition>> map) { + this.map = map; + } + + @Override + public int compare(String o1, String o2) { + int ret = map.get(o1).size() - map.get(o2).size(); + if (ret == 0) + ret = o1.compareTo(o2); + return ret; + } + } + + /** + * This class maintains some data structures to simplify lookup of partition movements among consumers. 
At each point in + * time during a partition rebalance it keeps track of the partition movements for each topic, and also the possible + * movement (in the form of a <code>ConsumerPair</code> object) for each partition. + */ + private static class PartitionMovements { + private Map<String, Map<ConsumerPair, Set<TopicPartition>>> partitionMovementsByTopic = new HashMap<>(); + private Map<TopicPartition, ConsumerPair> partitionMovements = new HashMap<>(); + + private ConsumerPair removeMovementRecordOfPartition(TopicPartition partition) { + ConsumerPair pair = partitionMovements.remove(partition); + + String topic = partition.topic(); + Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); + partitionMovementsForThisTopic.get(pair).remove(partition); + if (partitionMovementsForThisTopic.get(pair).isEmpty()) + partitionMovementsForThisTopic.remove(pair); + if (partitionMovementsByTopic.get(topic).isEmpty()) + partitionMovementsByTopic.remove(topic); + + return pair; + } + + private void addPartitionMovementRecord(TopicPartition partition, ConsumerPair pair) { + partitionMovements.put(partition, pair); + + String topic = partition.topic(); + if (!partitionMovementsByTopic.containsKey(topic)) + partitionMovementsByTopic.put(topic, new HashMap<ConsumerPair, Set<TopicPartition>>()); + + Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); + if (!partitionMovementsForThisTopic.containsKey(pair)) + partitionMovementsForThisTopic.put(pair, new HashSet<TopicPartition>()); + + partitionMovementsForThisTopic.get(pair).add(partition); + } + + private void movePartition(TopicPartition partition, String oldConsumer, String newConsumer) { + ConsumerPair pair = new ConsumerPair(oldConsumer, newConsumer); + + if (partitionMovements.containsKey(partition)) { + // this partition has previously moved + ConsumerPair existingPair = removeMovementRecordOfPartition(partition); + 
assert existingPair.dstMemberId.equals(oldConsumer); + if (!existingPair.srcMemberId.equals(newConsumer)) { + // the partition is not moving back to its previous consumer + addPartitionMovementRecord(partition, new ConsumerPair(existingPair.srcMemberId, newConsumer)); + } + } else + addPartitionMovementRecord(partition, pair); + } + + private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) { + String topic = partition.topic(); + + if (!partitionMovementsByTopic.containsKey(topic)) + return partition; + + if (partitionMovements.containsKey(partition)) { + // this partition has previously moved + assert oldConsumer.equals(partitionMovements.get(partition).dstMemberId); + oldConsumer = partitionMovements.get(partition).srcMemberId; + } + + Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); + ConsumerPair reversePair = new ConsumerPair(newConsumer, oldConsumer); + if (!partitionMovementsForThisTopic.containsKey(reversePair)) + return partition; + + return partitionMovementsForThisTopic.get(reversePair).iterator().next(); + } + + private boolean isLinked(String src, String dst, Set<ConsumerPair> pairs, List<String> currentPath) { + if (src.equals(dst)) + return false; + + if (pairs.isEmpty()) + return false; + + if (new ConsumerPair(src, dst).in(pairs)) { + currentPath.add(src); + currentPath.add(dst); + return true; + } + + for (ConsumerPair pair: pairs) + if (pair.srcMemberId.equals(src)) { + Set<ConsumerPair> reducedSet = new HashSet<>(pairs); + reducedSet.remove(pair); + currentPath.add(pair.srcMemberId); + return isLinked(pair.dstMemberId, dst, reducedSet, currentPath); + } + + return false; + } + + private boolean in(List<String> cycle, Set<List<String>> cycles) { + List<String> superCycle = new ArrayList<>(cycle); + superCycle.remove(superCycle.size() - 1); + 
superCycle.addAll(cycle); + for (List<String> foundCycle: cycles) { + if (foundCycle.size() == cycle.size() && Collections.indexOfSubList(superCycle, foundCycle) != -1) + return true; + } + return false; + } + + private boolean hasCycles(Set<ConsumerPair> pairs) { + Set<List<String>> cycles = new HashSet<>(); + for (ConsumerPair pair: pairs) { + Set<ConsumerPair> reducedPairs = new HashSet<>(pairs); + reducedPairs.remove(pair); + List<String> path = new ArrayList<>(Collections.singleton(pair.srcMemberId)); + if (isLinked(pair.dstMemberId, pair.srcMemberId, reducedPairs, path) && !in(path, cycles)) { + cycles.add(new ArrayList<>(path)); + log.error("A cycle of length " + (path.size() - 1) + " was found: " + path.toString()); + } + } + + // for now we want to make sure there are no partition movements of the same topic between a pair of consumers. + // the odds of finding a cycle among more than two consumers seem to be so low (according to various randomized + // tests with the given sticky algorithm) that it is not worth the added complexity of handling those cases. + for (List<String> cycle: cycles) + if (cycle.size() == 3) // indicates a cycle of length 2 + return true; + return false; + } + + private boolean isSticky() { + for (Map.Entry<String, Map<ConsumerPair, Set<TopicPartition>>> topicMovements: this.partitionMovementsByTopic.entrySet()) { + Set<ConsumerPair> topicMovementPairs = topicMovements.getValue().keySet(); + if (hasCycles(topicMovementPairs)) { + log.error("Stickiness is violated for topic " + topicMovements.getKey() + + "\nPartition movements for this topic occurred among the following consumer pairs:" + + "\n" + topicMovements.getValue().toString()); + return false; + } + } + + return true; + } + } + + /** + * <code>ConsumerPair</code> represents a pair of Kafka consumer ids involved in a partition reassignment.
Each + * <code>ConsumerPair</code> object, which contains a source (<code>srcMemberId</code>) and a destination (<code>dstMemberId</code>) + * element, normally corresponds to a particular partition or topic, and indicates that the particular partition or some + * partition of the particular topic was moved from the source consumer to the destination consumer during the rebalance. + * This class is used, through the <code>PartitionMovements</code> class, by the sticky assignor and helps in determining + * whether a partition reassignment results in cycles in the generated graph of consumer pairs. + */ + private static class ConsumerPair { + private final String srcMemberId; + private final String dstMemberId; + + ConsumerPair(String srcMemberId, String dstMemberId) { + this.srcMemberId = srcMemberId; + this.dstMemberId = dstMemberId; + } + + public String toString() { + return this.srcMemberId + "->" + this.dstMemberId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((this.srcMemberId == null) ? 0 : this.srcMemberId.hashCode()); + result = prime * result + ((this.dstMemberId == null) ?
0 : this.dstMemberId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!getClass().isInstance(obj)) + return false; + + ConsumerPair otherPair = (ConsumerPair) obj; + return this.srcMemberId.equals(otherPair.srcMemberId) && this.dstMemberId.equals(otherPair.dstMemberId); + } + + private boolean in(Set<ConsumerPair> pairs) { + for (ConsumerPair pair: pairs) + if (this.equals(pair)) + return true; + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java index 5c97693..bc87ed0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java @@ -34,6 +34,7 @@ import java.util.Set; */ public abstract class AbstractPartitionAssignor implements PartitionAssignor { private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class); + private Map<String, Subscription> subscriptions = null; /** * Perform the group assignment given the partition counts and member subscriptions @@ -52,6 +53,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { @Override public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { + this.subscriptions = new HashMap<>(subscriptions); Set<String> allSubscribedTopics = new HashSet<>(); Map<String, List<String>> topicSubscriptions = new HashMap<>(); for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { @@ 
-71,13 +73,17 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions); - // this class has maintains no user data, so just wrap the results + // this class maintains no user data, so just wrap the results Map<String, Assignment> assignments = new HashMap<>(); for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet()) assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue())); return assignments; } + protected Map<String, Subscription> getSubscriptions() { + return subscriptions; + } + @Override public void onAssignment(Assignment assignment) { // this assignor maintains no internal state, so nothing to do http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 392e272..f8be9a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -146,7 +146,6 @@ public class ConsumerProtocol { // otherwise, assume versions can be parsed as V0 } - private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) { Map<String, List<Integer>> partitionMap = new HashMap<>(); for (TopicPartition partition : partitions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java index 86683a0..4a7c7a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -67,7 +67,7 @@ public interface PartitionAssignor { /** - * Unique name for this assignor (e.g. "range" or "roundrobin") + * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") * @return non-null unique name */ String name(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/common/TopicPartition.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java index 832bcd8..dc79c2e 100644 --- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java @@ -75,5 +75,4 @@ public final class TopicPartition implements Serializable { public String toString() { return topic + "-" + partition; } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java new file mode 100644 index 0000000..e9cc828 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -0,0 +1,689 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.Utils; +import org.junit.Test; + +public class StickyAssignorTest { + + private StickyAssignor assignor = new StickyAssignor(); + + @Test + public void testOneConsumerNoTopic() { + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, Collections.<String>emptyList()); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void 
testOneConsumerNonexistentTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 0); + Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOneConsumerOneTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOnlyAssignsPartitionsFromSubscribedTopics() { + String topic = "topic"; + String otherTopic = "other"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(otherTopic, 3); + Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void 
testOneConsumerMultipleTopics() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 1); + partitionsPerTopic.put(topic2, 2); + Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic1, topic2)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testTwoConsumersOneTopicOnePartition() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 1); + + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic)); + subscriptions.put(consumer2, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1)); + assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testTwoConsumersOneTopicTwoPartitions() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic)); + subscriptions.put(consumer2, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + 
assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1)); + assertEquals(Arrays.asList(tp(topic, 1)), assignment.get(consumer2)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testMultipleConsumersMixedTopicSubscriptions() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 2); + + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic1)); + subscriptions.put(consumer2, topics(topic1, topic2)); + subscriptions.put(consumer3, topics(topic1)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1)); + assertEquals(Arrays.asList(tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2)); + assertEquals(Arrays.asList(tp(topic1, 1)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testTwoConsumersTwoTopicsSixPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 3); + + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic1, topic2)); + subscriptions.put(consumer2, topics(topic1, topic2)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1)); + 
assertEquals(Arrays.asList(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAddRemoveConsumerOneTopic() { + String topic = "topic"; + String consumer1 = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + + String consumer2 = "consumer2"; + subscriptions.put(consumer2, topics(topic)); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(Arrays.asList(tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); + assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer2)); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + + subscriptions.remove(consumer1); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertTrue(assignment.get(consumer2).contains(tp(topic, 0))); + assertTrue(assignment.get(consumer2).contains(tp(topic, 1))); + assertTrue(assignment.get(consumer2).contains(tp(topic, 2))); + + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + /** + * This unit test performs sticky assignment for a scenario that round robin assignor handles poorly. 
+ * Topics (partitions per topic): topic1 (2), topic2 (1), topic3 (2), topic4 (1), topic5 (2) + * Subscriptions: + * - consumer1: topic1, topic2, topic3, topic4, topic5 + * - consumer2: topic1, topic3, topic5 + * - consumer3: topic1, topic3, topic5 + * - consumer4: topic1, topic2, topic3, topic4, topic5 + * Round Robin Assignment Result: + * - consumer1: topic1-0, topic3-0, topic5-0 + * - consumer2: topic1-1, topic3-1, topic5-1 + * - consumer3: + * - consumer4: topic2-0, topic4-0 + * Sticky Assignment Result: + * - consumer1: topic2-0, topic3-0 + * - consumer2: topic1-0, topic3-1 + * - consumer3: topic1-1, topic5-0 + * - consumer4: topic4-0, topic5-1 + */ + @Test + public void testPoorRoundRobinAssignmentScenario() { + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + for (int i = 1; i <= 5; i++) + partitionsPerTopic.put(String.format("topic%d", i), (i % 2) + 1); + + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put("consumer1", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5")); + subscriptions.put("consumer2", Arrays.asList("topic1", "topic3", "topic5")); + subscriptions.put("consumer3", Arrays.asList("topic1", "topic3", "topic5")); + subscriptions.put("consumer4", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5")); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + verifyValidityAndBalance(subscriptions, assignment); + } + + @Test + public void testAddRemoveTopicTwoConsumers() { + String topic = "topic"; + String consumer1 = "consumer"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + Map<String, List<String>> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, topics(topic)); + subscriptions.put(consumer2, topics(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); + // verify balance + 
assertTrue(isFullyBalanced(assignment)); + verifyValidityAndBalance(subscriptions, assignment); + // verify stickiness + List<TopicPartition> consumer1Assignment1 = assignment.get(consumer1); + List<TopicPartition> consumer2Assignment1 = assignment.get(consumer2); + assertTrue((consumer1Assignment1.size() == 1 && consumer2Assignment1.size() == 2) || + (consumer1Assignment1.size() == 2 && consumer2Assignment1.size() == 1)); + + String topic2 = "topic2"; + partitionsPerTopic.put(topic2, 3); + subscriptions.put(consumer1, topics(topic, topic2)); + subscriptions.put(consumer2, topics(topic, topic2)); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + // verify balance + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + // verify stickiness + List<TopicPartition> consumer1assignment = assignment.get(consumer1); + List<TopicPartition> consumer2assignment = assignment.get(consumer2); + assertTrue(consumer1assignment.size() == 3 && consumer2assignment.size() == 3); + assertTrue(consumer1assignment.containsAll(consumer1Assignment1)); + assertTrue(consumer2assignment.containsAll(consumer2Assignment1)); + assertTrue(assignor.isSticky()); + + partitionsPerTopic.remove(topic); + subscriptions.put(consumer1, topics(topic2)); + subscriptions.put(consumer2, topics(topic2)); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + // verify balance + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + // verify stickiness + List<TopicPartition> consumer1Assignment3 = assignment.get(consumer1); + List<TopicPartition> consumer2Assignment3 = assignment.get(consumer2); + assertTrue((consumer1Assignment3.size() == 1 && consumer2Assignment3.size() == 2) || + (consumer1Assignment3.size() == 2 && consumer2Assignment3.size() == 1)); + assertTrue(consumer1assignment.containsAll(consumer1Assignment3)); + assertTrue(consumer2assignment.containsAll(consumer2Assignment3)); 
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentAfterOneConsumerLeaves() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 20; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), i);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 20; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = 1; j <= i; j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.remove("consumer10");
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentAfterOneConsumerAdded() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put("topic", 20);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 10; i++)
+            subscriptions.put(String.format("consumer%02d", i), Collections.singletonList("topic"));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.put("consumer10", Collections.singletonList("topic"));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testSameSubscriptions() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 15; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), i);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 9; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = 1; j <= partitionsPerTopic.size(); j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.remove("consumer05");
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testLargeAssignmentWithMultipleConsumersLeaving() {
+        Random rand = new Random();
+        int topicCount = 40;
+        int consumerCount = 200;
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++)
+            partitionsPerTopic.put(getTopicName(i, topicCount), rand.nextInt(10) + 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 0; i < consumerCount; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = 0; j < rand.nextInt(20); j++)
+                topics.add(getTopicName(rand.nextInt(topicCount), topicCount));
+            subscriptions.put(getConsumerName(i, consumerCount), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        for (int i = 0; i < 100; ++i) {
+            String c = getConsumerName(rand.nextInt(consumerCount), consumerCount);
+            subscriptions.remove(c);
+        }
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testNewSubscription() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 5; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 0; i < 3; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = i; j <= 3 * i - 2; j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.get("consumer00").add("topic01");
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentWithRandomSubscriptionsAndChanges() {
+        final int minNumConsumers = 20;
+        final int maxNumConsumers = 40;
+        final int minNumTopics = 10;
+        final int maxNumTopics = 20;
+
+        for (int round = 1; round <= 100; ++round) {
+            int numTopics = minNumTopics + new Random().nextInt(maxNumTopics - minNumTopics);
+
+            ArrayList<String> topics = new ArrayList<>();
+            for (int i = 0; i < numTopics; ++i)
+                topics.add(getTopicName(i, maxNumTopics));
+
+            Map<String, Integer> partitionsPerTopic = new HashMap<>();
+            for (int i = 0; i < numTopics; ++i)
+                partitionsPerTopic.put(getTopicName(i, maxNumTopics), i + 1);
+
+            int numConsumers = minNumConsumers + new Random().nextInt(maxNumConsumers - minNumConsumers);
+
+            Map<String, List<String>> subscriptions = new HashMap<>();
+            for (int i = 0; i < numConsumers; ++i) {
+                List<String> sub = Utils.sorted(getRandomSublist(topics));
+                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+            }
+
+            StickyAssignor assignor = new StickyAssignor();
+
+            Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+            verifyValidityAndBalance(subscriptions, assignment);
+
+            subscriptions.clear();
+            for (int i = 0; i < numConsumers; ++i) {
+                List<String> sub = Utils.sorted(getRandomSublist(topics));
+                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+            }
+
+            assignment = assignor.assign(partitionsPerTopic, subscriptions);
+            verifyValidityAndBalance(subscriptions, assignment);
+            assertTrue(assignor.isSticky());
+        }
+    }
+
+    @Test
+    public void testMoveExistingAssignments() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i <= 6; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01", topics("topic01", "topic02"));
+        subscriptions.put("consumer02", topics("topic01", "topic02", "topic03", "topic04"));
+        subscriptions.put("consumer03", topics("topic02", "topic03", "topic04", "topic05", "topic06"));
+
+        assignor.currentAssignment.put("consumer01", new ArrayList<>(Arrays.asList(tp("topic01", 0))));
+        assignor.currentAssignment.put("consumer02", new ArrayList<>(Arrays.asList(tp("topic02", 0), tp("topic03", 0))));
+        assignor.currentAssignment.put("consumer03", new ArrayList<>(Arrays.asList(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0))));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+    }
+
+    @Test
+    public void testStickiness() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put("topic01", 3);
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01", topics("topic01"));
+        subscriptions.put("consumer02", topics("topic01"));
+        subscriptions.put("consumer03", topics("topic01"));
+        subscriptions.put("consumer04", topics("topic01"));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        Map<String, TopicPartition> partitionsAssigned = new HashMap<>();
+
+        Set<Entry<String, List<TopicPartition>>> assignments = assignment.entrySet();
+        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
+            String consumer = entry.getKey();
+            List<TopicPartition> topicPartitions = entry.getValue();
+            int size = topicPartitions.size();
+            assertTrue("Consumer " + consumer + " is assigned more topic partitions than expected.", size <= 1);
+            if (size == 1)
+                partitionsAssigned.put(consumer, topicPartitions.get(0));
+        }
+
+        // removing the potential group leader
+        subscriptions.remove("consumer01");
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+
+        assignments = assignment.entrySet();
+        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
+            String consumer = entry.getKey();
+            List<TopicPartition> topicPartitions = entry.getValue();
+            assertEquals("Consumer " + consumer + " is not assigned exactly one topic partition.", 1, topicPartitions.size());
+            assertTrue("Stickiness was not honored for consumer " + consumer,
+                (!partitionsAssigned.containsKey(consumer)) || (assignment.get(consumer).contains(partitionsAssigned.get(consumer))));
+        }
+    }
+
+    private String getTopicName(int i, int maxNum) {
+        return getCanonicalName("t", i, maxNum);
+    }
+
+    private String getConsumerName(int i, int maxNum) {
+        return getCanonicalName("c", i, maxNum);
+    }
+
+    private String getCanonicalName(String str, int i, int maxNum) {
+        return str + pad(i, Integer.toString(maxNum).length());
+    }
+
+    private String pad(int num, int digits) {
+        StringBuilder sb = new StringBuilder();
+        int iDigits = Integer.toString(num).length();
+
+        for (int i = 1; i <= digits - iDigits; ++i)
+            sb.append("0");
+
+        sb.append(num);
+        return sb.toString();
+    }
+
+    private static List<String> topics(String... topics) {
+        return Arrays.asList(topics);
+    }
+
+    private static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
+
+    private static boolean isFullyBalanced(Map<String, List<TopicPartition>> assignment) {
+        int min = Integer.MAX_VALUE;
+        int max = Integer.MIN_VALUE;
+        for (List<TopicPartition> topicPartitions: assignment.values()) {
+            int size = topicPartitions.size();
+            if (size < min)
+                min = size;
+            if (size > max)
+                max = size;
+        }
+        return max - min <= 1;
+    }
+
+    private static List<String> getRandomSublist(ArrayList<String> list) {
+        List<String> selectedItems = new ArrayList<>(list);
+        int len = list.size();
+        Random random = new Random();
+        int howManyToRemove = random.nextInt(len);
+
+        for (int i = 1; i <= howManyToRemove; ++i)
+            selectedItems.remove(random.nextInt(selectedItems.size()));
+
+        return selectedItems;
+    }
+
+    /**
+     * Verifies that the given assignment is valid and balanced with respect to the given subscriptions.
+     * Validity requirements:
+     * - each consumer is subscribed to the topics of all partitions assigned to it, and
+     * - each partition is assigned to no more than one consumer
+     * Balance requirements:
+     * - the assignment is fully balanced (the numbers of topic partitions assigned to any two consumers differ by at most one), or
+     * - no topic partition can be moved from one consumer to another consumer that has two or more fewer topic partitions
+     *
+     * @param subscriptions topic subscriptions of each consumer
+     * @param assignments the assignment to verify
+     */
+    private static void verifyValidityAndBalance(Map<String, List<String>> subscriptions, Map<String, List<TopicPartition>> assignments) {
+        int size = subscriptions.size();
+        assert size == assignments.size();
+
+        List<String> consumers = Utils.sorted(assignments.keySet());
+
+        for (int i = 0; i < size; ++i) {
+            String consumer = consumers.get(i);
+            List<TopicPartition> partitions = assignments.get(consumer);
+            for (TopicPartition partition: partitions)
+                assertTrue("Error: Partition " + partition + " is assigned to " + consumer + ", but it is not subscribed to topic " + partition.topic() +
+                        "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                        subscriptions.get(consumer).contains(partition.topic()));
+
+            if (i == size - 1)
+                continue;
+
+            for (int j = i + 1; j < size; ++j) {
+                String otherConsumer = consumers.get(j);
+                List<TopicPartition> otherPartitions = assignments.get(otherConsumer);
+
+                Set<TopicPartition> intersection = new HashSet<>(partitions);
+                intersection.retainAll(otherPartitions);
+                assertTrue("Error: Consumers " + consumer + " and " + otherConsumer + " have common partitions assigned to them: " + intersection.toString() +
+                        "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                        intersection.isEmpty());
+
+                int len = partitions.size();
+                int otherLen = otherPartitions.size();
+
+                if (Math.abs(len - otherLen) <= 1)
+                    continue;
+
+                Map<String, List<Integer>> map = CollectionUtils.groupDataByTopic(partitions);
+                Map<String, List<Integer>> otherMap = CollectionUtils.groupDataByTopic(otherPartitions);
+
+                if (len > otherLen) {
+                    for (String topic: map.keySet())
+                        assertTrue("Error: Some partitions can be moved from " + consumer + " to " + otherConsumer + " to achieve a better balance" +
+                                "\n" + consumer + " has " + len + " partitions, and " + otherConsumer + " has " + otherLen + " partitions." +
+                                "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                                !otherMap.containsKey(topic));
+                }
+
+                if (otherLen > len) {
+                    for (String topic: otherMap.keySet())
+                        assertTrue("Error: Some partitions can be moved from " + otherConsumer + " to " + consumer + " to achieve a better balance" +
+                                "\n" + consumer + " has " + len + " partitions, and " + otherConsumer + " has " + otherLen + " partitions." +
+                                "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                                !map.containsKey(topic));
+                }
+            }
+        }
+    }
+}
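The validity and balance rules in the `verifyValidityAndBalance` Javadoc, together with the stickiness property the tests assert through `assignor.isSticky()`, can be illustrated without any Kafka dependencies. What follows is a minimal sketch under stated assumptions, not code from this patch.

* `AssignmentChecks`, `topicOf`, `isValid`, `isFullyBalanced`, `isBalanced`, and `movedPartitions` are hypothetical names.
* Partitions are modelled as `"topic-partition"` strings such as `"t-0"` instead of `TopicPartition`.
* The balance rule consults each consumer's subscriptions directly. This is stricter than the test's proxy, which compares the topics two consumers already own.
* `movedPartitions` counts partitions that changed owner even though their previous owner is still in the group. This is one plausible stickiness measure; it is not necessarily how `isSticky()` is computed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: partitions are "topic-partition" strings (e.g. "t-0"), not Kafka's TopicPartition.
final class AssignmentChecks {

    // Topic part of a "topic-partition" key; lastIndexOf keeps hyphenated topic names intact.
    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    // Validity: every assigned partition belongs to a topic its consumer subscribes to,
    // and no partition is assigned to more than one consumer.
    static boolean isValid(Map<String, List<String>> subscriptions, Map<String, List<String>> assignment) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            List<String> topics = subscriptions.getOrDefault(e.getKey(), Collections.<String>emptyList());
            for (String tp : e.getValue())
                if (!topics.contains(topicOf(tp)) || !seen.add(tp))
                    return false;
        }
        return true;
    }

    // Fully balanced: the assignment sizes of any two consumers differ by at most one.
    static boolean isFullyBalanced(Map<String, List<String>> assignment) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (List<String> partitions : assignment.values()) {
            min = Math.min(min, partitions.size());
            max = Math.max(max, partitions.size());
        }
        return max - min <= 1;
    }

    // Balanced: fully balanced, or no partition could move to a consumer that owns at
    // least two fewer partitions and subscribes to that partition's topic.
    static boolean isBalanced(Map<String, List<String>> subscriptions, Map<String, List<String>> assignment) {
        if (isFullyBalanced(assignment))
            return true;
        for (Map.Entry<String, List<String>> from : assignment.entrySet())
            for (Map.Entry<String, List<String>> to : assignment.entrySet()) {
                if (from.getValue().size() - to.getValue().size() < 2)
                    continue;
                List<String> toTopics = subscriptions.getOrDefault(to.getKey(), Collections.<String>emptyList());
                for (String tp : from.getValue())
                    if (toTopics.contains(topicOf(tp)))
                        return false;
            }
        return true;
    }

    // Stickiness measure: partitions that changed owner even though their previous
    // owner is still present in the new assignment.
    static int movedPartitions(Map<String, List<String>> before, Map<String, List<String>> after) {
        Map<String, String> previousOwner = new HashMap<>();
        for (Map.Entry<String, List<String>> e : before.entrySet())
            for (String tp : e.getValue())
                previousOwner.put(tp, e.getKey());
        int moved = 0;
        for (Map.Entry<String, List<String>> e : after.entrySet())
            for (String tp : e.getValue()) {
                String prev = previousOwner.get(tp);
                if (prev != null && after.containsKey(prev) && !prev.equals(e.getKey()))
                    moved++;
            }
        return moved;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("c2", Arrays.asList("t"));
        subscriptions.put("c3", Arrays.asList("t"));

        // Before: c1, c2 and c3 each own one partition of "t"; then c1 leaves the group.
        Map<String, List<String>> before = new HashMap<>();
        before.put("c1", Arrays.asList("t-0"));
        before.put("c2", Arrays.asList("t-1"));
        before.put("c3", Arrays.asList("t-2"));

        // A sticky outcome: only c1's orphaned partition t-0 changes hands.
        Map<String, List<String>> after = new HashMap<>();
        after.put("c2", Arrays.asList("t-1", "t-0"));
        after.put("c3", Arrays.asList("t-2"));

        // prints "true true 0"
        System.out.println(isValid(subscriptions, after) + " " + isBalanced(subscriptions, after)
                + " " + movedPartitions(before, after));
    }
}
```

In the `main` scenario, `c2` ends up with two partitions and `c3` with one. The assignment is therefore fully balanced, and only the partition orphaned by `c1` moves. Reassigning `t-0` and `t-2` to `c2` and `t-1` to `c3` would also be balanced. However, it would move two partitions away from owners that are still in the group, which is exactly what a sticky assignor tries to avoid.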