This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51481741963 KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator 
implementation for DeleteShareGroupOffsets RPC (#18976)
51481741963 is described below

commit 51481741963ee87b6a928d0fa9670efa0ee25a66
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Apr 9 12:01:06 2025 +0530

    KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for 
DeleteShareGroupOffsets RPC (#18976)
    
    This PR contains the implementation of KafkaAdminClient and
    GroupCoordinator for the DeleteShareGroupOffsets RPC.
    
    - Added `deleteShareGroupOffsets` to `KafkaAdminClient`
    - Added implementation for `handleDeleteShareGroupOffsetsRequest` in
    `KafkaApis.scala`
    - Added `deleteShareGroupOffsets` to `GroupCoordinator` as well.
    Internally, this makes use of `persister.deleteState` to persist the
    changes in the share coordinator.
    
    Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan 
<[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  35 +-
 .../admin/DeleteShareGroupOffsetsOptions.java      |  31 ++
 .../admin/DeleteShareGroupOffsetsResult.java       | 102 ++++
 .../kafka/clients/admin/ForwardingAdmin.java       |   5 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  11 +-
 .../internals/DeleteShareGroupOffsetsHandler.java  | 184 +++++++
 .../requests/DeleteShareGroupOffsetsRequest.java   |  37 +-
 .../requests/DeleteShareGroupOffsetsResponse.java  |   1 +
 .../common/requests/ShareGroupDescribeRequest.java |   1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 154 ++++++
 .../kafka/clients/admin/MockAdminClient.java       |   5 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  63 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 354 ++++++++++++++
 .../kafka/coordinator/group/GroupCoordinator.java  |  16 +
 .../coordinator/group/GroupCoordinatorService.java | 153 +++++-
 .../group/GroupCoordinatorServiceTest.java         | 532 +++++++++++++++++++++
 .../persister/DeleteShareGroupStateParameters.java |  13 +
 .../TestingMetricsInterceptingAdminClient.java     |   7 +
 18 files changed, 1680 insertions(+), 24 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 3bd122c46d8..bad2dd8a86c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1947,13 +1947,28 @@ public interface Admin extends AutoCloseable {
     }
 
     /**
-     * Delete share groups from the cluster with the default options.
+     * Delete offsets for a set of partitions in a share group.
      *
-     * @param groupIds Collection of share group ids which are to be deleted.
-     * @return The DeleteShareGroupsResult.
+     * @param groupId The group for which to delete offsets.
+     * @param partitions The topic-partitions.
+     * @param options The options to use when deleting offsets in a share 
group.
+     * @return The DeleteShareGroupOffsetsResult.
      */
-    default DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds) {
-        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, 
Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
+
+    /**
+     * Delete offsets for a set of partitions in a share group with the 
default options.
+     *
+     * <p>
+     * This is a convenience method for {@link 
#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with 
default options.
+     * See the overload for more details.
+     *
+     * @param groupId The group for which to delete offsets.
+     * @param partitions The topic-partitions.
+     * @return The DeleteShareGroupOffsetsResult.
+     */
+    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String 
groupId, Set<TopicPartition> partitions) {
+        return deleteShareGroupOffsets(groupId, partitions, new 
DeleteShareGroupOffsetsOptions());
     }
 
     /**
@@ -1965,6 +1980,16 @@ public interface Admin extends AutoCloseable {
      */
     DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, 
DeleteShareGroupsOptions options);
 
+    /**
+     * Delete share groups from the cluster with the default options.
+     *
+     * @param groupIds Collection of share group ids which are to be deleted.
+     * @return The DeleteShareGroupsResult.
+     */
+    default DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds) {
+        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+    }
+
     /**
      * Describe streams groups in the cluster.
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
new file mode 100644
index 00000000000..d5a3601315c
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * Options for the {@link Admin#deleteShareGroupOffsets(String, Set, 
DeleteShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DeleteShareGroupOffsetsOptions extends 
AbstractOptions<DeleteShareGroupOffsetsOptions> {
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
new file mode 100644
index 00000000000..d611881a813
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#deleteShareGroupOffsets(String, Set, 
DeleteShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DeleteShareGroupOffsetsResult {
+
+    private final KafkaFuture<Map<TopicPartition, ApiException>> future;
+    private final Set<TopicPartition> partitions;
+
+    DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
ApiException>> future, Set<TopicPartition> partitions) {
+        this.future = future;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Return a future which succeeds only if all the deletions succeed.
+     * If not, the first partition error shall be returned.
+     */
+    public KafkaFuture<Void> all() {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        this.future.whenComplete((topicPartitions, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else {
+                for (TopicPartition partition : partitions) {
+                    if (maybeCompleteExceptionally(topicPartitions, partition, 
result)) {
+                        return;
+                    }
+                }
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Return a future which can be used to check the result for a given 
partition.
+     */
+    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+        if (!partitions.contains(partition)) {
+            throw new IllegalArgumentException("Partition " + partition + " 
was not included in the original request");
+        }
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        this.future.whenComplete((topicPartitions, throwable) -> {
+            if (throwable != null) {
+                result.completeExceptionally(throwable);
+            } else if (!maybeCompleteExceptionally(topicPartitions, partition, 
result)) {
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
+    private boolean maybeCompleteExceptionally(Map<TopicPartition, 
ApiException> partitionLevelErrors,
+                                               TopicPartition partition,
+                                               KafkaFutureImpl<Void> result) {
+        Throwable exception;
+        if (!partitionLevelErrors.containsKey(partition)) {
+            exception = new IllegalArgumentException("Offset deletion result 
for partition \"" + partition + "\" was not included in the response");
+        } else {
+            exception = partitionLevelErrors.get(partition);
+        }
+
+        if (exception != null) {
+            result.completeExceptionally(exception);
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 8b7db2f04f2..1cb53f127fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
         return delegate.listShareGroupOffsets(groupSpecs, options);
     }
 
+    @Override
+    public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String 
groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions 
options) {
+        return delegate.deleteShareGroupOffsets(groupId, partitions, options);
+    }
+
     @Override
     public DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds, DeleteShareGroupsOptions options) {
         return delegate.deleteShareGroups(groupIds, options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e85dcf21e03..4e37d527d76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import 
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteShareGroupOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
@@ -3841,6 +3842,14 @@ public class KafkaAdminClient extends AdminClient {
         return new ListShareGroupOffsetsResult(future.all());
     }
 
+    @Override
+    public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String 
groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions 
options) {
+        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
+        DeleteShareGroupOffsetsHandler handler = new 
DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
+        invokeDriver(handler, future, options.timeoutMs);
+        return new 
DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), 
partitions);
+    }
+
     @Override
     public DescribeStreamsGroupsResult describeStreamsGroups(final 
Collection<String> groupIds,
                                                              final 
DescribeStreamsGroupsOptions options) {
@@ -3851,7 +3860,7 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
             .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
-    
+
     @Override
     public DescribeClassicGroupsResult describeClassicGroups(final 
Collection<String> groupIds,
                                                              final 
DescribeClassicGroupsOptions options) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
new file mode 100644
index 00000000000..c5911e4303e
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the handler for {@link 
KafkaAdminClient#deleteShareGroupOffsets(String, Set, 
DeleteShareGroupOffsetsOptions)} call
+ */
+public class DeleteShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
+
+    private final CoordinatorKey groupId;
+
+    private final Logger log;
+
+    private final Set<TopicPartition> partitions;
+
+    private final CoordinatorStrategy lookupStrategy;
+
+    public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition> 
partitions, LogContext logContext) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.partitions = partitions;
+        this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, ApiException>> newFuture(String groupId) {
+        return 
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    private void validateKeys(Set<CoordinatorKey> groupIds) {
+        if (!groupIds.equals(Collections.singleton(groupId))) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
+                " (expected only " + Collections.singleton(groupId) + ")");
+        }
+    }
+
+    @Override
+    DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set<CoordinatorKey> groupIds) {
+        validateKeys(groupIds);
+
+        final 
List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> 
topics =
+            new ArrayList<>();
+        
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
 topicPartitions) -> topics.add(
+            new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(topic)
+                .setPartitions(topicPartitions.stream()
+                    .map(TopicPartition::partition)
+                    .collect(Collectors.toList())
+                )
+        ));
+
+        return new DeleteShareGroupOffsetsRequest.Builder(
+            new DeleteShareGroupOffsetsRequestData()
+                .setGroupId(groupId.idValue)
+                .setTopics(topics),
+            true
+        );
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> 
handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> groupIds,
+        AbstractResponse abstractResponse
+    ) {
+        validateKeys(groupIds);
+
+        final DeleteShareGroupOffsetsResponse response = 
(DeleteShareGroupOffsetsResponse) abstractResponse;
+
+        final Errors groupError = Errors.forCode(response.data().errorCode());
+        final String groupErrorMessage = response.data().errorMessage();
+
+        if (groupError != Errors.NONE) {
+            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+            final Map<CoordinatorKey, Throwable> groupsFailed = new 
HashMap<>();
+            handleGroupError(groupId, groupError, groupErrorMessage, 
groupsFailed, groupsToUnmap);
+
+            return new ApiResult<>(Collections.emptyMap(), groupsFailed, new 
ArrayList<>(groupsToUnmap));
+        } else {
+            final Map<TopicPartition, ApiException> partitionResults = new 
HashMap<>();
+            response.data().responses().forEach(topic ->
+                topic.partitions().forEach(partition -> {
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        final Errors partitionError = 
Errors.forCode(partition.errorCode());
+                        final String partitionErrorMessage = 
partition.errorMessage();
+                        log.debug("DeleteShareGroupOffsets request for group 
id {}, topic {} and partition {} failed and returned error {}." + 
partitionErrorMessage,
+                            groupId.idValue, topic.topicName(), 
partition.partitionIndex(), partitionError);
+                    }
+                    partitionResults.put(
+                        new TopicPartition(topic.topicName(), 
partition.partitionIndex()),
+                        
Errors.forCode(partition.errorCode()).exception(partition.errorMessage())
+                    );
+                })
+            );
+
+            return ApiResult.completed(groupId, partitionResults);
+        }
+    }
+
+    private void handleGroupError(
+        CoordinatorKey groupId,
+        Errors error,
+        String errorMessage,
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap
+    ) {
+        switch (error) {
+            case COORDINATOR_LOAD_IN_PROGRESS:
+            case REBALANCE_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("DeleteShareGroupOffsets request for group id {} 
failed because the coordinator" +
+                    " is still in the process of loading state. Will retry. " 
+ errorMessage, groupId.idValue);
+                break;
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("DeleteShareGroupOffsets request for group id {} 
returned error {}. Will rediscover the coordinator and retry. " + errorMessage,
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+            case INVALID_GROUP_ID:
+            case GROUP_ID_NOT_FOUND:
+            case NON_EMPTY_GROUP:
+            case INVALID_REQUEST:
+            case UNKNOWN_SERVER_ERROR:
+            case KAFKA_STORAGE_ERROR:
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("DeleteShareGroupOffsets request for group id {} 
failed due to error {}. " + errorMessage, groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMessage));
+                break;
+            default:
+                log.error("DeleteShareGroupOffsets request for group id {} 
failed due to unexpected error {}. " + errorMessage, groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMessage));
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
index e52e665e909..f96bad8d178 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
@@ -23,10 +23,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<DeleteShareGroupOffsetsRequest> {
 
@@ -59,19 +55,20 @@ public class DeleteShareGroupOffsetsRequest extends 
AbstractRequest {
         this.data = data;
     }
 
+    DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, 
Errors error) {
+        return getErrorResponse(throttleTimeMs, error.code(), error.message());
+    }
+
+    public DeleteShareGroupOffsetsResponse getErrorResponse(int 
throttleTimeMs, short errorCode, String errorMessage) {
+        return new DeleteShareGroupOffsetsResponse(new 
DeleteShareGroupOffsetsResponseData()
+            .setThrottleTimeMs(throttleTimeMs)
+            .setErrorMessage(errorMessage)
+            .setErrorCode(errorCode));
+    }
+
     @Override
     public DeleteShareGroupOffsetsResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
results = new ArrayList<>();
-        data.topics().forEach(
-            topicResult -> results.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                .setTopicName(topicResult.topicName())
-                .setPartitions(topicResult.partitions().stream()
-                    .map(partitionData -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partitionData)
-                        .setErrorCode(Errors.forException(e).code()))
-                    .collect(Collectors.toList()))));
-        return new DeleteShareGroupOffsetsResponse(new 
DeleteShareGroupOffsetsResponseData()
-            .setResponses(results));
+        return getErrorResponse(throttleTimeMs, Errors.forException(e));
     }
 
     @Override
@@ -85,4 +82,14 @@ public class DeleteShareGroupOffsetsRequest extends 
AbstractRequest {
             version
         );
     }
+
+    public static DeleteShareGroupOffsetsResponseData 
getErrorDeleteResponseData(Errors error) {
+        return getErrorDeleteResponseData(error.code(), error.message());
+    }
+
+    public static DeleteShareGroupOffsetsResponseData 
getErrorDeleteResponseData(short errorCode, String errorMessage) {
+        return new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(errorCode)
+            .setErrorMessage(errorMessage);
+    }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
index 49e4d64a790..192d11b0b73 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
@@ -42,6 +42,7 @@ public class DeleteShareGroupOffsetsResponse extends 
AbstractResponse {
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
+        updateErrorCounts(counts, Errors.forCode(data.errorCode()));
         data.responses().forEach(
             topicResult -> topicResult.partitions().forEach(
                 partitionResult -> updateErrorCounts(counts, 
Errors.forCode(partitionResult.errorCode()))
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
index e95d33385a4..14dd429b8a4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
@@ -94,6 +94,7 @@ public class ShareGroupDescribeRequest extends 
AbstractRequest {
                 .map(groupId -> new 
ShareGroupDescribeResponseData.DescribedGroup()
                         .setGroupId(groupId)
                         .setErrorCode(error.code())
+                        .setErrorMessage(error.message())
                 ).collect(Collectors.toList());
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a478092cfc6..c4988d13165 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -106,6 +106,8 @@ import 
org.apache.kafka.common.message.DeleteGroupsResponseData;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResultCollection;
@@ -191,6 +193,8 @@ import 
org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
@@ -10780,6 +10784,156 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDeleteShareGroupOffsetsOptionsWithBatchedApi() throws 
Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
+            AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            final Set<TopicPartition> partitions = Collections.singleton(new 
TopicPartition("A", 0));
+            final DeleteShareGroupOffsetsOptions options = new 
DeleteShareGroupOffsetsOptions();
+
+            env.adminClient().deleteShareGroupOffsets(GROUP_ID, partitions, 
options);
+
+            final MockClient mockClient = env.kafkaClient();
+            waitForRequest(mockClient, ApiKeys.DELETE_SHARE_GROUP_OFFSETS);
+
+            ClientRequest clientRequest = mockClient.requests().peek();
+            assertNotNull(clientRequest);
+            DeleteShareGroupOffsetsRequestData data = 
((DeleteShareGroupOffsetsRequest.Builder) 
clientRequest.requestBuilder()).build().data();
+            assertEquals(GROUP_ID, data.groupId());
+            assertEquals(1, data.topics().size());
+            assertEquals(Collections.singletonList("A"),
+                
data.topics().stream().map(DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsets() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            DeleteShareGroupOffsetsResponseData data = new 
DeleteShareGroupOffsetsResponseData().setResponses(
+                List.of(
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
 new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+                )
+            );
+
+            TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+            TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+            TopicPartition barPartition0 = new TopicPartition("bar", 0);
+            TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
+
+            env.kafkaClient().prepareResponse(new 
DeleteShareGroupOffsetsResponse(data));
+            final DeleteShareGroupOffsetsResult result = 
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, 
fooTopicPartition1, barPartition0));
+
+            assertNull(result.all().get());
+            assertNull(result.partitionResult(fooTopicPartition0).get());
+            assertNull(result.partitionResult(fooTopicPartition1).get());
+            assertNull(result.partitionResult(barPartition0).get());
+            assertThrows(IllegalArgumentException.class, () -> 
result.partitionResult(zooTopicPartition0));
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsEmpty() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            DeleteShareGroupOffsetsResponseData data = new 
DeleteShareGroupOffsetsResponseData().setResponses(
+                Collections.emptyList()
+            );
+            env.kafkaClient().prepareResponse(new 
DeleteShareGroupOffsetsResponse(data));
+
+            final DeleteShareGroupOffsetsResult result = 
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Collections.emptySet());
+            assertDoesNotThrow(() -> result.all().get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsWithErrorInGroup() throws Exception 
{
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            DeleteShareGroupOffsetsResponseData data = new 
DeleteShareGroupOffsetsResponseData()
+                .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+                .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message());
+
+            TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+            TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+            TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+
+            env.kafkaClient().prepareResponse(new 
DeleteShareGroupOffsetsResponse(data));
+            final DeleteShareGroupOffsetsResult result = 
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, 
fooTopicPartition1, barTopicPartition0));
+
+            
TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(),
 result.all());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsWithErrorInOnePartition() throws 
Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            DeleteShareGroupOffsetsResponseData data = new 
DeleteShareGroupOffsetsResponseData().setResponses(
+                List.of(
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
 new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()).setErrorMessage(Errors.KAFKA_STORAGE_ERROR.message()))),
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+                )
+            );
+
+            TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+            TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+            TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+
+            env.kafkaClient().prepareResponse(new 
DeleteShareGroupOffsetsResponse(data));
+            final DeleteShareGroupOffsetsResult result = 
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, 
fooTopicPartition1, barTopicPartition0));
+
+            
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), 
result.all());
+            assertNull(result.partitionResult(fooTopicPartition0).get());
+            
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), 
result.partitionResult(fooTopicPartition1));
+            assertNull(result.partitionResult(barTopicPartition0).get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsWithPartitionNotPresentInResult() 
throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            DeleteShareGroupOffsetsResponseData data = new 
DeleteShareGroupOffsetsResponseData().setResponses(
+                List.of(
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
 new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+                )
+            );
+
+            TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+            TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+            TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+            TopicPartition barTopicPartition1 = new TopicPartition("bar", 1);
+
+            env.kafkaClient().prepareResponse(new 
DeleteShareGroupOffsetsResponse(data));
+            final DeleteShareGroupOffsetsResult result = 
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, 
fooTopicPartition1, barTopicPartition0));
+
+            assertDoesNotThrow(() -> result.all().get());
+            assertThrows(IllegalArgumentException.class, () -> 
result.partitionResult(barTopicPartition1));
+            assertNull(result.partitionResult(barTopicPartition0).get());
+        }
+    }
+
     private static StreamsGroupDescribeResponseData 
