This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 270948bf9d7 KAFKA-19115: Utilize initialized topics info to verify 
delete share group offsets (#19431)
270948bf9d7 is described below

commit 270948bf9d70a8913792259d7801c33c6a7a3235
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Apr 14 16:40:40 2025 +0530

    KAFKA-19115: Utilize initialized topics info to verify delete share group 
offsets (#19431)
    
    Currently, in the deleteShareGroupOffsets method in
    GroupCoordinatorService, the user request was simply forwarded to the
    persister without checking if the requested share partitions were
    initialized for the group or not. This PR introduces such a check to
    make sure that the persister deleteState request is only called for
    share partitions that have been initialized for the group.
    
    Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan 
<[email protected]>
---
 .../coordinator/group/GroupCoordinatorService.java | 168 +++----
 .../coordinator/group/GroupCoordinatorShard.java   | 118 +++++
 .../coordinator/group/GroupMetadataManager.java    |  49 ++
 .../group/GroupCoordinatorServiceTest.java         | 515 ++++++++++++++++-----
 .../group/GroupCoordinatorShardTest.java           | 302 ++++++++++++
 .../group/GroupMetadataManagerTest.java            | 359 ++++++++++++++
 6 files changed, 1290 insertions(+), 221 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index c00502af48f..757bba87263 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -28,7 +28,6 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
 import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
-import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -1258,45 +1257,40 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         });
     }
 
-    private void populateDeleteShareGroupOffsetsFuture(
-        DeleteShareGroupOffsetsRequestData requestData,
-        CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
-        Map<Uuid, String> requestTopicIdToNameMapping,
-        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData,
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList
-
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
persistDeleteShareGroupOffsets(
+        DeleteShareGroupStateParameters deleteStateRequestParameters,
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList
     ) {
-        DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = 
new DeleteShareGroupStateRequestData()
-            .setGroupId(requestData.groupId())
-            .setTopics(deleteShareGroupStateRequestTopicsData);
-
-        
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
-            .whenComplete((result, error) -> {
-                if (error != null) {
-                    log.error("Failed to delete share group state");
-                    future.completeExceptionally(error);
-                    return;
-                }
+        return persister.deleteState(deleteStateRequestParameters)
+            .thenCompose(result -> {
                 if (result == null || result.topicsData() == null) {
                     log.error("Result is null for the delete share group 
state");
-                    future.completeExceptionally(new 
IllegalStateException("Result is null for the delete share group state"));
-                    return;
+                    Exception exception = new IllegalStateException("Result is 
null for the delete share group state");
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
+                    );
                 }
                 result.topicsData().forEach(topicData ->
-                    deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setErrorMessage(partitionData.errorCode() == 
Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
-                                .setErrorCode(partitionData.errorCode())
-                        ).toList())
-                    ));
+                    errorTopicResponseList.add(
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                            .setTopicId(topicData.topicId())
+                            
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                            .setPartitions(topicData.partitions().stream().map(
+                                partitionData -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                                    
.setPartitionIndex(partitionData.partition())
+                                    .setErrorMessage(partitionData.errorCode() 
== Errors.NONE.code() ? null : 
Errors.forCode(partitionData.errorCode()).message())
+                                    .setErrorCode(partitionData.errorCode())
+                            ).toList())
+                    )
+                );
 
-                future.complete(
+                return CompletableFuture.completedFuture(
                     new DeleteShareGroupOffsetsResponseData()
-                        
.setResponses(deleteShareGroupOffsetsResponseTopicList));
+                        .setResponses(errorTopicResponseList)
+                );
+            }).exceptionally(throwable -> {
+                log.error("Failed to delete share group state");
+                return 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
             });
     }
 
@@ -1590,83 +1584,53 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
         }
 
