This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3e3c618bdc9 KAFKA-16313: Offline group protocol migration (#15546) 3e3c618bdc9 is described below commit 3e3c618bdc90ea225632a6aeecb0e65b5ac8294f Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com> AuthorDate: Wed Mar 20 03:49:11 2024 -0400 KAFKA-16313: Offline group protocol migration (#15546) This patch enables an empty classic group to be automatically converted to a new consumer group and vice versa. Reviewers: David Jacot <dja...@confluent.io> --- .../coordinator/group/GroupCoordinatorShard.java | 2 +- .../coordinator/group/GroupMetadataManager.java | 147 ++++++++++++++----- .../group/GroupCoordinatorShardTest.java | 8 +- .../group/GroupMetadataManagerTest.java | 156 +++++++++++++++++++-- .../group/GroupMetadataManagerTestContext.java | 4 +- 5 files changed, 266 insertions(+), 51 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index b55dd91cc1c..12c194c331b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -388,7 +388,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { try { groupMetadataManager.validateDeleteGroup(groupId); numDeletedOffsets += offsetMetadataManager.deleteAllOffsets(groupId, records); - groupMetadataManager.deleteGroup(groupId, records); + groupMetadataManager.createGroupTombstoneRecords(groupId, records); deletedGroups.add(groupId); resultCollection.add( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 0a789fa9630..9068ad17efc 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -588,7 +588,9 @@ public class GroupMetadataManager { * * @param groupId The group id. * @param createIfNotExists A boolean indicating whether the group should be - * created if it does not exist. + * created if it does not exist or is an empty classic group. + * @param records The record list to which the group tombstones are written + * if the group is empty and is a classic group. * * @return A ConsumerGroup. * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or @@ -598,7 +600,8 @@ public class GroupMetadataManager { */ ConsumerGroup getOrMaybeCreateConsumerGroup( String groupId, - boolean createIfNotExists + boolean createIfNotExists, + List<Record> records ) throws GroupIdNotFoundException { Group group = groups.get(groupId); @@ -606,7 +609,7 @@ public class GroupMetadataManager { throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } - if (group == null) { + if (group == null || (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records))) { return new ConsumerGroup(snapshotRegistry, groupId, metrics); } else { if (group.type() == CONSUMER) { @@ -619,6 +622,40 @@ public class GroupMetadataManager { } } + /** + * Gets a consumer group by committed offset. + * + * @param groupId The group id. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A ConsumerGroup. + * @throws GroupIdNotFoundException if the group does not exist or is not a consumer group. + */ + public ConsumerGroup consumerGroup( + String groupId, + long committedOffset + ) throws GroupIdNotFoundException { + Group group = group(groupId, committedOffset); + + if (group.type() == CONSUMER) { + return (ConsumerGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", + groupId)); + } + } + + /** + * An overloaded method of {@link GroupMetadataManager#consumerGroup(String, long)} + */ + ConsumerGroup consumerGroup( + String groupId + ) throws GroupIdNotFoundException { + return consumerGroup(groupId, Long.MAX_VALUE); + } + /** * The method should be called on the replay path. * Gets or maybe creates a consumer group and updates the groups map if a new group is created. @@ -723,31 +760,6 @@ public class GroupMetadataManager { } } - /** - * Gets a consumer group by committed offset. - * - * @param groupId The group id. - * @param committedOffset A specified committed offset corresponding to this shard. - * - * @return A ConsumerGroup. - * @throws GroupIdNotFoundException if the group does not exist or is not a consumer group. - */ - public ConsumerGroup consumerGroup( - String groupId, - long committedOffset - ) throws GroupIdNotFoundException { - Group group = group(groupId, committedOffset); - - if (group.type() == CONSUMER) { - return (ConsumerGroup) group; - } else { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", - groupId)); - } - } - /** * Removes the group. * @@ -1056,7 +1068,7 @@ public class GroupMetadataManager { // Get or create the consumer group. boolean createIfNotExists = memberEpoch == 0; - final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists); + final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists, records); throwIfConsumerGroupIsFull(group, memberId); // Get or create the member. @@ -1324,7 +1336,7 @@ public class GroupMetadataManager { String memberId, int memberEpoch ) throws ApiException { - ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = consumerGroup(groupId); List<Record> records; if (instanceId == null) { ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); @@ -1449,7 +1461,7 @@ public class GroupMetadataManager { String key = consumerGroupSessionTimeoutKey(groupId, memberId); timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { - ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = consumerGroup(groupId); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); log.info("[GroupId {}] Member {} fenced from the group because its session expired.", groupId, memberId); @@ -1496,7 +1508,7 @@ public class GroupMetadataManager { String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { - ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = consumerGroup(groupId); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); if (member.memberEpoch() == memberEpoch) { @@ -2000,6 +2012,7 @@ public class GroupMetadataManager { CompletableFuture<JoinGroupResponseData> responseFuture ) { CoordinatorResult<Void, Record> result = EMPTY_RESULT; + List<Record> records = new ArrayList<>(); String groupId = request.groupId(); String memberId = request.memberId(); @@ -2017,6 +2030,7 @@ public class GroupMetadataManager { // Group is created if it does not exist and the member id is UNKNOWN. if member // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND ClassicGroup group; + maybeDeleteEmptyConsumerGroup(groupId, records); boolean isNewGroup = !groups.containsKey(groupId); try { group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember); @@ -2065,7 +2079,7 @@ public class GroupMetadataManager { } }); - List<Record> records = Collections.singletonList( + records.add( RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()) ); @@ -3547,12 +3561,25 @@ public class GroupMetadataManager { * @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. * @param records The record list to populate. */ - public void deleteGroup( + public void createGroupTombstoneRecords( String groupId, List<Record> records ) { // At this point, we have already validated the group id, so we know that the group exists and that no exception will be thrown. - group(groupId).createGroupTombstoneRecords(records); + createGroupTombstoneRecords(group(groupId), records); + } + + /** + * Populates the record list passed in with record to update the state machine. + * + * @param group The group to be deleted. + * @param records The record list to populate. + */ + public void createGroupTombstoneRecords( + Group group, + List<Record> records + ) { + group.createGroupTombstoneRecords(records); } /** @@ -3574,7 +3601,55 @@ public class GroupMetadataManager { public void maybeDeleteGroup(String groupId, List<Record> records) { Group group = groups.get(groupId); if (group != null && group.isEmpty()) { - deleteGroup(groupId, records); + createGroupTombstoneRecords(groupId, records); + } + } + + /** + * @return true if the group is an empty classic group. + */ + private static boolean isEmptyClassicGroup(Group group) { + return group != null && group.type() == CLASSIC && group.isEmpty(); + } + + /** + * @return true if the group is an empty consumer group. + */ + private static boolean isEmptyConsumerGroup(Group group) { + return group != null && group.type() == CONSUMER && group.isEmpty(); + } + + /** + * Write tombstones for the group if it's empty and is a classic group. + * + * @param group The group to be deleted. + * @param records The list of records to delete the group. + * + * @return true if the group is empty + */ + private boolean maybeDeleteEmptyClassicGroup(Group group, List<Record> records) { + if (isEmptyClassicGroup(group)) { + // Delete the classic group by adding tombstones. + // There's no need to remove the group as the replay of tombstones removes it. + if (group != null) createGroupTombstoneRecords(group, records); + return true; + } + return false; + } + + /** + * Delete and write tombstones for the group if it's empty and is a consumer group. + * + * @param groupId The group id to be deleted. + * @param records The list of records to delete the group. + */ + private void maybeDeleteEmptyConsumerGroup(String groupId, List<Record> records) { + Group group = groups.get(groupId, Long.MAX_VALUE); + if (isEmptyConsumerGroup(group)) { + // Add tombstones for the previous consumer group. The tombstones won't actually be + // replayed because its coordinator result has a non-null appendFuture. + createGroupTombstoneRecords(group, records); + removeGroup(groupId); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 1e6930a8029..59868f36f10 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -225,14 +225,14 @@ public class GroupCoordinatorShardTest { List<Record> records = invocation.getArgument(1); records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId)); return null; - }).when(groupMetadataManager).deleteGroup(anyString(), anyList()); + }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList()); CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult = coordinator.deleteGroups(context, groupIds); for (String groupId : groupIds) { verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId)); - verify(groupMetadataManager, times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList()); + verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList()); verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList()); } assertEquals(expectedResult, coordinatorResult); @@ -291,7 +291,7 @@ public class GroupCoordinatorShardTest { List<Record> records = invocation.getArgument(1); records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId)); return null; - }).when(groupMetadataManager).deleteGroup(anyString(), anyList()); + }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), anyList()); CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult = coordinator.deleteGroups(context, groupIds); @@ -299,7 +299,7 @@ public class GroupCoordinatorShardTest { for (String groupId : groupIds) { verify(groupMetadataManager, times(1)).validateDeleteGroup(eq(groupId)); if (!groupId.equals("group-id-2")) { - verify(groupMetadataManager, times(1)).deleteGroup(eq(groupId), anyList()); + verify(groupMetadataManager, times(1)).createGroupTombstoneRecords(eq(groupId), anyList()); verify(offsetMetadataManager, times(1)).deleteAllOffsets(eq(groupId), anyList()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 43703059915..dc21a2140d2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -434,7 +434,7 @@ public class GroupMetadataManagerTest { )); assertThrows(GroupIdNotFoundException.class, () -> - context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false)); + context.groupMetadataManager.consumerGroup(groupId)); CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() @@ -2382,7 +2382,7 @@ public class GroupMetadataManagerTest { // The metadata refresh flag should be true. ConsumerGroup consumerGroup = context.groupMetadataManager - .getOrMaybeCreateConsumerGroup(groupId, false); + .consumerGroup(groupId); assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); // Prepare the assignment result. @@ -2493,7 +2493,7 @@ public class GroupMetadataManagerTest { // The metadata refresh flag should be true. ConsumerGroup consumerGroup = context.groupMetadataManager - .getOrMaybeCreateConsumerGroup(groupId, false); + .consumerGroup(groupId); assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); // Prepare the assignment result. @@ -2731,7 +2731,7 @@ public class GroupMetadataManagerTest { // Ensures that all refresh flags are set to the future. Arrays.asList("group1", "group2", "group3", "group4", "group5").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); @@ -2768,12 +2768,12 @@ public class GroupMetadataManagerTest { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Collections.singletonList("group5").forEach(groupId -> { - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); @@ -9167,7 +9167,7 @@ public class GroupMetadataManagerTest { List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id")); List<Record> records = new ArrayList<>(); - context.groupMetadataManager.deleteGroup("group-id", records); + context.groupMetadataManager.createGroupTombstoneRecords("group-id", records); assertEquals(expectedRecords, records); } @@ -9205,7 +9205,7 @@ public class GroupMetadataManagerTest { RecordHelpers.newGroupEpochTombstoneRecord(groupId) ); List<Record> records = new ArrayList<>(); - context.groupMetadataManager.deleteGroup(groupId, records); + context.groupMetadataManager.createGroupTombstoneRecords("group-id", records); assertEquals(expectedRecords, records); } @@ -9380,6 +9380,146 @@ public class GroupMetadataManagerTest { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(Collections.emptyMap()) + .build(); + + assertEquals(Errors.NONE.code(), result.response().errorCode()); + assertEquals( + Arrays.asList( + RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), + RecordHelpers.newMemberSubscriptionRecord(classicGroupId, expectedMember), + RecordHelpers.newGroupEpochRecord(classicGroupId, 1), + RecordHelpers.newTargetAssignmentRecord(classicGroupId, memberId, Collections.emptyMap()), + RecordHelpers.newTargetAssignmentEpochRecord(classicGroupId, 1), + RecordHelpers.newCurrentAssignmentRecord(classicGroupId, expectedMember) + ), + result.records() + ); + assertEquals( + Group.GroupType.CONSUMER, + context.groupMetadataManager.consumerGroup(classicGroupId).type() + ); + } + + @Test + public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception { + String consumerGroupId = "consumer-group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(consumerGroupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult.joinFuture.get().errorCode()); + } + + @Test + public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(consumerGroupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request, true); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId) + ); + + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(expectedRecords, joinResult.records.subList(0, expectedRecords.size())); + assertEquals( + Group.GroupType.CLASSIC, + context.groupMetadataManager.getOrMaybeCreateClassicGroup(consumerGroupId, false).type() + ); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index a02359ffed1..4a7cd8cae9e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -483,7 +483,7 @@ public class GroupMetadataManagerTestContext { String groupId ) { return groupMetadataManager - .getOrMaybeCreateConsumerGroup(groupId, false) + .consumerGroup(groupId) .state(); } @@ -492,7 +492,7 @@ public class GroupMetadataManagerTestContext { String memberId ) { return groupMetadataManager - .getOrMaybeCreateConsumerGroup(groupId, false) + .consumerGroup(groupId) .getOrMaybeCreateMember(memberId, false) .state(); }