makeFullStreamsGroupDescribeResponse() {
         StreamsGroupDescribeResponseData data;
         StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new 
StreamsGroupDescribeResponseData.TaskIds()
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index f58548465d5..f9c2be19b07 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public synchronized DeleteShareGroupOffsetsResult 
deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, 
DeleteShareGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public synchronized DeleteShareGroupsResult 
deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions 
options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 30f53f3d26b..5894d3a40d3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,6 +32,8 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
 import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult,
 AddPartitionsToTxnResultCollection}
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult,
 DeleteRecordsTopicResult}
+import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
+import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
 import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
 import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
 ListOffsetsTopicResponse}
@@ -75,6 +77,7 @@ import java.util.concurrent.{CompletableFuture, 
ConcurrentHashMap}
 import java.util.stream.Collectors
 import java.util.{Collections, Optional}
 import scala.annotation.nowarn
+import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -3625,8 +3628,77 @@ class KafkaApis(val requestChannel: RequestChannel,
 
+  /**
+   * Handles a DeleteShareGroupOffsets request.
+   *
+   * A top-level error response is sent when the share group protocol is disabled
+   * (UNSUPPORTED_VERSION) or DELETE on the group is not authorized
+   * (GROUP_AUTHORIZATION_FAILED). Topics without READ authorization get
+   * TOPIC_AUTHORIZATION_FAILED on each requested partition and are not forwarded;
+   * the remaining topics go to the group coordinator, whose per-topic results are
+   * appended to the response.
+   */
   def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
     val deleteShareGroupOffsetsRequest = request.body[DeleteShareGroupOffsetsRequest]
-    requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    val groupId = deleteShareGroupOffsetsRequest.data.groupId
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
+      return
+    } else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    // Collects the per-partition failures for unauthorized topics; coordinator results are added later.
+    val deleteShareGroupOffsetsResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
+
+    val authorizedTopics: util.List[DeleteShareGroupOffsetsRequestTopic] =
+      new util.ArrayList[DeleteShareGroupOffsetsRequestTopic]
+
+    deleteShareGroupOffsetsRequest.data.topics.forEach{ topic =>
+      if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName)) {
+        deleteShareGroupOffsetsResponseTopics.add(
+          new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+            .setTopicName(topic.topicName)
+            .setPartitions(topic.partitions.map(partition => {
+              new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+                .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+            }).toList.asJava)
+        )
+      } else {
+        authorizedTopics.add(topic)
+      }
+    }
+
+    // Nothing is left to forward. Still return the authorization failures collected above
+    // instead of an empty response, so the caller can tell that nothing was deleted.
+    if (authorizedTopics.isEmpty) {
+      requestHelper.sendMaybeThrottle(request, new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
+        .setResponses(deleteShareGroupOffsetsResponseTopics)))
+      return
+    }
+
+    groupCoordinator.deleteShareGroupOffsets(
+      request.context,
+      new DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
+    ).handle[Unit] {(responseData, exception) => {
+      if (exception != null) {
+        requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
+      } else if (responseData.errorCode() != Errors.NONE.code) {
+        requestHelper.sendMaybeThrottle(
+          request,
+          deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode(), responseData.errorMessage())
+        )
+      } else {
+        // Authorization failures come first, followed by the coordinator's results.
+        deleteShareGroupOffsetsResponseTopics.addAll(responseData.responses)
+        val deleteShareGroupStateResponse = new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
+          .setResponses(deleteShareGroupOffsetsResponseTopics))
+        requestHelper.sendMaybeThrottle(request, deleteShareGroupStateResponse)
+      }
+    }}
   }
 
   // Visible for Testing
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e490be540e2..e933b1985c7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -43,6 +43,8 @@ import 
org.apache.kafka.common.message.ApiMessageType.ListenerType
 import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup,
 TopicPartitions}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