-        Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
-        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new 
ArrayList<>(requestData.topics().size());
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList = new 
ArrayList<>(requestData.topics().size());
-
-        requestData.topics().forEach(topic -> {
-            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
-            if (topicId != null) {
-                requestTopicIdToNameMapping.put(topicId, topic.topicName());
-                deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
-                    .setTopicId(topicId)
-                    .setPartitions(
-                        topic.partitions().stream().map(
-                            partitionIndex -> new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
-                        ).toList()
-                    ));
-            } else {
-                deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                    .setTopicName(topic.topicName())
-                    .setPartitions(topic.partitions().stream().map(
-                        partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                            .setPartitionIndex(partition)
-                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
-                    ).toList()));
-            }
-        });
-
-        // If the request for the persister is empty, just complete the 
operation right away.
-        if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+        if (requestData.topics() == null || requestData.topics().isEmpty()) {
             return CompletableFuture.completedFuture(
                 new DeleteShareGroupOffsetsResponseData()
-                    .setResponses(deleteShareGroupOffsetsResponseTopicList));
+            );
         }
 
-        CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new 
CompletableFuture<>();
+        return runtime.scheduleReadOperation(
+            "share-group-delete-offsets-request",
+            topicPartitionFor(groupId),
+            (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+        )
+            .thenCompose(resultHolder -> {
+                if (resultHolder == null) {
+                    log.error("Failed to retrieve deleteState request 
parameters from group coordinator for the group {}", groupId);
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+                    );
+                }
+
+                if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
+                            resultHolder.topLevelErrorCode(),
+                            resultHolder.topLevelErrorMessage()
+                        )
+                    );
+                }
 
-        TopicPartition topicPartition = topicPartitionFor(groupId);
+                
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList =
+                    resultHolder.errorTopicResponseList() == null ? new 
ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
 
-        // This is done to make sure the provided group is empty. Offsets can 
be deleted only for an empty share group.
-        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
describeGroupFuture =
-            runtime.scheduleReadOperation(
-                "share-group-describe",
-                topicPartition,
-                (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
-            ).exceptionally(exception -> handleOperationException(
-                "share-group-describe",
-                List.of(groupId),
-                exception,
-                (error, __) -> 
ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
-                log
-            ));
+                if (resultHolder.deleteStateRequestParameters() == null) {
+                    return CompletableFuture.completedFuture(
+                        new DeleteShareGroupOffsetsResponseData()
+                            .setResponses(errorTopicResponseList)
+                    );
+                }
 
-        describeGroupFuture.whenComplete((groups, throwable) -> {
-            if (throwable != null) {
-                log.error("Failed to describe the share group {}", groupId, 
throwable);
-                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
-            } else if (groups == null || groups.isEmpty()) {
-                log.error("Describe share group resulted in empty response for 
group {}", groupId);
-                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
-            } else if (groups.get(0).errorCode() != Errors.NONE.code()) {
-                log.error("Failed to describe the share group {}", groupId);
-                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(),
 groups.get(0).errorMessage()));
-            } else if (groups.get(0).members() != null && 
!groups.get(0).members().isEmpty()) {
-                log.error("Provided group {} is not empty", groupId);
-                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
-            } else {
-                populateDeleteShareGroupOffsetsFuture(
-                    requestData,
-                    future,
-                    requestTopicIdToNameMapping,
-                    deleteShareGroupStateRequestTopicsData,
-                    deleteShareGroupOffsetsResponseTopicList
+                return persistDeleteShareGroupOffsets(
+                    resultHolder.deleteStateRequestParameters(),
+                    errorTopicResponseList
                 );
-            }
-        });
-
-        return future;
+            })
+            .exceptionally(throwable -> {
+                log.error("Failed to retrieve deleteState request parameters 
from group coordinator for the group {}", groupId, throwable);
+                return 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
+            });
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 812641c9459..7f9fd576fd5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -26,6 +26,9 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -131,6 +134,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -290,6 +294,69 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         }
     }
 
