dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1344185641
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -827,4 +835,28 @@ public void shutdown() { private static boolean isGroupIdNotEmpty(String groupId) { return groupId != null && !groupId.isEmpty(); } + + /** + * Handles the exception in the scheduleWriteOperation. + * @return The Errors instance associated with the given exception. + */ + private Errors getErrorsForException(Throwable exception) { Review Comment: nit: If we keep it, the method could be static and we usually don't prefix methods with `get`. `normalizeException` may be an alternative name. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } + /** + * Handles a DeleteGroups request. + * + * @param context The request context. + * @param groupIds The groupIds of the groups to be deleted + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and + * a list of records to update the state machine. 
+ */ + public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups( + RequestContext context, + List<String> groupIds + ) throws ApiException { + final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size()); + final List<Record> records = new ArrayList<>(); + final AtomicInteger numDeletedOffsets = new AtomicInteger(); + final List<String> deletedGroups = new ArrayList<>(); + + groupIds.forEach(groupId -> { + try { + groupMetadataManager.validateDeleteGroup(groupId); + numDeletedOffsets.addAndGet(offsetMetadataManager.deleteAllOffsets(groupId, records)); + groupMetadataManager.deleteGroup(groupId, records); + deletedGroups.add(groupId); + + resultCollection.add( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + ); + } catch (ApiException exception) { + resultCollection.add( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + .setErrorCode(Errors.forException(exception).code()) + ); + } + }); + + log.info("The following groups were deleted: {}. A total of {} offsets were removed", + String.join(", ", deletedGroups), + numDeletedOffsets + ); Review Comment: nit: `... removed.`. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); + when(offsetMetadataManager.deleteAllOffsets(anyString(), anyList())).thenAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0)); + return 1; + }); + doAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + 
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId)); + return null; + }).when(groupMetadataManager).deleteGroup(anyString(), anyList()); + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult = + coordinator.deleteGroups(context, groupIds); + + for (String groupId : groupIds) { + verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId)); + verify(groupMetadataManager, times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList()); + verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList()); + } + assertEquals(expectedResult, coordinatorResult); + } + + @Test + public void testDeleteGroupsInvalidGroupId() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3"); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2") + .setErrorCode(Errors.INVALID_GROUP_ID.code()), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + ).iterator()); + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", "topic-name", 0), + 
RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3") + ); + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doThrow(Errors.INVALID_GROUP_ID.exception()) Review Comment: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); + when(offsetMetadataManager.deleteAllOffsets(anyString(), anyList())).thenAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> 
records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0)); + return 1; + }); + doAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId)); + return null; + }).when(groupMetadataManager).deleteGroup(anyString(), anyList()); + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult = + coordinator.deleteGroups(context, groupIds); + + for (String groupId : groupIds) { + verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId)); + verify(groupMetadataManager, times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList()); + verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList()); + } + assertEquals(expectedResult, coordinatorResult); + } + + @Test + public void testDeleteGroupsInvalidGroupId() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3"); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2") + .setErrorCode(Errors.INVALID_GROUP_ID.code()), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + ).iterator()); + List<Record> expectedRecords = Arrays.asList( + 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3") + ); + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doThrow(Errors.INVALID_GROUP_ID.exception()) + .when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2")); + doAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0)); + return null; + }).when(offsetMetadataManager).deleteAllOffsets(anyString(), anyList()); + doAnswer(invocation -> { Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -259,30 +262,11 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb "consumer-group-heartbeat", topicPartitionFor(request.groupId()), coordinator -> coordinator.consumerGroupHeartbeat(context, request) - ).exceptionally(exception -> { - if (exception instanceof UnknownTopicOrPartitionException || - exception instanceof NotEnoughReplicasException) { - return new ConsumerGroupHeartbeatResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); - } - - if (exception instanceof NotLeaderOrFollowerException || - exception instanceof KafkaStorageException) { - return new ConsumerGroupHeartbeatResponseData() - .setErrorCode(Errors.NOT_COORDINATOR.code()); - } - - if (exception instanceof RecordTooLargeException || - exception instanceof RecordBatchTooLargeException || - exception instanceof InvalidFetchSizeException) { - return new ConsumerGroupHeartbeatResponseData() - 
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()); - } - - return new ConsumerGroupHeartbeatResponseData() - .setErrorCode(Errors.forException(exception).code()) - .setErrorMessage(exception.getMessage()); - }); + ).exceptionally(exception -> + new ConsumerGroupHeartbeatResponseData() + .setErrorCode(getErrorsForException(exception).code()) Review Comment: I think that we should be careful with this. The change is not 100% equivalent to the previous implementation here because the error message is now set for all errors whereas it was only set for a subset before. While I agree that we could do better, I would suggest tackling this in a separate PR. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } + /** + * Handles a DeleteGroups request. + * + * @param context The request context. + * @param groupIds The groupIds of the groups to be deleted + * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and + * a list of records to update the state machine. + */ + public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups( + RequestContext context, + List<String> groupIds + ) throws ApiException { + final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size()); + final List<Record> records = new ArrayList<>(); + final AtomicInteger numDeletedOffsets = new AtomicInteger(); Review Comment: Why do we need an AtomicInteger here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. 
+ * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ? 
+ null : offsetsByTopic.get(topic.name()); + + if (group.isSubscribedToTopic(topic.name())) { + topic.partitions().forEach(partition -> + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ) + ); + } else { + topic.partitions().forEach(partition -> { + if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) { + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + ); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( + request.groupId(), + topic.name(), + partition.partitionIndex() + )); + } + }); + } + + final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic = + new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic.name()) + .setPartitions(responsePartitionCollection); + responseTopicCollection.add(responseTopic); + }); + response.setTopics(responseTopicCollection); + + return new CoordinatorResult<>(records, response); + } + + /** + * Deletes offsets as part of a DeleteGroups request. + * Populates the record list passed in with records to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} + * + * @param groupId The ID of the given group. + * @param records The record list to populate. + */ + public void deleteAllOffsets( + String groupId, + List<Record> records + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + + if (offsetsByTopic != null) { Review Comment: If you look at the usage in `[GroupCoordinatorShard.java](https://github.com/apache/kafka/pull/14408/files#diff-d6369ef583dce1f7570cf396d7a4762c679fd2af323e1e1f93c9b665258373a0)`, all offsets are removed before deleting the group. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); + when(offsetMetadataManager.deleteAllOffsets(anyString(), anyList())).thenAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0)); + return 1; + }); + doAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + 
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId)); + return null; + }).when(groupMetadataManager).deleteGroup(anyString(), anyList()); + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult = + coordinator.deleteGroups(context, groupIds); + + for (String groupId : groupIds) { + verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId)); + verify(groupMetadataManager, times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList()); + verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList()); + } + assertEquals(expectedResult, coordinatorResult); + } + + @Test + public void testDeleteGroupsInvalidGroupId() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3"); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2") + .setErrorCode(Errors.INVALID_GROUP_ID.code()), + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + ).iterator()); + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", "topic-name", 0), + 
RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3") + ); + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doThrow(Errors.INVALID_GROUP_ID.exception()) + .when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2")); + doAnswer(invocation -> { Review Comment: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); + 
when(offsetMetadataManager.deleteAllOffsets(anyString(), anyList())).thenAnswer(invocation -> { + String groupId = invocation.getArgument(0); + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0)); + return 1; + }); + doAnswer(invocation -> { Review Comment: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); Review Comment: I agree. I mentioned this a few times as well. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = Review Comment: I guess that they don't hurt, do they? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org