dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1273950275


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5205,11 +5245,102 @@ public void 
testReplaceStaticMemberInStableStateErrors() throws Exception {
 
         GenericGroupMember revertedMember = 
group.member(group.staticMemberId("group-instance-id"));
 
-        assertEquals(oldMember.memberId(), revertedMember.memberId());
-        assertEquals(oldMember.groupInstanceId(), 
revertedMember.groupInstanceId());
-        assertEquals(oldMember.rebalanceTimeoutMs(), 
revertedMember.rebalanceTimeoutMs());
-        assertEquals(oldMember.sessionTimeoutMs(), 
revertedMember.sessionTimeoutMs());
-        assertEquals(oldMember.supportedProtocols(), 
revertedMember.supportedProtocols());
+        assertEquals(oldMemberId, revertedMember.memberId());
+        assertEquals(Optional.of("group-instance-id"), 
revertedMember.groupInstanceId());
+        assertEquals(4000, revertedMember.rebalanceTimeoutMs());
+        assertEquals(3000, revertedMember.sessionTimeoutMs());
+        assertEquals(protocols, revertedMember.supportedProtocols());
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReplaceStaticMemberInStableStateSucceeds(
+        boolean supportSkippingAssignment
+    ) throws Exception {
+        // If the append future succeeds, the soft state is updated with the 
new member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(0);
+
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("foo"))).array())
+        );
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("group-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(protocols)
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAndCompleteJoin(
+            request,
+            true,
+            supportSkippingAssignment
+        );
+
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(1, group.size());
+        assertEquals(1, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        String oldMemberId = response.memberId();
+        // Simulate successful sync group phase
+        group.transitionTo(STABLE);
+
+        // Static member rejoins with UNKNOWN_MEMBER_ID and the append 
succeeds.
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("roundrobin")
+            .setMetadata(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                Collections.singletonList("bar"))).array()));
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+            request
+                .setProtocols(protocols)
+                .setRebalanceTimeoutMs(7000)
+                .setSessionTimeoutMs(6000),
+            responseFuture,
+            true,
+            supportSkippingAssignment);
+
+        assertEquals(
+            Collections.singletonList(newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        assertFalse(responseFuture.isDone());
+
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);

Review Comment:
   That's fair.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to