+    public static class DeleteShareGroupOffsetsResultHolder {
+        private final short topLevelErrorCode;
+        private final String topLevelErrorMessage;
+        private final 
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList;
+        private final DeleteShareGroupStateParameters 
deleteStateRequestParameters;
+
+        DeleteShareGroupOffsetsResultHolder(short topLevelErrorCode, String 
topLevelErrorMessage) {
+            this(topLevelErrorCode, topLevelErrorMessage, null,  null);
+        }
+
+        DeleteShareGroupOffsetsResultHolder(
+            short topLevelErrorCode,
+            String topLevelErrorMessage,
+            
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList
+        ) {
+            this(topLevelErrorCode, topLevelErrorMessage, 
errorTopicResponseList, null);
+        }
+
+        DeleteShareGroupOffsetsResultHolder(
+            short topLevelErrorCode,
+            String topLevelErrorMessage,
+            
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList,
+            DeleteShareGroupStateParameters deleteStateRequestParameters
+        ) {
+            this.topLevelErrorCode = topLevelErrorCode;
+            this.topLevelErrorMessage = topLevelErrorMessage;
+            this.errorTopicResponseList = errorTopicResponseList;
+            this.deleteStateRequestParameters = deleteStateRequestParameters;
+        }
+
+        public short topLevelErrorCode() {
+            return this.topLevelErrorCode;
+        }
+
+        public String topLevelErrorMessage() {
+            return this.topLevelErrorMessage;
+        }
+
+        public 
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList() {
+            return this.errorTopicResponseList;
+        }
+
+        public DeleteShareGroupStateParameters deleteStateRequestParameters() {
+            return this.deleteStateRequestParameters;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DeleteShareGroupOffsetsResultHolder other = 
(DeleteShareGroupOffsetsResultHolder) o;
+            return topLevelErrorCode == other.topLevelErrorCode &&
+                Objects.equals(topLevelErrorMessage, 
other.topLevelErrorMessage) &&
+                Objects.equals(errorTopicResponseList, 
other.errorTopicResponseList) &&
+                Objects.equals(deleteStateRequestParameters, 
other.deleteStateRequestParameters);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topLevelErrorCode, topLevelErrorMessage, 
errorTopicResponseList, deleteStateRequestParameters);
+        }
+    }
+
     /**
      * The group/offsets expiration key to schedule a timer task.
      *
@@ -613,6 +680,57 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return new CoordinatorResult<>(records, responseMap);
     }
 
+    /**
+     * Does the following checks to make sure that a DeleteShareGroupOffsets 
request is valid and can be processed further
+     * 1. Checks whether the provided group is empty
+     * 2. Checks the requested topics are presented in the metadataImage
+     * 3. Checks the requested share partitions are initialized for the group
+     *
+     * @param groupId - The group ID
+     * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+     * @return {@link DeleteShareGroupOffsetsResultHolder} an object 
containing top level error code, list of topic responses
+     *                                               and persister deleteState 
request parameters
+     */
+    public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        try {
+            ShareGroup group = groupMetadataManager.shareGroup(groupId);
+            group.validateDeleteGroup();
+
+            
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+            List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData =
+                groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+                    groupId,
+                    requestData,
+                    errorTopicResponseList
+                );
+
+            if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+                return new 
DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, 
errorTopicResponseList);
+            }
+
+            DeleteShareGroupStateRequestData deleteShareGroupStateRequestData 
= new DeleteShareGroupStateRequestData()
+                .setGroupId(requestData.groupId())
+                .setTopics(deleteShareGroupStateRequestTopicsData);
+
+            return new DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                errorTopicResponseList,
+                
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            );
+
+        } catch (GroupIdNotFoundException exception) {
+            log.error("groupId {} not found", groupId, exception);
+            return new 
DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), 
exception.getMessage());
+        } catch (GroupNotEmptyException exception) {
+            log.error("Provided group {} is not empty", groupId);
+            return new 
DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), 
exception.getMessage());
+        }
+    }
+
     /**
      * Fetch offsets for a given set of partitions and a given group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f8a114d999f..2e2b02f9432 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -41,6 +41,9 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -8195,6 +8198,52 @@ public class GroupMetadataManager {
         );
     }
 
+    /**
+     * Returns a list of delete share group state request topic objects to be 
used with the persister.
+     * @param groupId - group ID of the share group
+     * @param requestData - the request data for DeleteShareGroupOffsets 
request
+     * @param errorTopicResponseList - the list of topics not found in the 
metadata image
+     * @return List of objects representing the share group state delete 
request for topics.
+     */
+    public List<DeleteShareGroupStateRequestData.DeleteStateData> 
sharePartitionsEligibleForOffsetDeletion(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData,
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList
+    ) {
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+
+        Map<Uuid, Set<Integer>> initializedSharePartitions = 
initializedShareGroupPartitions(groupId);
+        requestData.topics().forEach(topic -> {
+            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+            if (topicId != null) {
+                // A deleteState request to persister should only be sent with 
those topic partitions for which corresponding
+                // share partitions are initialized for the group.
+                if (initializedSharePartitions.containsKey(topicId)) {
+                    List<DeleteShareGroupStateRequestData.PartitionData> 
partitions = new ArrayList<>();
+                    topic.partitions().forEach(partition -> {
+                        if 
(initializedSharePartitions.get(topicId).contains(partition)) {
+                            partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
+                        }
+                    });
+                    deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(partitions));
+                }
+            } else {
+                errorTopicResponseList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(topic.topicName())
+                    .setPartitions(topic.partitions().stream().map(
+                        partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partition)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ).collect(Collectors.toCollection(ArrayList::new))));
+            }
+        });
+
+        return deleteShareGroupStateRequestTopicsData;
+    }
+
     /**
      * Validates the DeleteGroups request.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 6b117a0ec45..01e74ea558a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -121,7 +121,6 @@ import org.mockito.ArgumentMatchers;
 import java.net.InetAddress;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -3298,9 +3297,11 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
@@ -3317,14 +3318,30 @@ public class GroupCoordinatorServiceTest {
                         .setErrorMessage(null))))
             );
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1");
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(
+                            new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                                .setTopicId(TOPIC_ID)
+                                .setPartitions(List.of(
+                                    new 
DeleteShareGroupStateRequestData.PartitionData()
+                                        .setPartition(partition)
+                                ))
+                        ))
+                )
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3343,16 +3360,18 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
         DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = 
new DeleteShareGroupStateRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupStateRequestData.DeleteStateData()
                 .setTopicId(TOPIC_ID)
                 .setPartitions(List.of(new 
DeleteShareGroupStateRequestData.PartitionData()
@@ -3380,14 +3399,19 @@ public class GroupCoordinatorServiceTest {
                 )
             );
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1");
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         DeleteShareGroupStateParameters deleteShareGroupStateParameters = 
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
         DeleteShareGroupStateResult deleteShareGroupStateResult = 
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
@@ -3402,33 +3426,53 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void 
testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build();
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsMetadataImageNull() throws 
ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .setPersister(persister)
             .build(true);
-        service.startup(() -> 1);
+
+        // Forcing a null Metadata Image
+        service.onNewMetadataImage(null, null);
 
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
             .setGroupId("share-group-id")
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
-                .setTopicName("badtopic")
+                .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setResponses(
-                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                    .setTopicName("badtopic")
-                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partition)
-                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
-            );
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3437,7 +3481,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
+    public void testDeleteShareGroupOffsetsInvalidGroupId() throws 
InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3449,31 +3493,46 @@ public class GroupCoordinatorServiceTest {
 
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId("")
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1");
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            .setErrorMessage(Errors.INVALID_GROUP_ID.message());
 
-        when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
-            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
-            ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
 
-        when(persister.deleteState(ArgumentMatchers.any()))
-            .thenReturn(CompletableFuture.failedFuture(new Exception("Unable 
to validate delete share group state request")));
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsEmptyRequest() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id");
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData();
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
-        assertFutureThrows(Exception.class, future, "Unable to validate delete 
share group state request");
+
+        assertEquals(responseData, future.get());
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
+    public void testDeleteShareGroupOffsetsRequestThrowsError() throws 
InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3483,33 +3542,34 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1");
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+            .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
-
-        when(persister.deleteState(ArgumentMatchers.any()))
-            .thenReturn(CompletableFuture.completedFuture(null));
+        
)).thenReturn(CompletableFuture.completedFuture(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
-        assertFutureThrows(IllegalStateException.class, future, "Result is 
null for the delete share group state");
+
+        assertEquals(responseData, future.get());
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() 
{
+    public void testDeleteShareGroupOffsetsRequestReturnsNull() throws 
InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3519,53 +3579,70 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1");
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+            .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
-
-        DeleteShareGroupStateResult deleteShareGroupStateResult =
-            new 
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
-
-        when(persister.deleteState(ArgumentMatchers.any()))
-            
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+        )).thenReturn(CompletableFuture.completedFuture(null));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
-        assertFutureThrows(IllegalStateException.class, future, "Result is 
null for the delete share group state");
+
+        assertEquals(responseData, future.get());
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+    public void testDeleteShareGroupOffsetsRequestReturnsGroupIdNotFound() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
-            .build();
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        String groupId = "share-group-id";
 
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.GROUP_ID_NOT_FOUND.code(),
+                Errors.GROUP_ID_NOT_FOUND.message(),
+                null,
+                null
+            );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3574,27 +3651,43 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsMetadataImageNull() throws 
ExecutionException, InterruptedException {
+    public void testDeleteShareGroupOffsetsRequestReturnsGroupNotEmpty() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
+            .setPersister(persister)
             .build(true);
+        service.startup(() -> 1);
 
-        // Forcing a null Metadata Image
-        service.onNewMetadataImage(null, null);
+        String groupId = "share-group-id";
 
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+            .setErrorCode(Errors.NON_EMPTY_GROUP.code())
+            .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NON_EMPTY_GROUP.code(),
+                Errors.NON_EMPTY_GROUP.message(),
+                null,
+                null
+            );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3603,7 +3696,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsInvalidGroupId() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsRequestReturnsNullParameters() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3613,17 +3706,31 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
-        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.INVALID_GROUP_ID.code())
-            .setErrorMessage(Errors.INVALID_GROUP_ID.message());
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData();
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                null,
+                null
+            );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3632,7 +3739,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsDescribeThrowsError() throws 
InterruptedException, ExecutionException {
+    public void 
testDeleteShareGroupOffsetsRequestReturnsNullParametersWithErrorTopics() throws 
InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3642,23 +3749,52 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String badTopicName = "bad-topic";
+        Uuid badTopicId = Uuid.randomUuid();
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
-            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
-                .setTopicName(TOPIC_NAME)
-                .setPartitions(List.of(partition))
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(partition)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(badTopicName)
+                    .setPartitions(List.of(partition))
             ));
 
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
-            .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
+            .setResponses(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                .setTopicName(badTopicName)
+                .setTopicId(badTopicId)
+                .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                    .setPartitionIndex(partition)
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                    
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                ))));
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(badTopicName)
+                    .setTopicId(badTopicId)
+                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ))),
+                null
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3667,7 +3803,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3677,23 +3813,49 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
+        Exception persisterException = new Exception("Unable to validate 
delete share group state request");
+
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
-            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+            .setErrorCode(Errors.forException(persisterException).code())
+            
.setErrorMessage(Errors.forException(persisterException).message());
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(
+                            new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                                .setTopicId(TOPIC_ID)
+                                .setPartitions(List.of(
+                                    new 
DeleteShareGroupStateRequestData.PartitionData()
+                                        .setPartition(partition)
+                                ))
+                        ))
+                )
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        )).thenReturn(CompletableFuture.completedFuture(null));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+        when(persister.deleteState(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.failedFuture(persisterException));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3702,7 +3864,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3712,23 +3874,49 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
+        Exception persisterException = new IllegalStateException("Result is 
null for the delete share group state");
+
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
-            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+            .setErrorCode(Errors.forException(persisterException).code())
+            
.setErrorMessage(Errors.forException(persisterException).message());
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(
+                            new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                                .setTopicId(TOPIC_ID)
+                                .setPartitions(List.of(
+                                    new 
DeleteShareGroupStateRequestData.PartitionData()
+                                        .setPartition(partition)
+                                ))
+                        ))
+                )
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+        when(persister.deleteState(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.completedFuture(null));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3737,7 +3925,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsDescribeReturnsError() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3747,27 +3935,52 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String groupId = "share-group-id";
+
         int partition = 1;
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
+            .setGroupId(groupId)
             .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                 .setTopicName(TOPIC_NAME)
                 .setPartitions(List.of(partition))
             ));
 
-        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
-            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        Exception persisterException = new IllegalStateException("Result is 
null for the delete share group state");
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
-            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.forException(persisterException).code())
+            
.setErrorMessage(Errors.forException(persisterException).message());
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(
+                            new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                                .setTopicId(TOPIC_ID)
+                                .setPartitions(List.of(
+                                    new 
DeleteShareGroupStateRequestData.PartitionData()
+                                        .setPartition(partition)
+                                ))
+                        ))
+                )
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+        DeleteShareGroupStateResult deleteShareGroupStateResult =
+            new 
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
+
+        when(persister.deleteState(ArgumentMatchers.any()))
+            
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
@@ -3776,7 +3989,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws 
InterruptedException, ExecutionException {
+    public void testDeleteShareGroupOffsetsSuccessWithErrorTopicPartitions() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3786,27 +3999,91 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
+        String badTopicName = "bad-topic";
+        Uuid badTopicId = Uuid.randomUuid();
+        String groupId = "share-group-id";
+
         int partition = 1;
+
         DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
-            .setGroupId("share-group-id")
-            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
-                .setTopicName(TOPIC_NAME)
-                .setPartitions(List.of(partition))
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(partition)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(badTopicName)
+                    .setPartitions(List.of(partition))
             ));
 
+        DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = 
new DeleteShareGroupStateRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)))));
+
         DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
-            .setErrorCode(Errors.NON_EMPTY_GROUP.code())
-            .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+            .setResponses(
+                List.of(
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                        .setTopicName(badTopicName)
+                        .setTopicId(badTopicId)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partition)
+                                
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                                
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                        )),
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                        .setTopicName(TOPIC_NAME)
+                        .setTopicId(TOPIC_ID)
+                        .setPartitions(List.of(
+                            new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partition)
+                                .setErrorCode(Errors.NONE.code())
+                                .setErrorMessage(null)
+                        ))
+                )
+            );
 
-        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
-            .setGroupId("share-group-id-1")
-            .setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
+        DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = 
new DeleteShareGroupStateResponseData()
+            .setResults(
+                List.of(new 
DeleteShareGroupStateResponseData.DeleteStateResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(null)))
+                )
+            );
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
deleteShareGroupOffsetsResultHolder =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(badTopicName)
+                    .setTopicId(badTopicId)
+                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partition)
+                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ))),
+                
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            );
 
         when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq("share-group-delete-offsets-request"),
             ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
             ArgumentMatchers.any()
-        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+        DeleteShareGroupStateParameters deleteShareGroupStateParameters = 
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
+        DeleteShareGroupStateResult deleteShareGroupStateResult = 
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
+        when(persister.deleteState(
+            ArgumentMatchers.eq(deleteShareGroupStateParameters)
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
 
         CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
             
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 16665f98bf2..25af1ca34a6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
@@ -124,6 +127,7 @@ import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -1949,4 +1953,302 @@ public class GroupCoordinatorShardTest {
         verify(groupMetadataManager, times(0)).group(eq("share-group"));
         verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
     }
+
+    @Test
+    public void testShareGroupDeleteOffsetsRequestGroupNotFound() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        String groupId = "share-group";
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName("topic-1")
+                .setPartitions(List.of(0))
+            ));
+
+        GroupIdNotFoundException exception = new 
GroupIdNotFoundException("group Id not found");
+
+        doThrow(exception).when(groupMetadataManager).shareGroup(eq(groupId));
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
expectedResult =
+            new 
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(),
 exception.getMessage());
+
+        assertEquals(expectedResult, 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+        // Not called because of Group not found.
+        verify(groupMetadataManager, 
times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+    }
+
+    @Test
+    public void testShareGroupDeleteOffsetsRequestNonEmptyShareGroup() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        String groupId = "share-group";
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName("topic-1")
+                .setPartitions(List.of(0))
+            ));
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        GroupNotEmptyException exception = new GroupNotEmptyException("group 
is not empty");
+        doThrow(exception).when(shareGroup).validateDeleteGroup();
+
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
expectedResult =
+            new 
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(),
 exception.getMessage());
+
+        assertEquals(expectedResult, 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+        // Not called because of Group not found.
+        verify(groupMetadataManager, 
times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+    }
+
+    @Test
+    public void testShareGroupDeleteOffsetsRequestEmptyResult() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        String groupId = "share-group";
+        String topicName = "topic-1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(topicName)
+                .setPartitions(List.of(partition))
+            ));
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        doNothing().when(shareGroup).validateDeleteGroup();
+
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = List.of(
+            new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                .setTopicName(topicName)
+                .setTopicId(topicId)
+                .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                    .setPartitionIndex(partition)
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                    
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+        );
+
+        
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), 
eq(requestData), any()))
+            .thenAnswer(invocation -> {
+                
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
inputList = invocation.getArgument(2);
+                inputList.addAll(errorTopicResponseList);
+                return List.of();
+            });
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
expectedResult =
+            new 
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), 
null, errorTopicResponseList);
+
+        assertEquals(expectedResult, 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+        verify(groupMetadataManager, 
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+    }
+
+    @Test
+    public void testShareGroupDeleteOffsetsRequestSuccess() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        String groupId = "share-group";
+        String topicName1 = "topic-1";
+        Uuid topicId1 = Uuid.randomUuid();
+        String topicName2 = "topic-2";
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition = 0;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(partition)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(partition))
+            ));
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        doNothing().when(shareGroup).validateDeleteGroup();
+
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData =
+            List.of(
+                new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition)
+                    )),
+                new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition)
+                    ))
+            );
+
+        
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), 
eq(requestData), any()))
+            .thenReturn(deleteShareGroupStateRequestTopicsData);
+
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
expectedResult =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                List.of(),
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(requestData.groupId())
+                        .setTopics(deleteShareGroupStateRequestTopicsData)
+                ));
+
+        assertEquals(expectedResult, 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+        verify(groupMetadataManager, 
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+    }
+
+    @Test
+    public void testShareGroupDeleteOffsetsRequestSuccessWithErrorTopics() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        String groupId = "share-group";
+        String topicName1 = "topic-1";
+        Uuid topicId1 = Uuid.randomUuid();
+        String topicName2 = "topic-2";
+        Uuid topicId2 = Uuid.randomUuid();
+        int partition = 0;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(partition)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(partition))
+            ));
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        doNothing().when(shareGroup).validateDeleteGroup();
+
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData =
+            List.of(
+                new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(
+                        new DeleteShareGroupStateRequestData.PartitionData()
+                            .setPartition(partition)
+                    ))
+            );
+
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList =
+            List.of(
+                new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(topicName2)
+                    .setTopicId(topicId2)
+                    .setPartitions(List.of(
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partition)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ))
+            );
+
+        
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), 
eq(requestData), any()))
+            .thenAnswer(invocation -> {
+                
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
inputList = invocation.getArgument(2);
+
+                inputList.addAll(errorTopicResponseList);
+                return deleteShareGroupStateRequestTopicsData;
+            });
+
+
+        GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder 
expectedResult =
+            new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE.code(),
+                null,
+                errorTopicResponseList,
+                DeleteShareGroupStateParameters.from(
+                    new DeleteShareGroupStateRequestData()
+                        .setGroupId(requestData.groupId())
+                        .setTopics(deleteShareGroupStateRequestTopicsData)
+                ));
+
+        assertEquals(expectedResult, 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+        verify(groupMetadataManager, 
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 35574d35ca3..4665e81e9be 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -43,6 +43,9 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ConsumerProtocolAssignment;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -20804,6 +20807,362 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords, records);
     }
 
+    @Test
+    public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        String groupId = "share-group";
+        Uuid memberId = Uuid.randomUuid();
+        String topicName1 = "topic-1";
+        String topicName2 = "topic-2";
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(topicId1, topicName1, 3)
+            .addTopic(topicId2, topicName2, 2)
+            .build();
+
+        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+
+        context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId.toString())
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of())
+                .setInitializedTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId1)
+                        .setTopicName(topicName1)
+                        .setPartitions(List.of(0, 1, 2)),
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId2)
+                        .setTopicName(topicName2)
+                        .setPartitions(List.of(0, 1))
+                ))
+                .setDeletingTopics(List.of())
+        );
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult 
= List.of(
+            new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(0),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(1),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(2)
+                )),
+            new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topicId2)
+                .setPartitions(List.of(
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(0),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(1)
+                ))
+        );
+
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(0, 1, 2)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(0, 1))
+            ));
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+            
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, 
requestData, errorTopicResponseList);
+
+        assertTrue(errorTopicResponseList.isEmpty());
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void testSharePartitionsEligibleForOffsetDeletionErrorTopics() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        String groupId = "share-group";
+        Uuid memberId = Uuid.randomUuid();
+        String topicName1 = "topic-1";
+        String topicName2 = "topic-2";
+        Uuid topicId1 = Uuid.randomUuid();
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(topicId1, topicName1, 3)
+            .build();
+
+        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+
+        context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId.toString())
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(topicName1)));
+
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of())
+                .setInitializedTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId1)
+                        .setTopicName(topicName1)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setDeletingTopics(List.of())
+        );
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult 
= List.of(
+            new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(0),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(1),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(2)
+                ))
+        );
+
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(0, 1, 2)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(0, 1))
+            ));
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+            
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, 
requestData, errorTopicResponseList);
+
+        assertEquals(
+            List.of(
+                new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(0)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(1)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ))
+            ),
+            errorTopicResponseList
+        );
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void 
testSharePartitionsEligibleForOffsetDeletionUninitializedTopics() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        String groupId = "share-group";
+        Uuid memberId = Uuid.randomUuid();
+        String topicName1 = "topic-1";
+        String topicName2 = "topic-2";
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(topicId1, topicName1, 3)
+            .addTopic(topicId2, topicName2, 2)
+            .build();
+
+        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+
+        context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId.toString())
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializedTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId1)
+                        .setTopicName(topicName1)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setInitializingTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId2)
+                        .setTopicName(topicName2)
+                        .setPartitions(List.of(0, 1))
+                ))
+                .setDeletingTopics(List.of())
+        );
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult 
= List.of(
+            new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(0),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(1),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(2)
+                ))
+        );
+
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(0, 1, 2)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(0, 1))
+            ));
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+            
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, 
requestData, errorTopicResponseList);
+
+        assertTrue(errorTopicResponseList.isEmpty());
+        assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void 
testSharePartitionsEligibleForOffsetDeletionUninitializedAndErrorTopics() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        String groupId = "share-group";
+        Uuid memberId = Uuid.randomUuid();
+        String topicName1 = "topic-1";
+        String topicName2 = "topic-2";
+        String topicName3 = "topic-3";
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(topicId1, topicName1, 3)
+            .addTopic(topicId2, topicName2, 2)
+            .build();
+
+        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+
+        context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId.toString())
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializedTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId1)
+                        .setTopicName(topicName1)
+                        .setPartitions(List.of(0, 1, 2))
+                ))
+                .setInitializingTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId2)
+                        .setTopicName(topicName2)
+                        .setPartitions(List.of(0, 1))
+                ))
+                .setDeletingTopics(List.of())
+        );
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult 
= List.of(
+            new DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(topicId1)
+                .setPartitions(List.of(
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(0),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(1),
+                    new DeleteShareGroupStateRequestData.PartitionData()
+                        .setPartition(2)
+                ))
+        );
+
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(List.of(
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName1)
+                    .setPartitions(List.of(0, 1, 2)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName2)
+                    .setPartitions(List.of(0, 1)),
+                new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicName3)
+                    .setPartitions(List.of(0, 1))
+            ));
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+
+        List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+            
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, 
requestData, errorTopicResponseList);
+
+        assertEquals(
+            List.of(
+                new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(topicName3)
+                    .setPartitions(List.of(
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(0)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
+                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(1)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ))
+            ),
+            errorTopicResponseList
+        );
+        assertEquals(expectedResult, result);
+    }
+
     @Test
     public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");

Reply via email to