Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dajac merged PR #15546: URL: https://github.com/apache/kafka/pull/15546 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dajac commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1530443218

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -619,6 +622,22 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
+    /**
+     * Gets a consumer group.
+     *
+     * @param groupId The group id.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or the group is not a consumer group.
+     *
+     * Package private for testing.
+     */
+    ConsumerGroup getConsumerGroup(

Review Comment: I just noticed that we also have `consumerGroup(String, long)` as a method. I wonder whether we should name this one `consumerGroup` too and have it call the one that I mentioned. It would also make sense to group them together. What do you think?
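For readers following the naming suggestion above, here is a minimal sketch of what a one-argument `consumerGroup` overload delegating to `consumerGroup(String, long)` could look like. Everything in it is a hypothetical stand-in: `ToyGroup`, the `history` map, and `write` are invented for illustration, `IllegalArgumentException` replaces `GroupIdNotFoundException`, and the thread does not say what the `long` argument means. The sketch assumes it is an offset that selects which version of the group state is visible, with `Long.MAX_VALUE` meaning "latest".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model: every type and method body here is a hypothetical stand-in,
// not the GroupMetadataManager implementation.
class ConsumerGroupLookupSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static final class ToyGroup {
        final String groupId;
        final GroupType type;

        ToyGroup(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }
    }

    // groupId -> (offset at which a state was written -> that state).
    private final Map<String, TreeMap<Long, ToyGroup>> history = new HashMap<>();

    void write(String groupId, long offset, GroupType type) {
        history.computeIfAbsent(groupId, id -> new TreeMap<>())
            .put(offset, new ToyGroup(groupId, type));
    }

    // Assumption: the long argument selects the state visible at that offset.
    ToyGroup consumerGroup(String groupId, long committedOffset) {
        TreeMap<Long, ToyGroup> versions = history.get(groupId);
        Map.Entry<Long, ToyGroup> entry =
            versions == null ? null : versions.floorEntry(committedOffset);
        if (entry == null) {
            throw new IllegalArgumentException(
                String.format("Group %s not found.", groupId));
        }
        if (entry.getValue().type != GroupType.CONSUMER) {
            throw new IllegalArgumentException(
                String.format("Group %s is not a consumer group.", groupId));
        }
        return entry.getValue();
    }

    // Same name, fewer arguments: delegates with "latest state" as the offset.
    ToyGroup consumerGroup(String groupId) {
        return consumerGroup(groupId, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        ConsumerGroupLookupSketch sketch = new ConsumerGroupLookupSketch();
        sketch.write("group-a", 5L, GroupType.CONSUMER);
        System.out.println(sketch.consumerGroup("group-a").type);
    }
}
```

Keeping both lookups under one name and next to each other means the "not found" and "wrong type" checks live in a single place, which seems to be the point of the regrouping suggestion.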
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dongnuo123 commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1530392822

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -598,15 +600,16 @@ public List describeGroups(
      */
     ConsumerGroup getOrMaybeCreateConsumerGroup(
         String groupId,
-        boolean createIfNotExists
+        boolean createIfNotExists,
+        List records
     ) throws GroupIdNotFoundException {
         Group group = groups.get(groupId);
         if (group == null && !createIfNotExists) {
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
-        if (group == null) {
+        if (group == null || maybeDeleteEmptyClassicGroup(group, records)) {

Review Comment: Oh thanks for the catch. Yeah it's necessary when the group is an empty classic group.
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dajac commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1529919197

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -598,15 +600,16 @@ public List describeGroups(
      */
     ConsumerGroup getOrMaybeCreateConsumerGroup(
         String groupId,
-        boolean createIfNotExists
+        boolean createIfNotExists,
+        List records
     ) throws GroupIdNotFoundException {
         Group group = groups.get(groupId);
         if (group == null && !createIfNotExists) {
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
-        if (group == null) {
+        if (group == null || maybeDeleteEmptyClassicGroup(group, records)) {

Review Comment: Should we consider deleting the previous empty group only if `createIfNotExists` is `true`? It is perhaps not necessary. What do you think?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -619,6 +622,16 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
+    /**
+     * An overloaded method of {@link GroupMetadataManager#getOrMaybeCreateConsumerGroup(String, boolean, List)}
+     */
+    ConsumerGroup getOrMaybeCreateConsumerGroup(

Review Comment: It looks like this one is always used with `createIfNotExists` set to `false`. Should we remove the flag and rename the method?
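To make the first question above concrete, here is a small sketch of the guarded variant, where the empty classic group is only tombstoned when `createIfNotExists` is `true`. This is an illustration of the question, not the merged code: `ToyGroup`, the string records, and the direct `groups.put` are hypothetical simplifications, and `IllegalArgumentException` stands in for `GroupIdNotFoundException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the guard under discussion; not the GroupMetadataManager code.
class GuardedGetOrCreateSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static final class ToyGroup {
        final String groupId;
        final GroupType type;
        final int numMembers;

        ToyGroup(String groupId, GroupType type, int numMembers) {
            this.groupId = groupId;
            this.type = type;
            this.numMembers = numMembers;
        }
    }

    final Map<String, ToyGroup> groups = new HashMap<>();

    // Appends a tombstone (a plain string here) if the group is an empty classic group.
    boolean maybeDeleteEmptyClassicGroup(ToyGroup group, List<String> records) {
        if (group.type == GroupType.CLASSIC && group.numMembers == 0) {
            records.add("tombstone:" + group.groupId);
            return true;
        }
        return false;
    }

    ToyGroup getOrMaybeCreateConsumerGroup(
        String groupId,
        boolean createIfNotExists,
        List<String> records
    ) {
        ToyGroup group = groups.get(groupId);

        if (group == null && !createIfNotExists) {
            throw new IllegalArgumentException(
                String.format("Consumer group %s not found.", groupId));
        }

        // Short-circuit order matters: the deletion helper only runs for an
        // existing group, and (in this variant) only when creation is allowed.
        if (group == null || (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records))) {
            ToyGroup created = new ToyGroup(groupId, GroupType.CONSUMER, 0);
            groups.put(groupId, created);
            return created;
        }

        if (group.type != GroupType.CONSUMER) {
            throw new IllegalArgumentException(
                String.format("Group %s is not a consumer group.", groupId));
        }
        return group;
    }

    public static void main(String[] args) {
        GuardedGetOrCreateSketch sketch = new GuardedGetOrCreateSketch();
        sketch.groups.put("g", new ToyGroup("g", GroupType.CLASSIC, 0));
        List<String> records = new ArrayList<>();
        System.out.println(sketch.getOrMaybeCreateConsumerGroup("g", true, records).type);
        System.out.println(records);
    }
}
```

With the guard in place, a lookup with `createIfNotExists` set to `false` never writes a tombstone, so callers that only read a group cannot trigger a conversion as a side effect.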
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dongnuo123 commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1529010875

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -9298,6 +9298,120 @@ public void testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
     }
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(0, result.response().errorCode());
+        assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), result.records().get(0));
+        assertEquals(Group.GroupType.CONSUMER,
+            context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .build();
+
+        JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult.joinFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadat
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dajac commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1527999364

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1056,7 +1056,12 @@ private CoordinatorResult consumerGr
         // Get or create the consumer group.
         boolean createIfNotExists = memberEpoch == 0;
-        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        ConsumerGroup group;
+        if (maybeDeleteEmptyClassicGroup(groupId, records)) {
+            group = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        } else {
+            group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        }

Review Comment: The method is already large, so I wonder if we could push this logic into `getOrMaybeCreateConsumerGroup` and we should actually do this only if `createIfNotExists` is `true`.

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -9298,6 +9298,120 @@ public void testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
     }
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(0, result.response().errorCode());
+        assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), result.records().get(0));
+        assertEquals(Group.GroupType.CONSUMER,
+            context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+                .withMember(new ConsumerGroupMember.
[PR] KAFKA-16313: offline group protocol migration (reopened) [kafka]
dongnuo123 opened a new pull request, #15546: URL: https://github.com/apache/kafka/pull/15546

This patch enables empty classic groups to be upgraded to consumer groups and empty consumer groups to be downgraded to classic groups.

jira: https://issues.apache.org/jira/browse/KAFKA-16313

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
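As a rough illustration of the behavior this PR describes, and of what the tests quoted in the review exercise, the sketch below models a group registry in which a join with the "other" protocol succeeds only when the existing group is empty. It is a toy under stated assumptions: `ToyGroup`, `join`, `leave`, and the string tombstones are all hypothetical, and `IllegalStateException` stands in for the `GroupIdNotFoundException` / `GROUP_ID_NOT_FOUND` error seen in the tests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical stand-ins, not Kafka's classes.
class OfflineMigrationSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static final class ToyGroup {
        final String groupId;
        final GroupType type;
        final List<String> members = new ArrayList<>();

        ToyGroup(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }

        boolean isEmpty() {
            return members.isEmpty();
        }
    }

    private final Map<String, ToyGroup> groups = new HashMap<>();

    // Records a real coordinator would append to its log; plain strings here.
    final List<String> records = new ArrayList<>();

    // Joins with the requested protocol, converting the group only if it is empty.
    ToyGroup join(String groupId, GroupType requested, String memberId) {
        ToyGroup group = groups.get(groupId);
        if (group != null && group.type != requested) {
            if (!group.isEmpty()) {
                // "Offline" migration: a group with live members is never converted.
                throw new IllegalStateException(
                    String.format("Group %s is not a %s group.", groupId, requested));
            }
            records.add("tombstone:" + groupId);
            group = null;
        }
        if (group == null) {
            group = new ToyGroup(groupId, requested);
            groups.put(groupId, group);
        }
        group.members.add(memberId);
        return group;
    }

    void leave(String groupId, String memberId) {
        ToyGroup group = groups.get(groupId);
        if (group != null) {
            group.members.remove(memberId);
        }
    }

    public static void main(String[] args) {
        OfflineMigrationSketch sketch = new OfflineMigrationSketch();
        sketch.join("g", GroupType.CLASSIC, "m1");
        sketch.leave("g", "m1");
        // The group is now empty, so a consumer-protocol join may replace it.
        ToyGroup upgraded = sketch.join("g", GroupType.CONSUMER, "m2");
        System.out.println(upgraded.type + " " + sketch.records);
    }
}
```

The same `join` path covers both directions in this toy: an empty classic group becomes a consumer group, and an empty consumer group becomes a classic group, while any non-empty group rejects a join that uses the other protocol.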