dajac commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1609564193


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -33,44 +35,71 @@ public class AssignmentSpec {
      */
     private final SubscriptionType subscriptionType;
 
-    public AssignmentSpec(
+    /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+    public GroupSpecImpl(
         Map<String, AssignmentMemberSpec> members,
-        SubscriptionType subscriptionType
+        SubscriptionType subscriptionType,
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
     ) {
         Objects.requireNonNull(members);
         this.members = members;
         this.subscriptionType = subscriptionType;
+        this.invertedTargetAssignment = invertedTargetAssignment;
     }
 
     /**
-     * @return Member metadata keyed by member Id.
+     * {@inheritDoc}
      */
     public Map<String, AssignmentMemberSpec> members() {
         return members;
     }
 
     /**
-     * @return The group's subscription type.
+     * {@inheritDoc}
      */
     public SubscriptionType subscriptionType() {

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -33,44 +35,71 @@ public class AssignmentSpec {
      */
     private final SubscriptionType subscriptionType;
 
-    public AssignmentSpec(
+    /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+    public GroupSpecImpl(
         Map<String, AssignmentMemberSpec> members,
-        SubscriptionType subscriptionType
+        SubscriptionType subscriptionType,
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
     ) {
         Objects.requireNonNull(members);
         this.members = members;
         this.subscriptionType = subscriptionType;
+        this.invertedTargetAssignment = invertedTargetAssignment;
     }
 
     /**
-     * @return Member metadata keyed by member Id.
+     * {@inheritDoc}
      */
     public Map<String, AssignmentMemberSpec> members() {

Review Comment:
   I think that we need `@Override` here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -82,4 +83,34 @@ public static void assertAssignment(
             assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
         });
     }
+
+    /**
+     * Generate a reverse look up map of partition to member target 
assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the 
value is an
+     *                          AssignmentMemberSpec object containing the 
member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        Map<String, AssignmentMemberSpec> memberSpec
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
memberSpec.entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = 
memberEntry.getValue().assignedPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : 
topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);

Review Comment:
   nit: `Map<Integer, String> partitionMap = 
invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());` should work, I 
think.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -798,6 +798,55 @@ public void 
testUpdateSubscribedTopicNamesAndSubscriptionType() {
         );
     }
 
+    @Test
+    public void testUpdateInvertedAssignment() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
"test-group", metricsShard);
+        Uuid topicId = Uuid.randomUuid();
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+        // New assignment for member1;
+        Assignment newAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(1))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, newAssignment);
+
+        // Verify that partition 0 is no longer assigned and partition 1 is 
assigned to member1
+        
assertFalse(consumerGroup.invertedTargetAssignment().get(topicId).containsKey(0));
+        assertEquals(memberId1, 
consumerGroup.invertedTargetAssignment().get(topicId).get(1));

Review Comment:
   We usually prefer verifying the entire map to ensure that it only contains 
what we expect. The best way is to prepare the expect map and then use 
assertEquals.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##########
@@ -33,44 +35,71 @@ public class AssignmentSpec {
      */
     private final SubscriptionType subscriptionType;
 
-    public AssignmentSpec(
+    /**
+     * Reverse lookup map representing topic partitions with
+     * their current member assignments.
+     */
+    private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+    public GroupSpecImpl(
         Map<String, AssignmentMemberSpec> members,
-        SubscriptionType subscriptionType
+        SubscriptionType subscriptionType,
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment
     ) {
         Objects.requireNonNull(members);
         this.members = members;
         this.subscriptionType = subscriptionType;
+        this.invertedTargetAssignment = invertedTargetAssignment;
     }
 
     /**
-     * @return Member metadata keyed by member Id.
+     * {@inheritDoc}
      */
     public Map<String, AssignmentMemberSpec> members() {
         return members;
     }
 
     /**
-     * @return The group's subscription type.
+     * {@inheritDoc}
      */
     public SubscriptionType subscriptionType() {
         return subscriptionType;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
+        Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);
+        if (partitionMap == null) {
+            return false;
+        }
+        return partitionMap.containsKey(partitionId);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        AssignmentSpec that = (AssignmentSpec) o;
+        GroupSpecImpl that = (GroupSpecImpl) o;
         return subscriptionType == that.subscriptionType &&
-            members.equals(that.members);
+            members.equals(that.members) &&
+            invertedTargetAssignment.equals(that.invertedTargetAssignment);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(members, subscriptionType);
+        int result = members.hashCode();
+        result = 31 * result + subscriptionType.hashCode();
+        result = 31 * result + invertedTargetAssignment.hashCode();
+        return result;
     }
 
     public String toString() {
-        return "AssignmentSpec(members=" + members + ", subscriptionType=" + 
subscriptionType.toString() + ')';
+        return "GroupSpec(members=" + members +

Review Comment:
   nit: `GroupSpecImpl`.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
     }
 
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        GroupAssignment groupAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>();
+        for (Map.Entry<String, MemberAssignment> memberEntry : 
groupAssignment.members().entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = 
memberEntry.getValue().targetPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : 
topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);

Review Comment:
   nit: We could combine in one line, I think.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -798,6 +798,55 @@ public void 
testUpdateSubscribedTopicNamesAndSubscriptionType() {
         );
     }
 
+    @Test
+    public void testUpdateInvertedAssignment() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
"test-group", metricsShard);
+        Uuid topicId = Uuid.randomUuid();
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(Collections.singletonMap(
+            topicId,
+            new HashSet<>(Collections.singletonList(0))
+        ));
+        consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
+

Review Comment:
   Should we also verify the inverted target assignment here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to