Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-06 Thread via GitHub


dajac merged PR #15798:
URL: https://github.com/apache/kafka/pull/15798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-06 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1590935155


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
 return member;
 }
 
+private boolean ownsRevokedPartitions(

Review Comment:
   nit: Should we add some javadoc?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class NoOpPartitionAssignor implements PartitionAssignor {
+static final String NAME = "no-op";
+@Override

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
 return member;
 }
 
+private boolean ownsRevokedPartitions(
+Map> assignment
+) {
+if (ownedTopicPartitions == null) return true;
+
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions 
: ownedTopicPartitions) {
+for (Integer partitionId : topicPartitions.partitions()) {
+if (assignment.getOrDefault(topicPartitions.topicId(), 
Collections.emptySet()).contains(partitionId)) {

Review Comment:
   nit: There is a small optimization here. We could do 
`assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet())` 
before looping on the partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589476625


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589336191


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589336191


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589059959


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1589061058


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10822,1146 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testJoiningConsumerGroupWithNewDynamicMember() throws 
Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1)))
+   

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1587740243


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10823,1158 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testJoiningConsumerGroupWithNewDynamicMember(boolean 
replaySuccessfully) throws Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build())
+.

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1587735026


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10823,1158 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testJoiningConsumerGroupWithNewDynamicMember(boolean 
replaySuccessfully) throws Exception {
+String groupId = "group-id";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new 
MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 2)
+.addTopic(barTopicId, barTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withSubscriptionMetadata(new HashMap() {
+{
+put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+}
+})
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build())
+.

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1587732479


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10823,1158 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void 
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testJoiningConsumerGroupWithNewDynamicMember(boolean 
replaySuccessfully) throws Exception {

Review Comment:
   Not any specific reason converting all of them. I was just feeling a bit 
strange only convert only one because I couldn't decide which to choose haha. I 
agree there's no need for all of them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-30 Thread via GitHub


dajac commented on PR #15798:
URL: https://github.com/apache/kafka/pull/15798#issuecomment-2085866313

   @dongnuo123 Be aware of https://github.com/apache/kafka/pull/15785. The PR 
changes code that you have refactored or reused in this one. We will need to 
adapt when we merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on PR #15798:
URL: https://github.com/apache/kafka/pull/15798#issuecomment-2083638319

   TODO: This patch is waiting on https://github.com/apache/kafka/pull/15818 to 
make sure the response future is not completed until the records are 
successfully replayed and persisted


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583740908


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1169,6 +1173,64 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(

Review Comment:
   Yeah it makes sense. I can open another pr once this one is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583691897


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583691897


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583634669


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1288,25 +1353,15 @@ private 
CoordinatorResult consumerGr
 
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
 .setClientId(clientId)
 .setClientHost(clientHost)
+.setClassicMemberMetadata(null)

Review Comment:
   Our default value is already null. This is necessary as 
`updatedMemberBuilder = new ConsumerGroupMember.Builder(member)` can have 
non-null classicMemberMetadata and we need it to update the members that have 
changed from the classic protocol to the consumer protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582404976


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582403702


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?

Review Comment:
   Do we ever transition a consumer group to DEAD? It's always confusing when 
it comes to type check in the new group coordinator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397694


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1169,6 +1175,107 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {

Review Comment:
   Yeah I was also thinking about this. The last commit just made a change in 
`supportsClassicProtocols` which requires the protocolType to be 
`ConsumerProtocol.PROTOCOL_TYPE` for both empty and non-empty groups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397540


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// A dynamic member (re-)joins.
+throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+newMemberCreated = !group.hasMember(memberId);
+member = group.getOrMaybeCreateMember(memberId, true);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMember(records, groupId, member.memberId());
+log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+"Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+}
+} else {
+// Rejoining static member. Fence the static group with 
unmatched member id.
+throwIfStaticMemberIsUnknown(member, instanceId);
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+}
+}
+
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+final List 
ownedTopicPartitions =
+validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+// 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+// record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+// changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+// record to the __consumer_offsets partition. Finally, the group 
epoch is

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397180


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:
+case INVALID_SESSION_TIMEOUT:
+return new JoinGroupResponseData()
+.setMemberId(memberId)

Review Comment:
   Seems that `INVALID_GROUP_ID` also sets the member id. I don't think we use 
the member id here either. Should we remove them?
   
https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L325



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1581026465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:

Review Comment:
   Ah yeah you're right. Let's keep it as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// A dynamic member (re-)joins.
+throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+newMemberCreated = !group.hasMember(memberId);
+member = group.getOrMaybeCreateMember(memberId, true);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMember(records, groupId, member.memberId());
+log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+"Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+}
+} else {
+// Rejoining static member. Fence the static group with 
unmatched member id.
+throwIfStaticMemberIsUnknown(member, instanceId);
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+}
+}
+
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+final List 
ownedTopicPartitions =
+validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+// 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+// record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+// changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+// record to the __consumer_offsets partition. Finally, the group 
epoch is bump

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580920772


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1288,25 +1396,10 @@ private 
CoordinatorResult consumerGr
 
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
 .setClientId(clientId)
 .setClientHost(clientHost)
