jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1336348269


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    void validateGroupDelete() throws KafkaException;
+
+    /**
+     * Returns true if the group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return whether the group is subscribed to the topic.
+     */
+    boolean isSubscribedToTopic(String topic);
+
+    /**
+     * Creates tombstone(s) for deleting the group.
+     *
+     * @return The list of tombstone record(s).
+     */
+    List<Record> createMetadataTombstoneRecords();

Review Comment:
   i wonder if createGroupTombstoneRecords() makes more sense



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +114,70 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroup() {

Review Comment:
   nit: testDeleteGroups
   
   also, can we verify the number of method invocations and also test that we 
append records correctly for multiple groups?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1567,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    private void testOffsetDeleteWith(

Review Comment:
   should this be a static method?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1026,6 +1028,30 @@ public void testValidateOffsetCommit() {
             () -> group.validateOffsetCommit("member-id", "new-instance-id", 
1));
     }
 
+    @Test
+    public void testValidateOffsetDelete() {
+        group.transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, () -> 
group.validateOffsetDelete());
+        group.transitionTo(COMPLETING_REBALANCE);
+        assertThrows(GroupNotEmptyException.class, () -> 
group.validateOffsetDelete());
+        group.transitionTo(STABLE);
+        assertThrows(GroupNotEmptyException.class, () -> 
group.validateOffsetDelete());
+        group.transitionTo(DEAD);
+        assertThrows(GroupIdNotFoundException.class, () -> 
group.validateOffsetDelete());

Review Comment:
   should we add EMPTY test case? also for testValidateGroupDelete



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.

Review Comment:
   nit: "DeleteGroups" request.
   
   This should reflect the actual ApiKeys#DELETE_GROUPS name



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +114,70 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroup() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Collections.singletonList("group-id");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateGroupDelete(ArgumentMatchers.eq("group-id"));
+        doAnswer(invocation -> {
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newOffsetCommitTombstoneRecord("group-id", 
"topic-name", 0));
+            return null;
+        
}).when(offsetMetadataManager).deleteAllOffsets(ArgumentMatchers.eq("group-id"),
 anyList());
+        doAnswer(invocation -> {
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
+            return null;
+        
}).when(groupMetadataManager).deleteGroup(ArgumentMatchers.eq("group-id"), 
anyList());
+
+        assertEquals(expectedResult, coordinator.deleteGroups(context, 
groupIds));
+    }
+
+    @Test
+    public void testDeleteInvalidGroup() {

Review Comment:
   nit: testDeleteGroupsInvalidGroupId
   
   can we also add a valid group id and verify the first stores invalid group 
id error and the second stores NONE?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,81 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());

Review Comment:
   we can inline this to L380



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -673,4 +675,32 @@ public void testValidateOffsetFetch() {
         // This should succeed.
         group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
     }
+
+    @Test
+    public void testValidateGroupDelete() {
+        Uuid fooTopicId = Uuid.randomUuid();

Review Comment:
   this can be removed



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -673,4 +675,32 @@ public void testValidateOffsetFetch() {
         // This should succeed.
         group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
     }
+
+    @Test
+    public void testValidateGroupDelete() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, 
consumerGroup.state());
+        assertDoesNotThrow(() -> consumerGroup.validateGroupDelete());

Review Comment:
   we can do `consumerGroup::validateGroupDelete` for this along with the other 
invocations in the test



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +939,206 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {

Review Comment:
   can we add a test with three __consumer_offsets topic partitions where one 
finishes immediately, another takes a while, and the last coordinator throws an 
exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to