jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1340788752


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result1.duplicate(),
+            result2.duplicate(),
+            result3.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
+            try {
+                assertTrue(latch.await(5, TimeUnit.SECONDS));
+            } catch (InterruptedException ignored) { }
+            return resultCollection2;
+        }));

Review Comment:
   This was my suggestion. Your suggestion is much simpler, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to