+import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.{DeleteShareGroupOffsetsResponsePartition,
 DeleteShareGroupOffsetsResponseTopic}
 import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup,
 DescribeShareGroupOffsetsRequestTopic}
 import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponseGroup,
 DescribeShareGroupOffsetsResponsePartition, 
DescribeShareGroupOffsetsResponseTopic}
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
 => IAlterConfigsResource, AlterConfigsResourceCollection => 
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, 
AlterableConfigCollection => IAlterableConfigCollection}
@@ -11809,6 +11811,358 @@ class KafkaApisTest extends Logging {
     assertEquals(describeShareGroupOffsetsResponse, response.data)
   }
 
+  @Test
+  def testDeleteShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
+    val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+      .setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
+
+    val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest, true).build())
+    metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+    // The error is reported at the top level of the response, not per partition.
+    val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestsGroupAuthorizationFailed(): Unit = {
+    val deleteShareGroupOffsetsRequest = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+      .setTopics(util.List.of(new 
DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
+
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest, 
true).build)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(util.List.of(AuthorizationResult.DENIED))
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+      authorizer = Some(authorizer),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestsTopicAuthorizationFailed(): Unit = {
+
+    def buildExpectedActionsTopic(topic: String): util.List[Action] = {
+      val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+      val action = new Action(AclOperation.READ, pattern, 1, true, true)
+      Collections.singletonList(action)
+    }
+
+    def buildExpectedActionsGroup(topic: String): util.List[Action] = {
+      val pattern = new ResourcePattern(ResourceType.GROUP, topic, 
PatternType.LITERAL)
+      val action = new Action(AclOperation.DELETE, pattern, 1, true, true)
+      Collections.singletonList(action)
+    }
+
+    val groupId = "group"
+
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+    val deleteShareGroupOffsetsRequestTopic1 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName1)
+      .setPartitions(util.List.of(0, 1))
+
+    val deleteShareGroupOffsetsRequestTopic2 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName2)
+      .setPartitions(util.List.of(0, 1))
+
+    val deleteShareGroupOffsetsRequestData = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId(groupId)
+      .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, 
deleteShareGroupOffsetsRequestTopic2))
+
+    val deleteShareGroupOffsetsGroupCoordinatorRequestData = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId(groupId)
+      .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic2))
+
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, 
true).build)
+
+    val resultFuture = new 
CompletableFuture[DeleteShareGroupOffsetsResponseData]
+    when(groupCoordinator.deleteShareGroupOffsets(
+      requestChannelRequest.context,
+      deleteShareGroupOffsetsGroupCoordinatorRequestData
+    )).thenReturn(resultFuture)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActionsGroup(groupId))))
+      .thenReturn(util.List.of(AuthorizationResult.ALLOWED))
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActionsTopic(topicName1))))
+      .thenReturn(util.List.of(AuthorizationResult.DENIED))
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActionsTopic(topicName2))))
+      .thenReturn(util.List.of(AuthorizationResult.ALLOWED))
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+      authorizer = Some(authorizer)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val deleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorMessage(null)
+      .setErrorCode(Errors.NONE.code())
+      .setResponses(util.List.of(
+        new DeleteShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName2)
+          .setTopicId(topicId2)
+          .setPartitions(util.List.of(
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code()),
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code())
+          ))
+      ))
+
+    val expectedResponseTopics: 
util.List[DeleteShareGroupOffsetsResponseTopic] = new 
util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
+
+    expectedResponseTopics.add(
+      new DeleteShareGroupOffsetsResponseTopic()
+        .setTopicName(topicName1)
+        .setPartitions(util.List.of(
+          new DeleteShareGroupOffsetsResponsePartition()
+            .setPartitionIndex(0)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message()),
+          new DeleteShareGroupOffsetsResponsePartition()
+            .setPartitionIndex(1)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+        ))
+    )
+
+    deleteShareGroupOffsetsResponseData.responses.forEach{ topic => {
+      expectedResponseTopics.add(topic)
+    }}
+
+    val expectedResponseData: DeleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorCode(Errors.NONE.code())
+      .setErrorMessage(null)
+      .setResponses(expectedResponseTopics)
+
+    resultFuture.complete(deleteShareGroupOffsetsResponseData)
+    val response = 
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(expectedResponseData, response.data)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestSuccess(): Unit = {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    val topicName3 = "topic-3"
+    val topicId3 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+    addTopicToMetadataCache(topicName3, 3, topicId = topicId3)
+
+    val deleteShareGroupOffsetsRequestTopic1 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName1)
+      .setPartitions(util.List.of(0))
+
+    val deleteShareGroupOffsetsRequestTopic2 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName2)
+      .setPartitions(util.List.of(0, 1))
+
+    val deleteShareGroupOffsetsRequestTopic3 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName3)
+      .setPartitions(util.List.of(0, 1, 2))
+
+    val deleteShareGroupOffsetsRequestData = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+      .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, 
deleteShareGroupOffsetsRequestTopic2, deleteShareGroupOffsetsRequestTopic3))
+
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, 
true).build)
+
+    val resultFuture = new 
CompletableFuture[DeleteShareGroupOffsetsResponseData]
+    when(groupCoordinator.deleteShareGroupOffsets(
+      requestChannelRequest.context,
+      deleteShareGroupOffsetsRequestData
+    )).thenReturn(resultFuture)
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val deleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorMessage(null)
+      .setErrorCode(Errors.NONE.code())
+      .setResponses(util.List.of(
+        new DeleteShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code())
+          )),
+        new DeleteShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName2)
+          .setTopicId(topicId2)
+          .setPartitions(util.List.of(
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code()),
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code())
+          )),
+        new DeleteShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code()),
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code()),
+            new DeleteShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setErrorMessage(null)
+              .setErrorCode(Errors.NONE.code())
+          ))
+      ))
+
+    resultFuture.complete(deleteShareGroupOffsetsResponseData)
+    val response = 
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestGroupCoordinatorThrowsError(): Unit = {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+    val deleteShareGroupOffsetsRequestTopic1 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName1)
+      .setPartitions(util.List.of(0))
+
+    val deleteShareGroupOffsetsRequestTopic2 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName2)
+      .setPartitions(util.List.of(0, 1))
+
+    val deleteShareGroupOffsetsRequestData = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+      .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, 
deleteShareGroupOffsetsRequestTopic2))
+
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, 
true).build)
+
+    when(groupCoordinator.deleteShareGroupOffsets(
+      requestChannelRequest.context,
+      deleteShareGroupOffsetsRequestData
+    
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception))
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val deleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+      .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+
+    val response = 
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestGroupCoordinatorErrorResponse(): Unit 
= {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+    val deleteShareGroupOffsetsRequestTopic1 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName1)
+      .setPartitions(util.List.of(0))
+
+    val deleteShareGroupOffsetsRequestTopic2 = new 
DeleteShareGroupOffsetsRequestTopic()
+      .setTopicName(topicName2)
+      .setPartitions(util.List.of(0, 1))
+
+    val deleteShareGroupOffsetsRequestData = new 
DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+      .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, 
deleteShareGroupOffsetsRequestTopic2))
+
+    val requestChannelRequest = buildRequest(new 
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, 
true).build)
+
+    val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+      .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+
+    when(groupCoordinator.deleteShareGroupOffsets(
+      requestChannelRequest.context,
+      deleteShareGroupOffsetsRequestData
+    )).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse))
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val deleteShareGroupOffsetsResponseData = new 
DeleteShareGroupOffsetsResponseData()
+      .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+      .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+
+    val response = 
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+  }
+
+  @Test
+  def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
+    metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
+      .setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest, true).build)
+
+    // With no topics requested, the handler replies without consulting the group coordinator.
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    // The reply is expected to equal a default-constructed response.
+    val deleteShareGroupOffsetsResponse = new DeleteShareGroupOffsetsResponseData()
+
+    val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(deleteShareGroupOffsetsResponse, response.data)
+  }
+
   @Test
   def testWriteShareGroupStateSuccess(): Unit = {
     val topicId = Uuid.randomUuid();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index c1072aa8d86..98d33e9f254 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -21,6 +21,8 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -311,6 +313,20 @@ public interface GroupCoordinator {
         
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
request
     );
 
+    /**
+     * Delete the Share Group Offsets for a given group.
+     *
+     * @param context           The request context
+     * @param request           The DeleteShareGroupOffsetsRequestData 
request.
+     *
+     * @return  A future yielding the results.
+     *          The error codes of the response are set to indicate the errors 
that occurred during the execution.
+     */
+    CompletableFuture<DeleteShareGroupOffsetsResponseData> 
deleteShareGroupOffsets(
+        RequestContext context,
+        DeleteShareGroupOffsetsRequestData request
+    );
+
     /**
      * Commit offsets for a given Group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 310bb64bde8..97bc7e69622 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -26,6 +26,9 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -59,6 +62,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -932,7 +936,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
     @Override
     public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
         RequestContext context,
-        List<String> groupIds) {
+        List<String> groupIds
+    ) {
         if (!isActive.get()) {
             return 
CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(
                 groupIds,
@@ -1243,6 +1248,48 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         });
     }
 
+    /**
+     * Asks the persister to delete the share-group state for the resolved
+     * topic-partitions and completes {@code future} with the outcome.
+     *
+     * On success, one response topic per persister topic result is appended
+     * to {@code deleteShareGroupOffsetsResponseTopicList} (which may already
+     * hold entries added by the caller) and the future is completed with
+     * that list. If the persister fails, or yields a null result or null
+     * topics data, the future is completed exceptionally.
+     *
+     * @param requestData   The original request; only its group id is read.
+     * @param future        The future to complete.
+     * @param requestTopicIdToNameMapping
+     *                      Topic id to topic name, used to fill in the topic
+     *                      name of each response topic.
+     * @param deleteShareGroupStateRequestTopicsData
+     *                      The topics and partitions sent to the persister.
+     * @param deleteShareGroupOffsetsResponseTopicList
+     *                      The response topic list; mutated by this method.
+     */
+    private void populateDeleteShareGroupOffsetsFuture(
+        DeleteShareGroupOffsetsRequestData requestData,
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData,
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList
+    ) {
+        DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = 
new DeleteShareGroupStateRequestData()
+            .setGroupId(requestData.groupId())
+            .setTopics(deleteShareGroupStateRequestTopicsData);
+
+        
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    // Log the group and the cause so the failure is traceable.
+                    log.error("Failed to delete share group state for group {}",
+                        requestData.groupId(), error);
+                    future.completeExceptionally(error);
+                    return;
+                }
+                if (result == null || result.topicsData() == null) {
+                    log.error("Result is null for the delete share group state for group {}",
+                        requestData.groupId());
+                    future.completeExceptionally(new 
IllegalStateException("Result is null for the delete share group state"));
+                    return;
+                }
+                result.topicsData().forEach(topicData ->
+                    deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                        .setTopicId(topicData.topicId())
+                        
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
+                        .setPartitions(topicData.partitions().stream().map(
+                            partitionData -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partitionData.partition())
+                                // The message is derived from the error code;
+                                // a successful partition carries no message.
+                                .setErrorMessage(partitionData.errorCode() == 
Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
+                                .setErrorCode(partitionData.errorCode())
+                        ).toList())
+                    ));
+
+                future.complete(
+                    new DeleteShareGroupOffsetsResponseData()
+                        
.setResponses(deleteShareGroupOffsetsResponseTopicList));
+            });
+    }
+
     /**
      * See {@link GroupCoordinator#fetchOffsets(RequestContext, 
OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
      */
