dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2592662727


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
                 }
                 for (Integer partitionId : assignedPartitions) {
-                    Integer prevValue = partitionsOrNull.put(partitionId, epoch);
-                    if (prevValue != null) {
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Step 4 does not seem correct to me in this example, as I explained in my
comment on the test itself.
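
   To make the condition under discussion concrete, here is a minimal sketch of
an epoch-guarded update. It uses a plain `HashMap` instead of `TimelineHashMap`,
and the class and method names are hypothetical. The body of the `if` is not
visible in this hunk, so the `put` and the "keep the existing value" branch are
assumptions rather than the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-topic "partition -> epoch" map; not the Kafka class.
final class PartitionEpochSketch {

    // Records `epoch` for `partitionId` unless a strictly newer epoch is already stored.
    // Returns true if the map was updated.
    static boolean putIfNotOlder(Map<Integer, Integer> epochs, int partitionId, int epoch) {
        Integer prevValue = epochs.get(partitionId);
        if (prevValue == null || prevValue <= epoch) {
            epochs.put(partitionId, epoch);
            return true;
        }
        // Assumption: an older epoch never overwrites a newer one.
        return false;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> epochs = new HashMap<>();
        putIfNotOlder(epochs, 0, 11); // first owner of partition 0
        putIfNotOlder(epochs, 0, 12); // newer epoch replaces the old one
        putIfNotOlder(epochs, 0, 11); // stale epoch is ignored
        System.out.println(epochs);   // prints {0=12}
    }
}
```

   Note that with `<=` a second assignment at the same epoch is also accepted,
which is the case questioned in the comment on the same-epoch test.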



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,287 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting
+        // to remove nonexistent ownership in step 5. We want to ensure removing ownership from a
+        // completely unowned partition in step 5 is allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // foo becomes unowned.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        // Member A is unassigned foo-0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:

Review Comment:
   I have two high-level remarks regarding the examples:
   1) The format is not consistent across them, which is kind of annoying
while reviewing.
   2) I wonder if we should make them more precise by following the actual
sequence of the protocol and mentioning all the steps (e.g. group epoch
bumped, new target assignment computed, etc.) as driven by heartbeats. This
particular example seems incomplete.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,287 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }

Review Comment:
   I am confused by this example. I thought that one of the core invariants of
the protocol is that a partition cannot be owned by two members at the same
epoch. When member A unsubscribes from foo, the group epoch must be bumped to
trigger a new computation of the target assignment. foo will only be
reassigned to a new member in this new epoch. Hence, member B should be at
epoch 12 here. Am I missing something?
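
   To spell out the invariant I have in mind, here is a rough sketch. The class
and method names are hypothetical and this is not coordinator code: it only
models "a partition may move to a different member only at a strictly larger
epoch".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of partition ownership; illustrative only.
final class OwnershipInvariantSketch {

    // Current owner of a partition and the epoch at which ownership was recorded.
    private static final class Owner {
        final String memberId;
        final int epoch;

        Owner(String memberId, int epoch) {
            this.memberId = memberId;
            this.epoch = epoch;
        }
    }

    private final Map<String, Owner> owners = new HashMap<>();

    // Returns false if the claim would let two different members own the same
    // partition at the same (or an older) epoch; otherwise records the claim.
    boolean claim(String partition, String memberId, int epoch) {
        Owner current = owners.get(partition);
        if (current != null && !current.memberId.equals(memberId) && current.epoch >= epoch) {
            return false;
        }
        owners.put(partition, new Owner(memberId, epoch));
        return true;
    }

    public static void main(String[] args) {
        OwnershipInvariantSketch group = new OwnershipInvariantSketch();
        System.out.println(group.claim("foo-0", "A", 11)); // prints true
        System.out.println(group.claim("foo-0", "B", 11)); // prints false: same epoch, different member
        System.out.println(group.claim("foo-0", "B", 12)); // prints true: epoch was bumped first
    }
}
```

   Under this model, step 5 of the scenario is only valid once the group epoch
has been bumped, which is why I would expect member B to be at epoch 12.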



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
