dajac commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1609564193
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ########## @@ -33,44 +35,71 @@ public class AssignmentSpec { */ private final SubscriptionType subscriptionType; - public AssignmentSpec( + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. + */ + private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment; + + public GroupSpecImpl( Map<String, AssignmentMemberSpec> members, - SubscriptionType subscriptionType + SubscriptionType subscriptionType, + Map<Uuid, Map<Integer, String>> invertedTargetAssignment ) { Objects.requireNonNull(members); this.members = members; this.subscriptionType = subscriptionType; + this.invertedTargetAssignment = invertedTargetAssignment; } /** - * @return Member metadata keyed by member Id. + * {@inheritDoc} */ public Map<String, AssignmentMemberSpec> members() { return members; } /** - * @return The group's subscription type. + * {@inheritDoc} */ public SubscriptionType subscriptionType() { Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ########## @@ -33,44 +35,71 @@ public class AssignmentSpec { */ private final SubscriptionType subscriptionType; - public AssignmentSpec( + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. + */ + private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment; + + public GroupSpecImpl( Map<String, AssignmentMemberSpec> members, - SubscriptionType subscriptionType + SubscriptionType subscriptionType, + Map<Uuid, Map<Integer, String>> invertedTargetAssignment ) { Objects.requireNonNull(members); this.members = members; this.subscriptionType = subscriptionType; + this.invertedTargetAssignment = invertedTargetAssignment; } /** - * @return Member metadata keyed by member Id. 
+ * {@inheritDoc} */ public Map<String, AssignmentMemberSpec> members() { Review Comment: I think that we need `@Override` here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java: ########## @@ -82,4 +83,34 @@ public static void assertAssignment( assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); }); } + + /** + * Generate a reverse look up map of partition to member target assignments from the given member spec. + * + * @param memberSpec A map where the key is the member Id and the value is an + * AssignmentMemberSpec object containing the member's partition assignments. + * @return Map of topic partition to member assignments. + */ + public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment( + Map<String, AssignmentMemberSpec> memberSpec + ) { + Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>(); + for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) { + String memberId = memberEntry.getKey(); + Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions(); + + for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) { + Uuid topicId = topicEntry.getKey(); + Set<Integer> partitions = topicEntry.getValue(); + + invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>()); + Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId); Review Comment: nit: `Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());` should work, I think. (Note that `putIfAbsent` would not work as a one-liner here: it returns the previous value, which is `null` the first time a topic is seen.)
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -798,6 +798,55 @@ public void testUpdateSubscribedTopicNamesAndSubscriptionType() { ); } + @Test + public void testUpdateInvertedAssignment() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard); + Uuid topicId = Uuid.randomUuid(); + String memberId1 = "member1"; + String memberId2 = "member2"; + + // Initial assignment for member1 + Assignment initialAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + consumerGroup.updateTargetAssignment(memberId1, initialAssignment); + + // New assignment for member1; + Assignment newAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(1)) + )); + consumerGroup.updateTargetAssignment(memberId1, newAssignment); + + // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1 + assertFalse(consumerGroup.invertedTargetAssignment().get(topicId).containsKey(0)); + assertEquals(memberId1, consumerGroup.invertedTargetAssignment().get(topicId).get(1)); Review Comment: We usually prefer verifying the entire map to ensure that it only contains what we expect. The best way is to prepare the expected map and then use `assertEquals`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java: ########## @@ -33,44 +35,71 @@ public class AssignmentSpec { */ private final SubscriptionType subscriptionType; - public AssignmentSpec( + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. 
+ */ + private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment; + + public GroupSpecImpl( Map<String, AssignmentMemberSpec> members, - SubscriptionType subscriptionType + SubscriptionType subscriptionType, + Map<Uuid, Map<Integer, String>> invertedTargetAssignment ) { Objects.requireNonNull(members); this.members = members; this.subscriptionType = subscriptionType; + this.invertedTargetAssignment = invertedTargetAssignment; } /** - * @return Member metadata keyed by member Id. + * {@inheritDoc} */ public Map<String, AssignmentMemberSpec> members() { return members; } /** - * @return The group's subscription type. + * {@inheritDoc} */ public SubscriptionType subscriptionType() { return subscriptionType; } + /** + * {@inheritDoc} + */ + @Override + public boolean isPartitionAssigned(Uuid topicId, int partitionId) { + Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId); + if (partitionMap == null) { + return false; + } + return partitionMap.containsKey(partitionId); + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AssignmentSpec that = (AssignmentSpec) o; + GroupSpecImpl that = (GroupSpecImpl) o; return subscriptionType == that.subscriptionType && - members.equals(that.members); + members.equals(that.members) && + invertedTargetAssignment.equals(that.invertedTargetAssignment); } @Override public int hashCode() { - return Objects.hash(members, subscriptionType); + int result = members.hashCode(); + result = 31 * result + subscriptionType.hashCode(); + result = 31 * result + invertedTargetAssignment.hashCode(); + return result; } public String toString() { - return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')'; + return "GroupSpec(members=" + members + Review Comment: nit: `GroupSpecImpl`. 
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ########## @@ -186,9 +190,33 @@ private void createAssignmentSpec() { Collections.emptyMap() )); } - assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap()); } + public Map<Uuid, Map<Integer, String>> invertedTargetAssignment( + GroupAssignment groupAssignment + ) { + Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>(); + for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) { + String memberId = memberEntry.getKey(); + Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions(); + + for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) { + Uuid topicId = topicEntry.getKey(); + Set<Integer> partitions = topicEntry.getValue(); + + invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>()); + Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId); Review Comment: nit: We could combine these two statements into one line (e.g. with `computeIfAbsent`), I think. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -798,6 +798,55 @@ public void testUpdateSubscribedTopicNamesAndSubscriptionType() { ); } + @Test + public void testUpdateInvertedAssignment() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard); + Uuid topicId = Uuid.randomUuid(); + String memberId1 = "member1"; + String memberId2 = "member2"; + + // Initial assignment for member1 + Assignment initialAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + consumerGroup.updateTargetAssignment(memberId1, initialAssignment); + Review Comment: Should we also verify the inverted target assignment here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org