lianetm commented on code in PR #21305:
URL: https://github.com/apache/kafka/pull/21305#discussion_r2691274401


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -479,6 +479,241 @@ public void testConsumerGroupMemberEpochValidation() {
         assertEquals(100, result.response().memberEpoch());
     }
 
+    @Test
+    public void testMemberCanRejoinWithEpochZeroInStableState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in STABLE state with epoch 100.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .build();
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member));
+
+        assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId));
+
+        // Prepare assignment for rejoin.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)
+            )))
+        ));

Review Comment:
   Is this really needed? Nothing changes on the group coordinator (GC) side 
here: we are just retrieving the state for a member that is coming back, so I 
would not expect us to need the assignor to do anything. Am I missing something? 
(The same applies to the other tests here, if this makes sense.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
