dajac commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1273950275
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -5205,11 +5245,102 @@ public void testReplaceStaticMemberInStableStateErrors() throws Exception { GenericGroupMember revertedMember = group.member(group.staticMemberId("group-instance-id")); - assertEquals(oldMember.memberId(), revertedMember.memberId()); - assertEquals(oldMember.groupInstanceId(), revertedMember.groupInstanceId()); - assertEquals(oldMember.rebalanceTimeoutMs(), revertedMember.rebalanceTimeoutMs()); - assertEquals(oldMember.sessionTimeoutMs(), revertedMember.sessionTimeoutMs()); - assertEquals(oldMember.supportedProtocols(), revertedMember.supportedProtocols()); + assertEquals(oldMemberId, revertedMember.memberId()); + assertEquals(Optional.of("group-instance-id"), revertedMember.groupInstanceId()); + assertEquals(4000, revertedMember.rebalanceTimeoutMs()); + assertEquals(3000, revertedMember.sessionTimeoutMs()); + assertEquals(protocols, revertedMember.supportedProtocols()); + assertEquals(1, group.size()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(STABLE)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReplaceStaticMemberInStableStateSucceeds( + boolean supportSkippingAssignment + ) throws Exception { + // If the append future succeeds, the soft state is updated with the new member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0); + + protocols.add(new JoinGroupRequestProtocol() + .setName("range") + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array()) + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(protocols) + .build(); + + JoinGroupResponseData response = context.joinGenericGroupAndCompleteJoin( + request, + true, + supportSkippingAssignment + ); + + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(1, group.size()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + String oldMemberId = response.memberId(); + // Simulate successful sync group phase + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and the append succeeds. + protocols.add(new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("bar"))).array())); + + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request + .setProtocols(protocols) + .setRebalanceTimeoutMs(7000) + .setSessionTimeoutMs(6000), + responseFuture, + true, + supportSkippingAssignment); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + assertFalse(responseFuture.isDone()); + + // Simulate a successful write to the log. + result.appendFuture().complete(null); Review Comment: That's fair. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org