dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582404976
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10855,544 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception { + int minSessionTimeout = 50; + int maxSessionTimeout = 100; + String groupId = "group-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withClassicGroupMinSessionTimeoutMs(minSessionTimeout) + .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) + .build(); + + JoinGroupRequestData requestWithSmallSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(minSessionTimeout - 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithSmallSessionTimeout)); + + JoinGroupRequestData requestWithLargeSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(maxSessionTimeout + 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithLargeSessionTimeout)); + } + + @Test + public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testConsumerGroupJoinWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + assertThrows(MemberIdRequiredException.class, () -> context.sendClassicGroupJoin(request, true)); + + // Simulate getting the new member id from the error response. + String newMemberId = Uuid.randomUuid().toString(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinWithNewStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(0) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @ValueSource(strings = {"", "instance-id"}) + public void testConsumerGroupRejoinWithExistingMember(String instanceId) throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSupportedClassicProtocols(protocols) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + // The member rejoins with the same member id and protocols. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertEquals(Collections.emptyList(), joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + Review Comment: Just added. a few tests using all versions of the embedded consumer protocol. It seems that Subscription's ownedPartitions was not supported until version 1, but the new protocol can barely work without knowing the ownedPartitions. Should we add a check for ConsumerProtocol version that requires it to be greater than 0? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10921,6 +10855,544 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } + @Test + public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .withConsumerGroupMaxSize(1) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); + assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); + } + + @Test + public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception { + int minSessionTimeout = 50; + int maxSessionTimeout = 100; + String groupId = "group-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withClassicGroupMinSessionTimeoutMs(minSessionTimeout) + .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) + .build(); + + JoinGroupRequestData requestWithSmallSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(minSessionTimeout - 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithSmallSessionTimeout)); + + JoinGroupRequestData requestWithLargeSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withSessionTimeoutMs(maxSessionTimeout + 1) + .build(); + assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithLargeSessionTimeout)); + } + + @Test + public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) + .build())) + .build(); + + JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + + JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("connect") + .withDefaultProtocolTypeAndProtocols() + .build(); + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); + } + + @Test + public void testConsumerGroupJoinWithNewDynamicMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + assertThrows(MemberIdRequiredException.class, () -> context.sendClassicGroupJoin(request, true)); + + // Simulate getting the new member id from the error response. + String newMemberId = Uuid.randomUuid().toString(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(newMemberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(barTopicId, 0) + ))); + } + } + )); + + JoinGroupRequestData secondRequest = new JoinGroupRequestData() + .setGroupId(request.groupId()) + .setMemberId(newMemberId) + .setProtocolType(request.protocolType()) + .setProtocols(request.protocols()) + .setSessionTimeoutMs(request.sessionTimeoutMs()) + .setRebalanceTimeoutMs(request.rebalanceTimeoutMs()) + .setReason(request.reason()); + + GroupMetadataManagerTestContext.JoinResult secondJoinResult = context.sendClassicGroupJoin( + secondRequest, + true + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setAssignedPartitions(assignor.targetPartitions(newMemberId)) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, assignor.targetPartitions(newMemberId)), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), secondJoinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), secondJoinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), secondJoinResult.records.subList(5, 7)); + + assertTrue(secondJoinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + secondJoinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinWithNewStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Collections.emptyList())) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords.subList(0, 3), joinResult.records.subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 5), joinResult.records.subList(3, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), joinResult.records.subList(5, 7)); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(11) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinReplacingExistingStaticMember() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol( + Collections.singletonList(fooTopicName), + Collections.emptyList())) + .build(); + + // The static member joins with UNKNOWN_MEMBER_ID. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request, + true + ); + String newMemberId = joinResult.joinFuture.get().memberId(); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(newMemberId) + .setMemberEpoch(0) + .setPreviousMemberEpoch(0) + .setInstanceId(instanceId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .setRebalanceTimeoutMs(500) + .setSupportedClassicProtocols(request.protocols()) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + // Remove the old static member. + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // Create the new static member. + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), + RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + assertRecordsEquals(expectedRecords, joinResult.records); + + assertTrue(joinResult.joinFuture.isDone()); + assertEquals( + new JoinGroupResponseData() + .setMemberId(newMemberId) + .setGenerationId(0) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + + context.assertSessionTimeout(groupId, newMemberId, 45000); + context.assertSyncTimeout(groupId, newMemberId, request.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @ValueSource(strings = {"", "instance-id"}) + public void testConsumerGroupRejoinWithExistingMember(String instanceId) throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2, mkMapOfPartitionRacks(1))); + } + }) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSupportedClassicProtocols(protocols) + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setRebalanceTimeoutMs(500) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName)) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(barTopicId, 0))) + .withAssignmentEpoch(10)) + .build(); + context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + // The member rejoins with the same member id and protocols. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertEquals(Collections.emptyList(), joinResult.records); + assertEquals( + new JoinGroupResponseData() + .setMemberId(memberId1) + .setGenerationId(10) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName("range"), + joinResult.joinFuture.get() + ); + context.assertSyncTimeout(groupId, memberId1, request.rebalanceTimeoutMs()); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnknownInstanceId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + // Set up a ConsumerGroup with no static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with an instance id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId1) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupJoin(request)); + } + + @Test + public void testConsumerGroupJoinStaticMemberWithUnmatchedMemberId() throws Exception { + String groupId = "group-id"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = + GroupMetadataManagerTestContext.toRangeProtocol( + Arrays.asList(fooTopicName, barTopicName), + Arrays.asList(new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1)) + ); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setSupportedClassicProtocols(protocols) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .build())) + .build(); + + // The member joins with the same instance id and a different member id. + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withProtocols(protocols) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupJoin(request)); + } + Review Comment: Just added a few tests using all versions of the embedded consumer protocol. It seems that Subscription's ownedPartitions was not supported until version 1, but the new protocol can barely work without knowing the ownedPartitions. Should we add a check for ConsumerProtocol version that requires it to be greater than 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org