Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-20 Thread via GitHub


dajac merged PR #15546:
URL: https://github.com/apache/kafka/pull/15546


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1530443218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -619,6 +622,22 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
 
+    /**
+     * Gets a consumer group.
+     *
+     * @param groupId   The group id.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or the group is not a consumer group.
+     *
+     * Package private for testing.
+     */
+    ConsumerGroup getConsumerGroup(

Review Comment:
   I just noticed that we also have `consumerGroup(String, long)` as a method. 
I wonder whether we should name this one `consumerGroup` too and have it call 
the one that I mentioned. It would also make sense to group them together. What 
do you think?
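
One way to read this suggestion, as a minimal sketch: the single-argument `consumerGroup` delegates to the existing two-argument `consumerGroup(String, long)`. Everything below is a hypothetical stand-in rather than the Kafka code. The class `ConsumerGroupLookupSketch`, the map-backed store, the interpretation of the `long` as an offset bound, and the `Long.MAX_VALUE` default are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of two overloaded lookups; not the GroupMetadataManager code.
public class ConsumerGroupLookupSketch {
    // groupId -> offset at which the group became visible (assumed semantics).
    private final Map<String, Long> createdAtOffset = new HashMap<>();

    public void add(String groupId, long offset) {
        createdAtOffset.put(groupId, offset);
    }

    // Two-argument lookup: only sees groups created at or before the given offset.
    public String consumerGroup(String groupId, long offset) {
        Long createdAt = createdAtOffset.get(groupId);
        if (createdAt == null || createdAt > offset) {
            throw new IllegalArgumentException("Consumer group " + groupId + " not found.");
        }
        return groupId;
    }

    // One-argument overload: same name, delegates with the widest possible bound.
    public String consumerGroup(String groupId) {
        return consumerGroup(groupId, Long.MAX_VALUE);
    }
}
```

Keeping both overloads next to each other, under one name, makes it obvious that they share the same not-found behaviour.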






Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-19 Thread via GitHub


dongnuo123 commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1530392822


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -598,15 +600,16 @@ public List describeGroups(
      */
     ConsumerGroup getOrMaybeCreateConsumerGroup(
         String groupId,
-        boolean createIfNotExists
+        boolean createIfNotExists,
+        List records
     ) throws GroupIdNotFoundException {
         Group group = groups.get(groupId);
 
         if (group == null && !createIfNotExists) {
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
 
-        if (group == null) {
+        if (group == null || maybeDeleteEmptyClassicGroup(group, records)) {

Review Comment:
   Oh thanks for the catch. Yeah it's necessary when the group is an empty 
classic group







Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1529919197


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -598,15 +600,16 @@ public List describeGroups(
      */
     ConsumerGroup getOrMaybeCreateConsumerGroup(
         String groupId,
-        boolean createIfNotExists
+        boolean createIfNotExists,
+        List records
     ) throws GroupIdNotFoundException {
         Group group = groups.get(groupId);
 
         if (group == null && !createIfNotExists) {
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
 
-        if (group == null) {
+        if (group == null || maybeDeleteEmptyClassicGroup(group, records)) {

Review Comment:
   Should we consider deleting the previous empty group only if 
`createIfNotExists` is `true`? It is perhaps not necessary. What do you think?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -619,6 +622,16 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
 
+    /**
+     * An overloaded method of {@link GroupMetadataManager#getOrMaybeCreateConsumerGroup(String, boolean, List)}
+     */
+    ConsumerGroup getOrMaybeCreateConsumerGroup(

Review Comment:
   It looks like this one is always used with `createIfNotExists` set to 
`false`. Should we remove the flag and rename the method?






Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-18 Thread via GitHub


dongnuo123 commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1529010875


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9298,6 +9298,120 @@ public void testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(0, result.response().errorCode());
+        assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), result.records().get(0));
+        assertEquals(Group.GroupType.CONSUMER,
+            context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .build();
+
+        JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult.joinFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadat

Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-18 Thread via GitHub


dajac commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1527999364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1056,7 +1056,12 @@ private CoordinatorResult consumerGr
 
         // Get or create the consumer group.
         boolean createIfNotExists = memberEpoch == 0;
-        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        ConsumerGroup group;
+        if (maybeDeleteEmptyClassicGroup(groupId, records)) {
+            group = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        } else {
+            group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        }

Review Comment:
   The method is already large so I wonder if we could push this logic into 
`getOrMaybeCreateConsumerGroup` and we should actually do this only if 
`createIfNotExists` is `true`.
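
To make the suggestion concrete, here is a minimal, self-contained sketch of folding the empty-classic-group check into the lookup and applying it only when `createIfNotExists` is `true`. It is a simplified model, not the Kafka implementation: `GroupLookupSketch`, `SimpleGroup`, `GroupType`, and the string-valued `records` are hypothetical stand-ins, and `IllegalStateException` stands in for `GroupIdNotFoundException`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the lookup discussed above; not Kafka code.
public class GroupLookupSketch {
    public enum GroupType { CLASSIC, CONSUMER }

    public static final class SimpleGroup {
        public final String id;
        public final GroupType type;
        public final int memberCount;

        public SimpleGroup(String id, GroupType type, int memberCount) {
            this.id = id;
            this.type = type;
            this.memberCount = memberCount;
        }

        public boolean isEmpty() {
            return memberCount == 0;
        }
    }

    private final Map<String, SimpleGroup> groups = new HashMap<>();

    public void put(SimpleGroup group) {
        groups.put(group.id, group);
    }

    // Returns the consumer group. A new group is created, and an empty classic
    // group replaced, only when createIfNotExists is true. The tombstone is
    // appended to records as a plain string for illustration.
    public SimpleGroup getOrMaybeCreateConsumerGroup(
        String groupId,
        boolean createIfNotExists,
        List<String> records
    ) {
        SimpleGroup group = groups.get(groupId);

        if (group == null && !createIfNotExists) {
            throw new IllegalStateException("Consumer group " + groupId + " not found.");
        }

        boolean replaceEmptyClassic = createIfNotExists
            && group != null
            && group.type == GroupType.CLASSIC
            && group.isEmpty();

        if (replaceEmptyClassic) {
            records.add("tombstone:" + groupId);
        }

        if (group == null || replaceEmptyClassic) {
            SimpleGroup created = new SimpleGroup(groupId, GroupType.CONSUMER, 0);
            groups.put(groupId, created);
            return created;
        }

        if (group.type != GroupType.CONSUMER) {
            throw new IllegalStateException("Group " + groupId + " is not a consumer group.");
        }
        return group;
    }
}
```

With this shape the caller keeps a single call site, and a lookup with `createIfNotExists` set to `false` never emits a tombstone.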



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9298,6 +9298,120 @@ public void testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(0, result.response().errorCode());
+        assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), result.records().get(0));
+        assertEquals(Group.GroupType.CONSUMER,
+            context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+                .withMember(new ConsumerGroupMember.

[PR] KAFKA-16313: offline group protocol migration (reopened) [kafka]

2024-03-15 Thread via GitHub


dongnuo123 opened a new pull request, #15546:
URL: https://github.com/apache/kafka/pull/15546

   This patch enables empty classic groups to upgrade to consumer groups and 
empty consumer groups to downgrade to classic groups.
   
   jira: https://issues.apache.org/jira/browse/KAFKA-16313
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
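
The rule exercised by the tests in this PR can be summarised as a small decision function: a request for the existing group type uses the group as-is, an empty group of the other type is converted, and a non-empty group of the other type is rejected (the tests expect `GROUP_ID_NOT_FOUND`). The sketch below is only an illustration of that rule. `OfflineMigrationRuleSketch`, `Outcome`, and `decide` are hypothetical names that do not appear in the patch.

```java
// Hypothetical summary of the offline conversion rule; not code from the patch.
public class OfflineMigrationRuleSketch {
    public enum GroupType { CLASSIC, CONSUMER }

    public enum Outcome { USE_EXISTING, CONVERT, REJECT }

    // existing: type of the group currently stored under the group id.
    // existingIsEmpty: whether that group has no members.
    // requested: type implied by the incoming request (classic join or consumer heartbeat).
    public static Outcome decide(GroupType existing, boolean existingIsEmpty, GroupType requested) {
        if (existing == requested) {
            return Outcome.USE_EXISTING;
        }
        // Different protocol: only an empty group may be replaced.
        return existingIsEmpty ? Outcome.CONVERT : Outcome.REJECT;
    }
}
```

For example, `decide(GroupType.CLASSIC, true, GroupType.CONSUMER)` yields `CONVERT`, while the same call with `existingIsEmpty` set to `false` yields `REJECT`.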
   

