jeffkbkim commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1343258721
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } + /** + * Handles a DeleteGroups request. + * Populates the record list passed in with record to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by + * calling {@link GroupMetadataManager#validateDeleteGroup(String)}. + * + * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. + * @param records The record list to populate. + */ + public void deleteGroup( + String groupId, + List<Record> records + ) { + // In this method, we only populate records with tombstone records, so we don't expect an exception to be thrown here. Review Comment: "At this point, we have already validated the group id so we know that the group exists and that no exception will be thrown." how's this? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -341,6 +384,22 @@ public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( return groupMetadataManager.genericGroupLeave(context, request); } + /** + * Handles a OffsetDelete request. Review Comment: nit: an ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. 
+ */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = Review Comment: what's the benefit of using final variables here? ########## clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DeleteGroupsRequestTest { + + protected static String groupId1 = "group-id-1"; + protected static String groupId2 = "group-id-2"; Review Comment: we can move these into the test as well ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } + /** + * Handles a DeleteGroups request. + * Populates the record list passed in with record to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by + * calling {@link GroupMetadataManager#validateDeleteGroup(String)}. + * + * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. Review Comment: nit: can we change all usages of "ID" to "id"? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,253 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + 
)).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() { + return testConsumerGroupHeartbeatWithExceptionSource(); + } + + @ParameterizedTest + @MethodSource("testDeleteOffsetsWithExceptionSource") + public void testDeleteOffsetsWithException( + 
Throwable exception, + short expectedErrorCode + ) throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(expectedErrorCode); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(exception)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 3); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"); + 
resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()), + result2.duplicate(), + result3.duplicate(), + result1.duplicate() + )); + + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(resultCollection1)); + + CompletableFuture<Object> resultCollectionFuture = new CompletableFuture<>(); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(resultCollectionFuture); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())); + + List<String> groupIds = 
Arrays.asList("group-id-1", "group-id-2", "group-id-3", null); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING); + + assertFalse(future.isDone()); + resultCollectionFuture.complete(resultCollection2); + Review Comment: can we assert true that the future is done? ########## clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DeleteGroupsRequestTest { + + protected static String groupId1 = "group-id-1"; + protected static String groupId2 = "group-id-2"; + + private static DeleteGroupsRequestData data; + + @BeforeEach + public void setUp() { + data = new DeleteGroupsRequestData() + .setGroupsNames(Arrays.asList(groupId1, groupId2)); + } Review Comment: was this addressed? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,253 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + 
.setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() { + return testConsumerGroupHeartbeatWithExceptionSource(); + } + + @ParameterizedTest + @MethodSource("testDeleteOffsetsWithExceptionSource") + public void testDeleteOffsetsWithException( + Throwable exception, + short expectedErrorCode + ) throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(expectedErrorCode); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + 
ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(exception)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 3); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"); + resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()), + result2.duplicate(), + result3.duplicate(), + result1.duplicate() + )); + + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new 
TopicPartition("__consumer_offsets", 2) + )); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(resultCollection1)); + + CompletableFuture<Object> resultCollectionFuture = new CompletableFuture<>(); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(resultCollectionFuture); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())); + + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3", null); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING); + + assertFalse(future.isDone()); + resultCollectionFuture.complete(resultCollection2); + + assertEquals(expectedResultCollection, future.get()); + } + + private static Stream<Arguments> testDeleteGroupsWithExceptionSource() { + return testConsumerGroupHeartbeatWithExceptionSource(); + } + + @ParameterizedTest + @MethodSource("testDeleteGroupsWithExceptionSource") Review Comment: can we use ``` @MethodSource("testConsumerGroupHeartbeatWithExceptionSource") ``` and remove the helper? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -523,9 +507,50 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = + new ArrayList<>(groupIds.size()); + + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DeleteGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection( + Collections.singletonList(null), + Errors.INVALID_GROUP_ID + ))); + } else { + final TopicPartition topicPartition = topicPartitionFor(groupId); + groupsByTopicPartition + .computeIfAbsent(topicPartition, __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + runtime.scheduleWriteOperation( + "delete-groups", + topicPartition, + coordinator -> coordinator.deleteGroups(context, groupList) + ).exceptionally(exception -> + DeleteGroupsRequest.getErrorResultCollection(groupList, getErrorsForException(exception)) + ); + + futures.add(future); + }); + + final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return allFutures.thenApply(v -> { Review Comment: we can remove the "v" ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ########## @@ -90,4 +92,28 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + + /** + * 
Validates the OffsetDelete request. + */ + void validateOffsetDelete() throws KafkaException; + + /** + * Validates the DeleteGroups request. + */ + void validateDeleteGroup() throws KafkaException; + + /** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic The topic name. + * @return Whether the group is subscribed to the topic. + */ + boolean isSubscribedToTopic(String topic); + /** Review Comment: nit: newline ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -523,9 +526,84 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = + new ArrayList<>(groupIds.size()); + + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DeleteGroups for the empty group id. 
+ if (groupId == null) { + futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection( + Collections.singletonList(null), + Errors.INVALID_GROUP_ID + ))); + } else { + final TopicPartition topicPartition = topicPartitionFor(groupId); + groupsByTopicPartition + .computeIfAbsent(topicPartition, __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = + runtime.scheduleWriteOperation( + "delete-groups", + topicPartition, + coordinator -> coordinator.deleteGroups(context, groupList) + ).exceptionally(exception -> { + if (exception instanceof UnknownTopicOrPartitionException || + exception instanceof NotEnoughReplicasException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.COORDINATOR_NOT_AVAILABLE + ); + } + + if (exception instanceof NotLeaderOrFollowerException || + exception instanceof KafkaStorageException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.NOT_COORDINATOR + ); + } + + if (exception instanceof RecordTooLargeException || + exception instanceof RecordBatchTooLargeException || + exception instanceof InvalidFetchSizeException) { + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.UNKNOWN_SERVER_ERROR + ); + } + + return DeleteGroupsRequest.getErrorResultCollection( + groupList, + Errors.forException(exception) + ); + }); + + futures.add(future); + }); + + final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + final CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> resFuture = allFutures.thenApply(v -> { Review Comment: also, we can remove the `v` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +349,94 @@ public 
CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + + if (group.isSubscribedToTopic(topic.name())) { + topic.partitions().forEach(partition -> + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ) + ); + } else { + final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ? 
+ null : offsetsByTopic.get(topic.name()); + if (offsetsByPartition != null) { + topic.partitions().forEach(partition -> { + if (offsetsByPartition.containsKey(partition.partitionIndex())) { + responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + ); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( + request.groupId(), + topic.name(), + partition.partitionIndex() + )); + } + }); + } + } + + responseTopicCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic.name()) + .setPartitions(responsePartitionCollection) + ); + }); + + return new CoordinatorResult<>( + records, + new OffsetDeleteResponseData().setTopics(responseTopicCollection) + ); + } + + /** + * Deletes offsets as part of a DeleteGroups request. + * Populates the record list passed in with records to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} + * + * @param groupId The ID of the given group. + * @param records The record list to populate. + * + * @return The number of offsets to be deleted. + */ + public int deleteAllOffsets( + String groupId, + List<Record> records + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + AtomicInteger numDeletedOffsets = new AtomicInteger(); + + if (offsetsByTopic != null) { + offsetsByTopic.forEach((topic, offsetsByPartition) -> Review Comment: we can use ``` offsetsByPartition.keySet().forEach(partition -> ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -592,6 +607,33 @@ public void validateOffsetFetch( validateMemberEpoch(memberEpoch, member.memberEpoch()); } + /** + * Validates the OffsetDelete request. + */ + @Override + public void validateOffsetDelete() {} + + /** + * Validates the DeleteGroups request. 
+ */ + @Override + public void validateDeleteGroup() throws ApiException { + if (state() != ConsumerGroupState.EMPTY) { + throw Errors.NON_EMPTY_GROUP.exception(); + } + } + + /** + * Populates the list of records with tombstone(s) for deleting the group. + * + * @param records The list of records. + */ + public void createGroupTombstoneRecords(List<Record> records) { Review Comment: @Override? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,11 +119,126 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroups() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Arrays.asList("group-id-1", "group-id-2"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<Record> expectedRecords = new ArrayList<>(); + for (String groupId : groupIds) { + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId)); + expectedRecords.addAll(Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord(groupId) + )); + } + + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString()); Review Comment: can we change all of the `doSomething...when...` to
`when().doSomething`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +938,253 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData() + .setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ).iterator()); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ).iterator()); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() +
)).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ).iterator()); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() { + return testConsumerGroupHeartbeatWithExceptionSource(); + } + + @ParameterizedTest + @MethodSource("testDeleteOffsetsWithExceptionSource") Review Comment: can we use ```
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource") ``` and remove the helper method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org