@@ -1508,6 +1555,110 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, 
DeleteShareGroupOffsetsRequestData)}.
+     *
+     * Flow, in order:
+     * 1. Returns COORDINATOR_NOT_AVAILABLE if the service is not active or no
+     *    metadata image is set, and INVALID_GROUP_ID if the group id fails
+     *    the isGroupIdNotEmpty check.
+     * 2. Resolves each requested topic name to a topic id via the metadata
+     *    image. Unresolved topics get UNKNOWN_TOPIC_OR_PARTITION on every
+     *    requested partition.
+     * 3. If no topic resolved, returns those per-partition errors right away,
+     *    without describing the group or calling the persister.
+     * 4. Otherwise describes the group; only when the describe succeeds and
+     *    reports no members is the persister asked to delete the state.
+     */
+    @Override
+    public CompletableFuture<DeleteShareGroupOffsetsResponseData> 
deleteShareGroupOffsets(
+        RequestContext context,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        if (metadataImage == null) {
+            return CompletableFuture.completedFuture(
+                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        String groupId = requestData.groupId();
+
+        if (!isGroupIdNotEmpty(groupId)) {
+            return CompletableFuture.completedFuture(
+                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
+        }
+
+        // Topic id -> name, so persister results (keyed by id) can be mapped
+        // back to the names the caller supplied.
+        Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new 
ArrayList<>(requestData.topics().size());
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList = new 
ArrayList<>(requestData.topics().size());
+
+        // Split the request: resolvable topics go into the persister request,
+        // unknown topics are answered directly in the response list.
+        requestData.topics().forEach(topic -> {
+            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+            if (topicId != null) {
+                requestTopicIdToNameMapping.put(topicId, topic.topicName());
+                deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopicId(topicId)
+                    .setPartitions(
+                        topic.partitions().stream().map(
+                            partitionIndex -> new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
+                        ).toList()
+                    ));
+            } else {
+                deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(topic.topicName())
+                    .setPartitions(topic.partitions().stream().map(
+                        partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partition)
+                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                    ).toList()));
+            }
+        });
+
+        // If the request for the persister is empty, just complete the 
operation right away.
+        if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new DeleteShareGroupOffsetsResponseData()
+                    .setResponses(deleteShareGroupOffsetsResponseTopicList));
+        }
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new 
CompletableFuture<>();
+
+        TopicPartition topicPartition = topicPartitionFor(groupId);
+
+        // This is done to make sure the provided group is empty. Offsets can 
be deleted only for an empty share group.
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
describeGroupFuture =
+            runtime.scheduleReadOperation(
+                "share-group-describe",
+                topicPartition,
+                (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
+            ).exceptionally(exception -> handleOperationException(
+                "share-group-describe",
+                List.of(groupId),
+                exception,
+                (error, __) -> 
ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
+                log
+            ));
+
+        // Every describe failure below completes the future normally with an
+        // error response rather than exceptionally.
+        describeGroupFuture.whenComplete((groups, throwable) -> {
+            if (throwable != null) {
+                log.error("Failed to describe the share group {}", groupId, 
throwable);
+                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
+            } else if (groups == null || groups.isEmpty()) {
+                log.error("Describe share group resulted in empty response for 
group {}", groupId);
+                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
+            } else if (groups.get(0).errorCode() != Errors.NONE.code()) {
+                // Propagate the describe error code and message as-is.
+                log.error("Failed to describe the share group {}", groupId);
+                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(),
 groups.get(0).errorMessage()));
+            } else if (groups.get(0).members() != null && 
!groups.get(0).members().isEmpty()) {
+                log.error("Provided group {} is not empty", groupId);
+                
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
+            } else {
+                // Group has no members: hand over to the persister, which
+                // completes the future.
+                populateDeleteShareGroupOffsetsFuture(
+                    requestData,
+                    future,
+                    requestTopicIdToNameMapping,
+                    deleteShareGroupStateRequestTopicsData,
+                    deleteShareGroupOffsetsResponseTopicList
+                );
+            }
+        });
+
+        return future;
+    }
+
     /**
      * See {@link GroupCoordinator#commitOffsets(RequestContext, 
OffsetCommitRequestData, BufferSupplier)}.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 26e3cbd9469..6b117a0ec45 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -42,6 +42,10 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -117,6 +121,7 @@ import org.mockito.ArgumentMatchers;
 import java.net.InetAddress;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -2789,6 +2794,7 @@ public class GroupCoordinatorServiceTest {
             List.of(new ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId("share-group-id")
                 .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                .setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message())
             ),
             future.get()
         );
@@ -2816,6 +2822,7 @@ public class GroupCoordinatorServiceTest {
             List.of(new ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId("share-group-id")
                 .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())
             ),
             future.get()
         );
@@ -3282,6 +3289,531 @@ public class GroupCoordinatorServiceTest {
         assertEquals(responseData, future.get());
     }
 
+    // No persister is set on the builder, so the service uses whatever
+    // persister the builder defaults to (a no-op one, going by the test
+    // name). The expected per-partition error code is
+    // PartitionFactory.DEFAULT_ERROR_CODE with a null message.
+    @Test
+    public void testDeleteShareGroupOffsetsWithNoOpPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+                        .setErrorMessage(null))))
+            );
+
+        // The described group sets no members and no error code, which the
+        // service treats as an empty group.
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+        // Use the API key of the RPC under test, as the sibling tests do.
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    // Happy path with a mocked persister: the describe read returns a group
+    // with no members or error set, deleteState is stubbed for exactly the
+    // parameters built from TOPIC_ID and the partition, and the response is
+    // expected to echo the topic name/id with Errors.NONE.
+    @Test
+    public void testDeleteShareGroupOffsetsWithDefaultPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        // The persister request the service is expected to build.
+        DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = 
new DeleteShareGroupStateRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(null))))
+            );
+
+        // The persister result the mock returns.
+        DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = 
new DeleteShareGroupStateResponseData()
+            .setResults(
+                List.of(new 
DeleteShareGroupStateResponseData.DeleteStateResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DeleteShareGroupStateResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(null)))
+                )
+            );
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+        DeleteShareGroupStateParameters deleteShareGroupStateParameters = 
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
+        DeleteShareGroupStateResult deleteShareGroupStateResult = 
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
+        // eq(...) means the stub only answers if the service passes exactly
+        // these parameters.
+        when(persister.deleteState(
+            ArgumentMatchers.eq(deleteShareGroupStateParameters)
+        
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    // Requests a topic name ("badtopic") that is expected not to resolve to a
+    // topic id. Neither the runtime read nor the persister is stubbed; the
+    // expected response carries UNKNOWN_TOPIC_OR_PARTITION for the partition
+    // and no topic id.
+    @Test
+    public void 
testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName("badtopic")
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                    .setTopicName("badtopic")
+                    .setPartitions(List.of(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
+            );
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    // The describe read succeeds, but persister.deleteState returns a failed
+    // future. The future returned by the service is expected to fail with the
+    // same exception message.
+    @Test
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+        // Any deleteState call fails.
+        when(persister.deleteState(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.failedFuture(new Exception("Unable 
to validate delete share group state request")));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(Exception.class, future, "Unable to validate delete 
share group state request");
+    }
+
+    // The describe read succeeds and persister.deleteState completes with a
+    // null result. The returned future is expected to fail with an
+    // IllegalStateException.
+    @Test
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+        // The persister completes normally, but with no result object.
+        when(persister.deleteState(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(IllegalStateException.class, future, "Result is 
null for the delete share group state");
+    }
+
+    // The describe read succeeds and persister.deleteState returns a result
+    // built with null topics data. The returned future is expected to fail
+    // with the same IllegalStateException as for a null result.
+    @Test
+    public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() 
{
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+        // A non-null result whose topics data is null.
+        DeleteShareGroupStateResult deleteShareGroupStateResult =
+            new 
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
+
+        when(persister.deleteState(ArgumentMatchers.any()))
+            
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(IllegalStateException.class, future, "Result is 
null for the delete share group state");
+    }
+
+    // The service is built with build() and startup is never called here.
+    // The expected response is a top-level COORDINATOR_NOT_AVAILABLE error.
+    @Test
+    public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build();
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    // Clears the metadata image via onNewMetadataImage(null, null) and expects
+    // a top-level COORDINATOR_NOT_AVAILABLE error.
+    // NOTE(review): startup is not called explicitly here; presumably
+    // build(true) activates the service, otherwise the not-active check
+    // rather than the null-image check produces this error - confirm.
+    @Test
+    public void testDeleteShareGroupOffsetsMetadataImageNull() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+
+        // Forcing a null Metadata Image
+        service.onNewMetadataImage(null, null);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    // Sends a request whose group id is the empty string and expects a
+    // top-level INVALID_GROUP_ID error. Neither the runtime read nor the
+    // persister is stubbed.
+    @Test
+    public void testDeleteShareGroupOffsetsInvalidGroupId() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DeleteShareGroupOffsetsRequestData requestData = new 
DeleteShareGroupOffsetsRequestData()
+            .setGroupId("")
+            .setTopics(List.of(new 
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DeleteShareGroupOffsetsResponseData responseData = new 
DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            .setErrorMessage(Errors.INVALID_GROUP_ID.message());
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+            
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    /**
+     * When the stubbed "share-group-describe" read completes exceptionally with
+     * {@code UNKNOWN_SERVER_ERROR}, the delete-offsets response is expected to carry
+     * that same error code and message.
+     */
+    @Test
+    public void testDeleteShareGroupOffsetsDescribeThrowsError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        Persister statePersister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService coordinator = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(coordinatorRuntime)
+            .setPersister(statePersister)
+            .build(true);
+        coordinator.startup(() -> 1);
+
+        // The describe read against partition 0 of the group metadata topic fails.
+        when(coordinatorRuntime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+        DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic topic =
+            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(1));
+        DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(topic));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> result =
+            coordinator.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);
+
+        DeleteShareGroupOffsetsResponseData expected = new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+            .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
+        assertEquals(expected, result.get());
+    }
+
+    /**
+     * When the stubbed "share-group-describe" read completes with {@code null},
+     * the delete-offsets response is expected to be {@code GROUP_ID_NOT_FOUND}.
+     */
+    @Test
+    public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        Persister statePersister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService coordinator = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(coordinatorRuntime)
+            .setPersister(statePersister)
+            .build(true);
+        coordinator.startup(() -> 1);
+
+        // The describe read succeeds but yields no result object at all.
+        when(coordinatorRuntime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(null));
+
+        DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic topic =
+            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(1));
+        DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(topic));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> result =
+            coordinator.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);
+
+        DeleteShareGroupOffsetsResponseData expected = new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        assertEquals(expected, result.get());
+    }
+
+    /**
+     * When the stubbed "share-group-describe" read completes with an empty list,
+     * the delete-offsets response is expected to be {@code GROUP_ID_NOT_FOUND}.
+     */
+    @Test
+    public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        Persister statePersister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService coordinator = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(coordinatorRuntime)
+            .setPersister(statePersister)
+            .build(true);
+        coordinator.startup(() -> 1);
+
+        // The describe read succeeds but returns no described groups.
+        when(coordinatorRuntime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+        DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic topic =
+            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(1));
+        DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(topic));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> result =
+            coordinator.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);
+
+        DeleteShareGroupOffsetsResponseData expected = new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        assertEquals(expected, result.get());
+    }
+
+    /**
+     * When the stubbed "share-group-describe" read returns a single described group
+     * that itself carries {@code GROUP_ID_NOT_FOUND}, the delete-offsets response is
+     * expected to report the same error code and message.
+     */
+    @Test
+    public void testDeleteShareGroupOffsetsDescribeReturnsError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        Persister statePersister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService coordinator = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(coordinatorRuntime)
+            .setPersister(statePersister)
+            .build(true);
+        coordinator.startup(() -> 1);
+
+        // The describe read returns one group entry flagged with an error.
+        ShareGroupDescribeResponseData.DescribedGroup erroredGroup = new ShareGroupDescribeResponseData.DescribedGroup()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        when(coordinatorRuntime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(List.of(erroredGroup)));
+
+        DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic topic =
+            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(1));
+        DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(topic));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> result =
+            coordinator.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);
+
+        DeleteShareGroupOffsetsResponseData expected = new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+            .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+        assertEquals(expected, result.get());
+    }
+
+    /**
+     * When the stubbed "share-group-describe" read returns a described group that
+     * still has a member, the delete-offsets response is expected to be
+     * {@code NON_EMPTY_GROUP}.
+     */
+    @Test
+    public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
+        Persister statePersister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService coordinator = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(coordinatorRuntime)
+            .setPersister(statePersister)
+            .build(true);
+        coordinator.startup(() -> 1);
+
+        // The describe read returns one group entry holding a single (default) member.
+        // NOTE(review): the mocked group id "share-group-id-1" differs from the requested
+        // "share-group-id" - looks like a copy/paste leftover; confirm whether it is intentional.
+        ShareGroupDescribeResponseData.DescribedGroup populatedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1")
+            .setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
+        when(coordinatorRuntime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(List.of(populatedGroup)));
+
+        DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic topic =
+            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(1));
+        DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(topic));
+
+        CompletableFuture<DeleteShareGroupOffsetsResponseData> result =
+            coordinator.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);
+
+        DeleteShareGroupOffsetsResponseData expected = new DeleteShareGroupOffsetsResponseData()
+            .setErrorCode(Errors.NON_EMPTY_GROUP.code())
+            .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+        assertEquals(expected, result.get());
+    }
+
     @Test
     public void testPersisterInitializeSuccess() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
