jeffkbkim commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1340788752
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -936,4 +940,216 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + + @Test + public void testDeleteOffsets() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = + new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); + responsePartitionCollection.add( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) + ); + OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = + new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); + responseTopicCollection.add( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) + ); + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setTopics(responseTopicCollection); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + 
)).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsInvalidGroupId() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(response)); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + 
runtime + ); + service.startup(() -> 1); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); + requestTopicCollection.add( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) + )) + ); + OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") + .setTopics(requestTopicCollection); + + OffsetDeleteResponseData response = new OffsetDeleteResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-offsets"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + new CoordinatorLoadInProgressException(null) + )); + + CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets( + requestContext(ApiKeys.OFFSET_DELETE), + request, + BufferSupplier.NO_CACHING + ); + + assertTrue(future.isDone()); + assertEquals(response, future.get()); + } + + @Test + public void testDeleteGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + service.startup(() -> 3); + CountDownLatch latch = new CountDownLatch(1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-1"); + resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new 
DeleteGroupsResponseData.DeletableGroupResultCollection(); + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-3") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(Arrays.asList( + new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()), + result1.duplicate(), + result2.duplicate(), + result3.duplicate() + )); + + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(resultCollection1)); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("delete-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> { + try { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ignored) { } + return resultCollection2; + })); Review Comment: This was my suggestion. Your suggestion is much simpler, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org