jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1272854832


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5205,11 +5245,102 @@ public void testReplaceStaticMemberInStableStateErrors() throws Exception {
 
         GenericGroupMember revertedMember = group.member(group.staticMemberId("group-instance-id"));
 
-        assertEquals(oldMember.memberId(), revertedMember.memberId());
-        assertEquals(oldMember.groupInstanceId(), revertedMember.groupInstanceId());
-        assertEquals(oldMember.rebalanceTimeoutMs(), revertedMember.rebalanceTimeoutMs());
-        assertEquals(oldMember.sessionTimeoutMs(), revertedMember.sessionTimeoutMs());
-        assertEquals(oldMember.supportedProtocols(), revertedMember.supportedProtocols());
+        assertEquals(oldMemberId, revertedMember.memberId());
+        assertEquals(Optional.of("group-instance-id"), revertedMember.groupInstanceId());
+        assertEquals(4000, revertedMember.rebalanceTimeoutMs());
+        assertEquals(3000, revertedMember.sessionTimeoutMs());
+        assertEquals(protocols, revertedMember.supportedProtocols());
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReplaceStaticMemberInStableStateSucceeds(
+        boolean supportSkippingAssignment
+    ) throws Exception {
+        // If the append future succeeds, the soft state is updated with the new member.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = context.joinGenericGroupAndCompleteJoin(
+            request,
+            true,
+            supportSkippingAssignment
+        );
+
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        String oldMemberId = response.memberId();
+        // Simulate successful sync group phase
+        group.transitionTo(STABLE);
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID and the append succeeds.
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array()));
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+            request
+                .setProtocols(protocols)
+                .setRebalanceTimeoutMs(7000)
+                .setSessionTimeoutMs(6000),
+            responseFuture,
+            true,
+            supportSkippingAssignment);
+
+        assertEquals(
+            Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())),
+            result.records()
+        );
+        assertFalse(responseFuture.isDone());
+
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);

Review Comment:
   I like having it explicitly stated, since it shows when we expect the append
future to be completed.
   
   We only replay in `sleep()`. The append future can be completed from both
join group and sync group requests, as well as from `sleep()`, and follower sync
requests do not generate records, so it doesn't help simplify. I also think it
would confuse the reader more about what is actually going on under the hood.
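
To illustrate why the explicit completion reads well in a test, here is a
minimal standalone sketch that uses only `java.util.concurrent.CompletableFuture`.
The class `AppendFutureSketch` and the helper `respondAfterAppend` are
hypothetical names made up for this example, not the coordinator's API. It only
assumes that the response is released once the append future completes, which is
what the test above asserts.

```java
import java.util.concurrent.CompletableFuture;

public class AppendFutureSketch {

    // Hypothetical helper: the response future is completed only after the
    // append future (the simulated log write) has completed.
    public static CompletableFuture<String> respondAfterAppend(
        CompletableFuture<Void> appendFuture,
        String memberId
    ) {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, error) -> {
            if (error != null) {
                // A failed write propagates the failure to the response.
                responseFuture.completeExceptionally(error);
            } else {
                responseFuture.complete(memberId);
            }
        });
        return responseFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        CompletableFuture<String> responseFuture =
            respondAfterAppend(appendFuture, "member-1");

        // Before the write is acknowledged, no response is available.
        System.out.println("done before append: " + responseFuture.isDone());

        // Simulate a successful write to the log, explicitly, as in the test.
        appendFuture.complete(null);

        System.out.println("done after append: " + responseFuture.isDone());
    }
}
```

Because the completion is a visible line in the test body, the reader can see
exactly where the state before the write ends and the state after it begins.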



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