index 8ca27760709..fa6ae7da934 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
@@ -20,6 +20,7 @@ package org.apache.kafka.server.share.persister;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -64,4 +65,16 @@ public class DeleteShareGroupStateParameters implements 
PersisterParameters {
             return new 
DeleteShareGroupStateParameters(groupTopicPartitionData);
         }
     }
+
+    /**
+     * Compares this object with another for equality. Two instances are equal when
+     * they are of exactly the same class and their {@code groupTopicPartitionData}
+     * values are equal (both {@code null}, or equal per {@code equals}).
+     *
+     * @param o the object to compare with; may be {@code null}
+     * @return {@code true} if {@code o} is this instance or an equal instance,
+     *         {@code false} otherwise
+     */
+    @Override
+    public boolean equals(Object o) {
+        // Identity fast path: avoids the field comparison when compared with itself.
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        DeleteShareGroupStateParameters that = (DeleteShareGroupStateParameters) o;
+        return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData);
+    }
+
+    /**
+     * Hash code derived solely from {@code groupTopicPartitionData};
+     * {@code 0} when that field is {@code null}.
+     */
+    @Override
+    public int hashCode() {
+        return groupTopicPartitionData == null ? 0 : groupTopicPartitionData.hashCode();
+    }
 }
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 2005da2d2ce..d6684810c47 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -57,6 +57,8 @@ import 
org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
 import org.apache.kafka.clients.admin.DeleteRecordsOptions;
 import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
@@ -473,6 +475,11 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
         return adminDelegate.listShareGroupOffsets(groupSpecs, options);
     }
 
+    @Override
+    public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String 
groupId, final Set<TopicPartition> partitions, final 
DeleteShareGroupOffsetsOptions options) {
+        return adminDelegate.deleteShareGroupOffsets(groupId, partitions, 
options);
+    }
+
     @Override
     public DeleteShareGroupsResult deleteShareGroups(final Collection<String> 
groupIds, final DeleteShareGroupsOptions options) {
         return adminDelegate.deleteShareGroups(groupIds, options);

Reply via email to