dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589059959


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
     }
 
+    @Test
+    public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .withConsumerGroupMaxSize(1)
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+        assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+    }
+
+    @Test
+    public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+                    .build()))
+            .build();
+
+        JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+        JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("connect")
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+    }
+
+    @Test
+    public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+            String memberId = Uuid.randomUuid().toString();
+            MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+            GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+                .withAssignors(Collections.singletonList(assignor))
+                .withMetadataImage(new MetadataImageBuilder()
+                    .addTopic(fooTopicId, fooTopicName, 2)
+                    .addTopic(barTopicId, barTopicName, 1)
+                    .addRacks()
+                    .build())
+                .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                    .withSubscriptionMetadata(new HashMap<String, 
TopicMetadata>() {
+                        {
+                            put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        }
+                    })
+                    .withMember(new ConsumerGroupMember.Builder(memberId)
+                        .setState(MemberState.STABLE)
+                        .setMemberEpoch(10)
+                        .setPreviousMemberEpoch(10)
+                        .setAssignedPartitions(mkAssignment(
+                            mkTopicAssignment(fooTopicId, 0, 1)))
+                        .build())
+                    .withAssignment(memberId, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .withAssignmentEpoch(10))
+                .build();
+
+            JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                
.withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    Collections.emptyList(),
+                    version))
+                .build();
+
+            // The first round of join request gets the new member id.
+            GroupMetadataManagerTestContext.JoinResult firstJoinResult = 
context.sendClassicGroupJoin(
+                request,
+                true
+            );
+            assertTrue(firstJoinResult.records.isEmpty());
+            // Simulate a successful write to the log.
+            firstJoinResult.appendFuture.complete(null);
+
+            assertTrue(firstJoinResult.joinFuture.isDone());
+            assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
firstJoinResult.joinFuture.get().errorCode());
+            String newMemberId = firstJoinResult.joinFuture.get().memberId();
+            assertNotEquals("", newMemberId);
+
+            assignor.prepareGroupAssignment(new GroupAssignment(
+                new HashMap<String, MemberAssignment>() {
+                    {
+                        put(memberId, new MemberAssignment(mkAssignment(
+                            mkTopicAssignment(fooTopicId, 0)
+                        )));
+                        put(newMemberId, new MemberAssignment(mkAssignment(
+                            mkTopicAssignment(barTopicId, 0)
+                        )));
+                    }
+                }
+            ));
+
+            JoinGroupRequestData secondRequest = new JoinGroupRequestData()
+                .setGroupId(request.groupId())
+                .setMemberId(newMemberId)
+                .setProtocolType(request.protocolType())
+                .setProtocols(request.protocols())
+                .setSessionTimeoutMs(request.sessionTimeoutMs())
+                .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
+                .setReason(request.reason());
+
+            // Send second join group request for a new dynamic member with 
the new member id.
+            GroupMetadataManagerTestContext.JoinResult secondJoinResult = 
context.sendClassicGroupJoin(
+                secondRequest,
+                true
+            );
+
+            ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(newMemberId)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(0)
+                .setState(MemberState.STABLE)
+                .setClientId("client")
+                .setClientHost("localhost/127.0.0.1")
+                .setSubscribedTopicNames(Arrays.asList(fooTopicName, 
barTopicName))
+                .setRebalanceTimeoutMs(500)
+                .setAssignedPartitions(assignor.targetPartitions(newMemberId))
+                .setSupportedClassicProtocols(request.protocols())
+                .build();
+
+            List<Record> expectedRecords = Arrays.asList(
+                RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember),
+                RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                    }
+                }),
+                RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+                RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
assignor.targetPartitions(memberId)),
+                RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, 
assignor.targetPartitions(newMemberId)),
+                RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+
+                RecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedMember)
+            );
+            assertRecordsEquals(expectedRecords.subList(0, 3), 
secondJoinResult.records.subList(0, 3));
+            assertUnorderedListEquals(expectedRecords.subList(3, 5), 
secondJoinResult.records.subList(3, 5));
+            assertRecordsEquals(expectedRecords.subList(5, 7), 
secondJoinResult.records.subList(5, 7));
+
+            secondJoinResult.appendFuture.complete(null);
+            assertTrue(secondJoinResult.joinFuture.isDone());
+            assertEquals(
+                new JoinGroupResponseData()
+                    .setMemberId(newMemberId)
+                    .setGenerationId(11)
+                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                    .setProtocolName("range"),
+                secondJoinResult.joinFuture.get()
+            );
+
+            context.assertSessionTimeout(groupId, newMemberId, 
request.sessionTimeoutMs());
+            context.assertSyncTimeout(groupId, newMemberId, 
request.rebalanceTimeoutMs());
+        }
+    }
+
+    @Test
+    public void testJoiningConsumerGroupFailingToPersistRecords() throws 
Exception {
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String memberId = Uuid.randomUuid().toString();
+        String newMemberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0)
+                    )));
+                    put(newMemberId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 1)
+                    )));
+                }
+            }
+        ));
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+        context.commit();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(newMemberId)
+            .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+                Collections.singletonList(fooTopicName),
+                Collections.emptyList()))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+
+        // Simulate a failed write to the log.
+        joinResult.appendFuture.completeExceptionally(new 
NotLeaderOrFollowerException());
+        context.rollback();
+
+        context.assertNoSessionTimeout(groupId, newMemberId);
+        context.assertNoSyncTimeout(groupId, newMemberId);
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> 
context.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(newMemberId,
 false)
+        );
+    }
+
+    @Test
+    public void testJoiningConsumerGroupWithNewStaticMember() throws Exception 
{
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        // An empty target assignment is used as the new member id is unknown 
before calling join group.
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addTopic(barTopicId, barTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    }
+                })
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+                Arrays.asList(fooTopicName, barTopicName),
+                Collections.emptyList()))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+
+        // Simulate a successful write to log.
+        joinResult.appendFuture.complete(null);
+        String newMemberId = joinResult.joinFuture.get().memberId();
+        assertNotEquals("", newMemberId);
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(newMemberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setRebalanceTimeoutMs(500)
+            .setSupportedClassicProtocols(request.protocols())
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
Collections.emptyMap()),
+            RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId, 
Collections.emptyMap()),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
joinResult.records.subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 5), 
joinResult.records.subList(3, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), 
joinResult.records.subList(5, 7));
+
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(
+            new JoinGroupResponseData()
+                .setMemberId(newMemberId)
+                .setGenerationId(11)
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .setProtocolName("range"),
+            joinResult.joinFuture.get()
+        );
+
+        context.assertSessionTimeout(groupId, newMemberId, 
request.sessionTimeoutMs());
+        context.assertSyncTimeout(groupId, newMemberId, 
request.rebalanceTimeoutMs());
+    }
+
+    @Test
+    public void testJoiningConsumerGroupReplacingExistingStaticMember() throws 
Exception {
+        String groupId = "group-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        // An empty target assignment is used as the new member id is unknown 
before calling join group.
+
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));

Review Comment:
   In this case, I think we could actually use an assignor (e.g. a 
NoOpPartitionAssignor) that simply returns the assigned partitions it receives. 
That would avoid the incoherent state created by preparing an empty target 
assignment here. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to