rreddy-22 commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1396669760
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ########## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * <li> Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition. </li> + * <li> Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic. </li> + * <li> Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible. </li> + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); + /** + * The member metadata obtained from the assignment specification. + */ + private final Map<String, AssignmentMemberSpec> members; + + /** + * The topic and partition metadata describer. + */ + private final SubscribedTopicDescriber subscribedTopicDescriber; + + /** + * The list of all the topic Ids that the consumer group is subscribed to. + */ + private final Set<Uuid> subscriptionIds; + + /** + * Rack information. + */ + private final RackInfo rackInfo; + + /** + * List of subscribed members for each topic. + */ + private final Map<Uuid, List<String>> membersPerTopic; + + /** + * The partitions that still need to be assigned. + */ + private final Set<TopicIdPartition> unassignedPartitions; + + /** + * All the partitions that have been retained from the existing assignment. + */ + private final Set<TopicIdPartition> assignedStickyPartitions; + + /** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ + private final AssignmentManager assignmentManager; + + /** + * List of all the members sorted by their respective assignment sizes. + */ + private final TreeSet<String> sortedMembersByAssignmentSize; + + /** + * Tracks the owner of each partition in the existing assignment on the client. Review Comment: thanks for pointing it out, I meant to be explicit about how the assignment is the one being used by the client currently, I can change it to member, what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org