+.setClassicMemberMetadata(null)
 .build();
 
-boolean bumpGroupEpoch = false;
-if (!updatedMember.equals(member)) {
-records.add(newMemberSubscriptionRecord(groupId, updatedMember));
-
-if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
-log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
-groupId, memberId, updatedMember.subscribedTopicNames());
-bumpGroupEpoch = true;
-}
-
-if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
-log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
-groupId, memberId, updatedMember.subscribedTopicRegex());
-bumpGroupEpoch = true;
-}
-}
-
+boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);

Review Comment:
   nit: The name does not really represent the intent here. How about 
`hasMemberChanged` or something along those lines?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:
+case INVALID_SESSION_TIMEOUT:
+return new JoinGroupResponseData()
+.setMemberId(memberId)

Review Comment:
   I actually wonder why we set the member id in the `INVALID_SESSION_TIMEOUT` 
case. Looking at the client code, we don't use it in the java client.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List 
validateGenerationIdAndGetOwnedPartition(
+ConsumerGroupMember member,
+ConsumerPartitionAssignor.Subscription subscription
+) {
+List 
ownedPartitions =
+toTopicPartitions(subscription.ownedPartitions(), 
metadataImage.topics

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580243392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List 
validateGenerationIdAndGetOwnedPartition(
+ConsumerGroupMember member,
+ConsumerPartitionAssignor.Subscription subscription
+) {
+List 
ownedPartitions =
+toTopicPartitions(subscription.ownedPartitions(), 
metadataImage.topics());
+if (subscription.generationId().isPresent() && 
subscription.generationId().get() == member.memberEpoch()) {
+return ownedPartitions;
+} else {
+// If the generation id is not provided or doesn't match the 
member epoch, it's still safe to
+// accept the ownedPartitions that is a subset of the assigned 
partition. Otherwise, set the
+// ownedPartition to be null. When a new assignment is provided, 
the consumer will stop fetching
+// from and revoke the partitions it does not own.
+if (isSubset(ownedPartitions, member.assignedPartitions())) {
+return ownedPartitions;
+} else {
+return null;
+}
+}
+}
+
+/**
+ * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list 
converted from the TopicPartitions list.
+ */
+private static List 
toTopicPartitions(
+List partitions,
+TopicsImage topicsImage
+) {
+return ConsumerGroup.topicPartitionMapFromList(partitions, 
topicsImage).entrySet().stream().map(

Review Comment:
   I thought we need to sort the partitions by topic anyways, so I don't quite 
get how to combine(?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the sta

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the sta

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580240281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List 
validateGenerationIdAndGetOwnedPartition(

Review Comment:
   I was wondering if we can put off checking generation id to sync group and 
throw ILLEGAL_GENERATION there. The client will rejoin in such situation. 
That's what we did in the classic protocol.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+ 

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580210319


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the sta

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580203100


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+}
+} else {
+// Rejoining static member. Fence the sta

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());

Review Comment:
   Yeah I think it makes sense. We can remove it without canceling the timers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to t

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580183181


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+}
+} else {
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.

Review Comment:
   If a static member joins without a member id, we replace any existing member 
with the same instance id; 
   if the static member joins with a member id, then we treat it as a normal 
rejoin if the static member exists and the member id matches, and throw an 
exception otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580182980


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   I think we don't need to for new members, but it doesn't hurt to also do the 
check since we always accept the empty ownedPartitions. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-25 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1579345622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+List 
ownedTopicPartitions = null;
+
+if (!validateClassicGroupSessionTimeout(memberId, 
request.sessionTimeoutMs(), responseFuture)) {
+return EMPTY_RESULT;
+}
+throwIfConsumerGroupIsFull(group, memberId);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+// A dynamic member (re-)joins.
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+newMemberCreated = !group.hasMember(memberId);
+
+member = group.getOrMaybeCreateMember(memberId, true);
+if (!newMemberCreated) ownedTopicPartitions = 
validateGenerationIdAndGetOwnedPartition(member, subscription);

Review Comment:
   What's the reason for doing this only if the member is not new?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1336,6 +1422,233 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult consumerGroupJoin(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);

Review Comment:
   I think that we should rather do this after the request/group/member 
validations.



##
group-coordinator/src/main/java/org/apache/kafka/coordin