dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1344185641


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -827,4 +835,28 @@ public void shutdown() {
     private static boolean isGroupIdNotEmpty(String groupId) {
         return groupId != null && !groupId.isEmpty();
     }
+
+    /**
+     * Handles the exception in the scheduleWriteOperation.
+     * @return The Errors instance associated with the given exception.
+     */
+    private Errors getErrorsForException(Throwable exception) {

Review Comment:
   nit: If we keep it, the method could be static, and we usually don't prefix 
methods with `get`. `normalizeException` may be an alternative name.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+        final List<Record> records = new ArrayList<>();
+        final AtomicInteger numDeletedOffsets = new AtomicInteger();
+        final List<String> deletedGroups = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateDeleteGroup(groupId);
+                
numDeletedOffsets.addAndGet(offsetMetadataManager.deleteAllOffsets(groupId, 
records));
+                groupMetadataManager.deleteGroup(groupId, records);
+                deletedGroups.add(groupId);
+
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                );
+            } catch (ApiException exception) {
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(Errors.forException(exception).code())
+                );
+            }
+        });
+
+        log.info("The following groups were deleted: {}. A total of {} offsets 
were removed",
+            String.join(", ", deletedGroups),
+            numDeletedOffsets
+        );

Review Comment:
   nit: `... removed.`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        when(offsetMetadataManager.deleteAllOffsets(anyString(), 
anyList())).thenAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return 1;
+        });
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-1"),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-2")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-3")
+            ).iterator());
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doThrow(Errors.INVALID_GROUP_ID.exception())

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        when(offsetMetadataManager.deleteAllOffsets(anyString(), 
anyList())).thenAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return 1;
+        });
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-1"),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-2")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-3")
+            ).iterator());
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doThrow(Errors.INVALID_GROUP_ID.exception())
+            
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -259,30 +262,11 @@ public 
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
             "consumer-group-heartbeat",
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.consumerGroupHeartbeat(context, request)
-        ).exceptionally(exception -> {
-            if (exception instanceof UnknownTopicOrPartitionException ||
-                exception instanceof NotEnoughReplicasException) {
-                return new ConsumerGroupHeartbeatResponseData()
-                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
-            }
-
-            if (exception instanceof NotLeaderOrFollowerException ||
-                exception instanceof KafkaStorageException) {
-                return new ConsumerGroupHeartbeatResponseData()
-                    .setErrorCode(Errors.NOT_COORDINATOR.code());
-            }
-
-            if (exception instanceof RecordTooLargeException ||
-                exception instanceof RecordBatchTooLargeException ||
-                exception instanceof InvalidFetchSizeException) {
-                return new ConsumerGroupHeartbeatResponseData()
-                    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
-            }
-
-            return new ConsumerGroupHeartbeatResponseData()
-                .setErrorCode(Errors.forException(exception).code())
-                .setErrorMessage(exception.getMessage());
-        });
+        ).exceptionally(exception ->
+            new ConsumerGroupHeartbeatResponseData()
+                .setErrorCode(getErrorsForException(exception).code())

Review Comment:
   I think that we should be careful with this. The change is not 100% 
equivalent to the previous implementation here because the error message is now 
set for all errors whereas it was only set for a subset before. While I agree 
that we could do better, I would suggest tackling this in a separate PR.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+        final List<Record> records = new ArrayList<>();
+        final AtomicInteger numDeletedOffsets = new AtomicInteger();

Review Comment:
   Why do we need an AtomicInteger here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            if (group.isSubscribedToTopic(topic.name())) {
+                topic.partitions().forEach(partition ->
+                    responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                    )
+                );
+            } else {
+                topic.partitions().forEach(partition -> {
+                    if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition.partitionIndex())) {
+                        responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                        );
+                        
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                        ));
+                    }
+                });
+            }
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                    .setName(topic.name())
+                    .setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Deletes offsets as part of a DeleteGroups request.
+     * Populates the record list passed in with records to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+     *
+     * @param groupId The ID of the given group.
+     * @param records The record list to populate.
+     */
+    public void deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+
+        if (offsetsByTopic != null) {

Review Comment:
   If you look at the usage in 
[GroupCoordinatorShard.java](https://github.com/apache/kafka/pull/14408/files#diff-d6369ef583dce1f7570cf396d7a4762c679fd2af323e1e1f93c9b665258373a0),
 all offsets are removed before deleting the group.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        when(offsetMetadataManager.deleteAllOffsets(anyString(), 
anyList())).thenAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return 1;
+        });
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new 
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-1"),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-2")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                new DeleteGroupsResponseData.DeletableGroupResult()
+                    .setGroupId("group-id-3")
+            ).iterator());
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doThrow(Errors.INVALID_GROUP_ID.exception())
+            
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+        doAnswer(invocation -> {

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        when(offsetMetadataManager.deleteAllOffsets(anyString(), 
anyList())).thenAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return 1;
+        });
+        doAnswer(invocation -> {

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        List<Record> expectedRecords = new ArrayList<>();
+        for (String groupId : groupIds) {
+            expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+            expectedRecords.addAll(Arrays.asList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0),
+                RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+            ));
+        }
+
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());

Review Comment:
   I agree. I mentioned this a few times as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =

Review Comment:
   I guess that they don't hurt, do they?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to