jeffkbkim commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336348269
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ########## @@ -90,4 +92,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + + /** + * Validates the OffsetDelete request. + */ + void validateOffsetDelete() throws KafkaException; + + /** + * Validates the GroupDelete request. + */ + void validateGroupDelete() throws KafkaException; + + /** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic The topic name. + * @return whether the group is subscribed to the topic. + */ + boolean isSubscribedToTopic(String topic); + + /** + * Creates tombstone(s) for deleting the group. + * + * @return The list of tombstone record(s). + */ + List<Record> createMetadataTombstoneRecords(); Review Comment: i wonder if createGroupTombstoneRecords() makes more sense ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,6 +114,70 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroup() { Review Comment: nit: testDeleteGroups also, can we verify the number of method invocations and also test that we append records correctly for multiple groups? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -1561,6 +1567,156 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); } + private void testOffsetDeleteWith( Review Comment: should this be a static method? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java: ########## @@ -1026,6 +1028,30 @@ public void testValidateOffsetCommit() { () -> group.validateOffsetCommit("member-id", "new-instance-id", 1)); } + @Test + public void testValidateOffsetDelete() { + group.transitionTo(PREPARING_REBALANCE); + assertThrows(GroupNotEmptyException.class, () -> group.validateOffsetDelete()); + group.transitionTo(COMPLETING_REBALANCE); + assertThrows(GroupNotEmptyException.class, () -> group.validateOffsetDelete()); + group.transitionTo(STABLE); + assertThrows(GroupNotEmptyException.class, () -> group.validateOffsetDelete()); + group.transitionTo(DEAD); + assertThrows(GroupIdNotFoundException.class, () -> group.validateOffsetDelete()); Review Comment: Should we add a test case for the EMPTY state here, and likewise in testValidateGroupDelete? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } + /** + * Handles a GroupDelete request. Review Comment: nit: "DeleteGroups" request. 
This should reflect the actual ApiKeys#DELETE_GROUPS name ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -105,6 +114,70 @@ public void testCommitOffset() { assertEquals(result, coordinator.commitOffset(context, request)); } + @Test + public void testDeleteGroup() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + groupMetadataManager, + offsetMetadataManager + ); + + RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); + List<String> groupIds = Collections.singletonList("group-id"); + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id")); + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic-name", 0), + RecordHelpers.newGroupMetadataTombstoneRecord("group-id") + ); + CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>( + expectedRecords, + expectedResultCollection + ); + + doNothing().when(groupMetadataManager).validateGroupDelete(ArgumentMatchers.eq("group-id")); + doAnswer(invocation -> { + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic-name", 0)); + return null; + }).when(offsetMetadataManager).deleteAllOffsets(ArgumentMatchers.eq("group-id"), anyList()); + doAnswer(invocation -> { + List<Record> records = invocation.getArgument(1); + records.add(RecordHelpers.newGroupMetadataTombstoneRecord("group-id")); + return null; + 
}).when(groupMetadataManager).deleteGroup(ArgumentMatchers.eq("group-id"), anyList()); + + assertEquals(expectedResult, coordinator.deleteGroups(context, groupIds)); + } + + @Test + public void testDeleteInvalidGroup() { Review Comment: nit: testDeleteGroupsInvalidGroupId can we also add a valid group id and verify the first stores invalid group id error and the second stores NONE? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -333,6 +348,81 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return new CoordinatorResult<>(records, response); } + /** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( + OffsetDeleteRequestData request + ) throws ApiException { + final Group group = validateOffsetDelete(request); + final List<Record> records = new ArrayList<>(); + final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); + final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsetsByGroup.get(request.groupId()); + + request.topics().forEach(topic -> { + final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + final boolean subscribedToTopic = group.isSubscribedToTopic(topic.name()); Review Comment: we can inline this to L380 ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -673,4 +675,32 @@ public void 
testValidateOffsetFetch() { // This should succeed. group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE); } + + @Test + public void testValidateGroupDelete() { + Uuid fooTopicId = Uuid.randomUuid(); Review Comment: this can be removed ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -673,4 +675,32 @@ public void testValidateOffsetFetch() { // This should succeed. group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE); } + + @Test + public void testValidateGroupDelete() { + Uuid fooTopicId = Uuid.randomUuid(); + ConsumerGroup consumerGroup = createConsumerGroup("foo"); + assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); + assertDoesNotThrow(() -> consumerGroup.validateGroupDelete()); Review Comment: we can do `consumerGroup::validateGroupDelete` for this along with the other invocations in the test ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +939,206 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") + .setTopics(requestTopicCollection); 
+ + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + responsePartitionCollection.add( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + responseTopicCollection.add( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offset"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + 
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offset"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offset"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + new CoordinatorLoadInProgressException(null) + )); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + 
); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { Review Comment: Can we add a test with three __consumer_offsets topic partitions, where the first coordinator finishes immediately, the second takes a while, and the third throws an exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org