This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51481741963 KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator
implementation for DeleteShareGroupOffsets RPC (#18976)
51481741963 is described below
commit 51481741963ee87b6a928d0fa9670efa0ee25a66
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Apr 9 12:01:06 2025 +0530
KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for
DeleteShareGroupOffsets RPC (#18976)
This PR contains the implementation of KafkaAdminClient and
GroupCoordinator for DeleteShareGroupOffsets RPC.
- Added `deleteShareGroupOffsets` to `KafkaAdminClient`
- Added implementation for `handleDeleteShareGroupOffsetsRequest` in
`KafkaApis.scala`
- Added `deleteShareGroupOffsets` to `GroupCoordinator` as well.
internally this makes use of `persister.deleteState` to persist the
changes in share coordinator
Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 35 +-
.../admin/DeleteShareGroupOffsetsOptions.java | 31 ++
.../admin/DeleteShareGroupOffsetsResult.java | 102 ++++
.../kafka/clients/admin/ForwardingAdmin.java | 5 +
.../kafka/clients/admin/KafkaAdminClient.java | 11 +-
.../internals/DeleteShareGroupOffsetsHandler.java | 184 +++++++
.../requests/DeleteShareGroupOffsetsRequest.java | 37 +-
.../requests/DeleteShareGroupOffsetsResponse.java | 1 +
.../common/requests/ShareGroupDescribeRequest.java | 1 +
.../kafka/clients/admin/KafkaAdminClientTest.java | 154 ++++++
.../kafka/clients/admin/MockAdminClient.java | 5 +
core/src/main/scala/kafka/server/KafkaApis.scala | 63 ++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 354 ++++++++++++++
.../kafka/coordinator/group/GroupCoordinator.java | 16 +
.../coordinator/group/GroupCoordinatorService.java | 153 +++++-
.../group/GroupCoordinatorServiceTest.java | 532 +++++++++++++++++++++
.../persister/DeleteShareGroupStateParameters.java | 13 +
.../TestingMetricsInterceptingAdminClient.java | 7 +
18 files changed, 1680 insertions(+), 24 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 3bd122c46d8..bad2dd8a86c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1947,13 +1947,28 @@ public interface Admin extends AutoCloseable {
}
/**
- * Delete share groups from the cluster with the default options.
+ * Delete offsets for a set of partitions in a share group.
*
- * @param groupIds Collection of share group ids which are to be deleted.
- * @return The DeleteShareGroupsResult.
+ * @param groupId The group for which to delete offsets.
+ * @param partitions The topic-partitions.
+ * @param options The options to use when deleting offsets in a share
group.
+ * @return The DeleteShareGroupOffsetsResult.
*/
- default DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds) {
- return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+ DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
+
+ /**
+ * Delete offsets for a set of partitions in a share group with the
default options.
+ *
+ * <p>
+ * This is a convenience method for {@link
#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with
default options.
+ * See the overload for more details.
+ *
+ * @param groupId The group for which to delete offsets.
+ * @param partitions The topic-partitions.
+ * @return The DeleteShareGroupOffsetsResult.
+ */
+ default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String
groupId, Set<TopicPartition> partitions) {
+ return deleteShareGroupOffsets(groupId, partitions, new
DeleteShareGroupOffsetsOptions());
}
/**
@@ -1965,6 +1980,16 @@ public interface Admin extends AutoCloseable {
*/
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds,
DeleteShareGroupsOptions options);
+ /**
+ * Delete share groups from the cluster with the default options.
+ *
+ * @param groupIds Collection of share group ids which are to be deleted.
+ * @return The DeleteShareGroupsResult.
+ */
+ default DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds) {
+ return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+ }
+
/**
* Describe streams groups in the cluster.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
new file mode 100644
index 00000000000..d5a3601315c
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * Options for the {@link Admin#deleteShareGroupOffsets(String, Set,
DeleteShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DeleteShareGroupOffsetsOptions extends
AbstractOptions<DeleteShareGroupOffsetsOptions> {
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
new file mode 100644
index 00000000000..d611881a813
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#deleteShareGroupOffsets(String, Set,
DeleteShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DeleteShareGroupOffsetsResult {
+
+ private final KafkaFuture<Map<TopicPartition, ApiException>> future;
+ private final Set<TopicPartition> partitions;
+
+ DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition,
ApiException>> future, Set<TopicPartition> partitions) {
+ this.future = future;
+ this.partitions = partitions;
+ }
+
+ /**
+ * Return a future which succeeds only if all the deletions succeed.
+ * If not, the first partition error shall be returned.
+ */
+ public KafkaFuture<Void> all() {
+ final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+ this.future.whenComplete((topicPartitions, throwable) -> {
+ if (throwable != null) {
+ result.completeExceptionally(throwable);
+ } else {
+ for (TopicPartition partition : partitions) {
+ if (maybeCompleteExceptionally(topicPartitions, partition,
result)) {
+ return;
+ }
+ }
+ result.complete(null);
+ }
+ });
+ return result;
+ }
+
+ /**
+ * Return a future which can be used to check the result for a given
partition.
+ */
+ public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+ if (!partitions.contains(partition)) {
+ throw new IllegalArgumentException("Partition " + partition + "
was not included in the original request");
+ }
+ final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+ this.future.whenComplete((topicPartitions, throwable) -> {
+ if (throwable != null) {
+ result.completeExceptionally(throwable);
+ } else if (!maybeCompleteExceptionally(topicPartitions, partition,
result)) {
+ result.complete(null);
+ }
+ });
+ return result;
+ }
+
+ private boolean maybeCompleteExceptionally(Map<TopicPartition,
ApiException> partitionLevelErrors,
+ TopicPartition partition,
+ KafkaFutureImpl<Void> result) {
+ Throwable exception;
+ if (!partitionLevelErrors.containsKey(partition)) {
+ exception = new IllegalArgumentException("Offset deletion result
for partition \"" + partition + "\" was not included in the response");
+ } else {
+ exception = partitionLevelErrors.get(partition);
+ }
+
+ if (exception != null) {
+ result.completeExceptionally(exception);
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 8b7db2f04f2..1cb53f127fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
return delegate.listShareGroupOffsets(groupSpecs, options);
}
+ @Override
+ public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String
groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions
options) {
+ return delegate.deleteShareGroupOffsets(groupId, partitions, options);
+ }
+
@Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds, DeleteShareGroupsOptions options) {
return delegate.deleteShareGroups(groupIds, options);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e85dcf21e03..4e37d527d76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.clients.admin.internals.CoordinatorKey;
import
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
@@ -3841,6 +3842,14 @@ public class KafkaAdminClient extends AdminClient {
return new ListShareGroupOffsetsResult(future.all());
}
+ @Override
+ public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String
groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions
options) {
+ SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
+ DeleteShareGroupOffsetsHandler handler = new
DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new
DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)),
partitions);
+ }
+
@Override
public DescribeStreamsGroupsResult describeStreamsGroups(final
Collection<String> groupIds,
final
DescribeStreamsGroupsOptions options) {
@@ -3851,7 +3860,7 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
}
-
+
@Override
public DescribeClassicGroupsResult describeClassicGroups(final
Collection<String> groupIds,
final
DescribeClassicGroupsOptions options) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
new file mode 100644
index 00000000000..c5911e4303e
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the handler for {@link
KafkaAdminClient#deleteShareGroupOffsets(String, Set,
DeleteShareGroupOffsetsOptions)} call
+ */
+public class DeleteShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
+
+ private final CoordinatorKey groupId;
+
+ private final Logger log;
+
+ private final Set<TopicPartition> partitions;
+
+ private final CoordinatorStrategy lookupStrategy;
+
+ public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition>
partitions, LogContext logContext) {
+ this.groupId = CoordinatorKey.byGroupId(groupId);
+ this.partitions = partitions;
+ this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
+ this.lookupStrategy = new
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+ }
+
+ @Override
+ public String apiName() {
+ return "deleteShareGroupOffsets";
+ }
+
+ @Override
+ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+ return lookupStrategy;
+ }
+
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, ApiException>> newFuture(String groupId) {
+ return
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+ }
+
+ private void validateKeys(Set<CoordinatorKey> groupIds) {
+ if (!groupIds.equals(Collections.singleton(groupId))) {
+ throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
+ " (expected only " + Collections.singleton(groupId) + ")");
+ }
+ }
+
+ @Override
+ DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId,
Set<CoordinatorKey> groupIds) {
+ validateKeys(groupIds);
+
+ final
List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic>
topics =
+ new ArrayList<>();
+
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
topicPartitions) -> topics.add(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topic)
+ .setPartitions(topicPartitions.stream()
+ .map(TopicPartition::partition)
+ .collect(Collectors.toList())
+ )
+ ));
+
+ return new DeleteShareGroupOffsetsRequest.Builder(
+ new DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId.idValue)
+ .setTopics(topics),
+ true
+ );
+ }
+
+ @Override
+ public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>>
handleResponse(
+ Node coordinator,
+ Set<CoordinatorKey> groupIds,
+ AbstractResponse abstractResponse
+ ) {
+ validateKeys(groupIds);
+
+ final DeleteShareGroupOffsetsResponse response =
(DeleteShareGroupOffsetsResponse) abstractResponse;
+
+ final Errors groupError = Errors.forCode(response.data().errorCode());
+ final String groupErrorMessage = response.data().errorMessage();
+
+ if (groupError != Errors.NONE) {
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+ final Map<CoordinatorKey, Throwable> groupsFailed = new
HashMap<>();
+ handleGroupError(groupId, groupError, groupErrorMessage,
groupsFailed, groupsToUnmap);
+
+ return new ApiResult<>(Collections.emptyMap(), groupsFailed, new
ArrayList<>(groupsToUnmap));
+ } else {
+ final Map<TopicPartition, ApiException> partitionResults = new
HashMap<>();
+ response.data().responses().forEach(topic ->
+ topic.partitions().forEach(partition -> {
+ if (partition.errorCode() != Errors.NONE.code()) {
+ final Errors partitionError =
Errors.forCode(partition.errorCode());
+ final String partitionErrorMessage =
partition.errorMessage();
+ log.debug("DeleteShareGroupOffsets request for group
id {}, topic {} and partition {} failed and returned error {}." +
partitionErrorMessage,
+ groupId.idValue, topic.topicName(),
partition.partitionIndex(), partitionError);
+ }
+ partitionResults.put(
+ new TopicPartition(topic.topicName(),
partition.partitionIndex()),
+
Errors.forCode(partition.errorCode()).exception(partition.errorMessage())
+ );
+ })
+ );
+
+ return ApiResult.completed(groupId, partitionResults);
+ }
+ }
+
+ private void handleGroupError(
+ CoordinatorKey groupId,
+ Errors error,
+ String errorMessage,
+ Map<CoordinatorKey, Throwable> failed,
+ Set<CoordinatorKey> groupsToUnmap
+ ) {
+ switch (error) {
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case REBALANCE_IN_PROGRESS:
+ // If the coordinator is in the middle of loading, then we
just need to retry
+ log.debug("DeleteShareGroupOffsets request for group id {}
failed because the coordinator" +
+ " is still in the process of loading state. Will retry. "
+ errorMessage, groupId.idValue);
+ break;
+ case COORDINATOR_NOT_AVAILABLE:
+ case NOT_COORDINATOR:
+ // If the coordinator is unavailable or there was a
coordinator change, then we unmap
+ // the key so that we retry the `FindCoordinator` request
+ log.debug("DeleteShareGroupOffsets request for group id {}
returned error {}. Will rediscover the coordinator and retry. " + errorMessage,
+ groupId.idValue, error);
+ groupsToUnmap.add(groupId);
+ break;
+ case INVALID_GROUP_ID:
+ case GROUP_ID_NOT_FOUND:
+ case NON_EMPTY_GROUP:
+ case INVALID_REQUEST:
+ case UNKNOWN_SERVER_ERROR:
+ case KAFKA_STORAGE_ERROR:
+ case GROUP_AUTHORIZATION_FAILED:
+ log.debug("DeleteShareGroupOffsets request for group id {}
failed due to error {}. " + errorMessage, groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMessage));
+ break;
+ default:
+ log.error("DeleteShareGroupOffsets request for group id {}
failed due to unexpected error {}. " + errorMessage, groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMessage));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
index e52e665e909..f96bad8d178 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
@@ -23,10 +23,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<DeleteShareGroupOffsetsRequest> {
@@ -59,19 +55,20 @@ public class DeleteShareGroupOffsetsRequest extends
AbstractRequest {
this.data = data;
}
+ DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs,
Errors error) {
+ return getErrorResponse(throttleTimeMs, error.code(), error.message());
+ }
+
+ public DeleteShareGroupOffsetsResponse getErrorResponse(int
throttleTimeMs, short errorCode, String errorMessage) {
+ return new DeleteShareGroupOffsetsResponse(new
DeleteShareGroupOffsetsResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorMessage(errorMessage)
+ .setErrorCode(errorCode));
+ }
+
@Override
public DeleteShareGroupOffsetsResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
results = new ArrayList<>();
- data.topics().forEach(
- topicResult -> results.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicName(topicResult.topicName())
- .setPartitions(topicResult.partitions().stream()
- .map(partitionData -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partitionData)
- .setErrorCode(Errors.forException(e).code()))
- .collect(Collectors.toList()))));
- return new DeleteShareGroupOffsetsResponse(new
DeleteShareGroupOffsetsResponseData()
- .setResponses(results));
+ return getErrorResponse(throttleTimeMs, Errors.forException(e));
}
@Override
@@ -85,4 +82,14 @@ public class DeleteShareGroupOffsetsRequest extends
AbstractRequest {
version
);
}
+
+ public static DeleteShareGroupOffsetsResponseData
getErrorDeleteResponseData(Errors error) {
+ return getErrorDeleteResponseData(error.code(), error.message());
+ }
+
+ public static DeleteShareGroupOffsetsResponseData
getErrorDeleteResponseData(short errorCode, String errorMessage) {
+ return new DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(errorCode)
+ .setErrorMessage(errorMessage);
+ }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
index 49e4d64a790..192d11b0b73 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
@@ -42,6 +42,7 @@ public class DeleteShareGroupOffsetsResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
+ updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach(
topicResult -> topicResult.partitions().forEach(
partitionResult -> updateErrorCounts(counts,
Errors.forCode(partitionResult.errorCode()))
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
index e95d33385a4..14dd429b8a4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
@@ -94,6 +94,7 @@ public class ShareGroupDescribeRequest extends
AbstractRequest {
.map(groupId -> new
ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(error.code())
+ .setErrorMessage(error.message())
).collect(Collectors.toList());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a478092cfc6..c4988d13165 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -106,6 +106,8 @@ import
org.apache.kafka.common.message.DeleteGroupsResponseData;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResultCollection;
@@ -191,6 +193,8 @@ import
org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse;
@@ -10780,6 +10784,156 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDeleteShareGroupOffsetsOptionsWithBatchedApi() throws
Exception {
+ final Cluster cluster = mockCluster(3, 0);
+ final Time time = new MockTime();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster,
+ AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ final Set<TopicPartition> partitions = Collections.singleton(new
TopicPartition("A", 0));
+ final DeleteShareGroupOffsetsOptions options = new
DeleteShareGroupOffsetsOptions();
+
+ env.adminClient().deleteShareGroupOffsets(GROUP_ID, partitions,
options);
+
+ final MockClient mockClient = env.kafkaClient();
+ waitForRequest(mockClient, ApiKeys.DELETE_SHARE_GROUP_OFFSETS);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ assertNotNull(clientRequest);
+ DeleteShareGroupOffsetsRequestData data =
((DeleteShareGroupOffsetsRequest.Builder)
clientRequest.requestBuilder()).build().data();
+ assertEquals(GROUP_ID, data.groupId());
+ assertEquals(1, data.topics().size());
+ assertEquals(Collections.singletonList("A"),
+
data.topics().stream().map(DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsets() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ DeleteShareGroupOffsetsResponseData data = new
DeleteShareGroupOffsetsResponseData().setResponses(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+ )
+ );
+
+ TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+ TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+ TopicPartition barPartition0 = new TopicPartition("bar", 0);
+ TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
+
+ env.kafkaClient().prepareResponse(new
DeleteShareGroupOffsetsResponse(data));
+ final DeleteShareGroupOffsetsResult result =
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0,
fooTopicPartition1, barPartition0));
+
+ assertNull(result.all().get());
+ assertNull(result.partitionResult(fooTopicPartition0).get());
+ assertNull(result.partitionResult(fooTopicPartition1).get());
+ assertNull(result.partitionResult(barPartition0).get());
+ assertThrows(IllegalArgumentException.class, () ->
result.partitionResult(zooTopicPartition0));
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsEmpty() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ DeleteShareGroupOffsetsResponseData data = new
DeleteShareGroupOffsetsResponseData().setResponses(
+ Collections.emptyList()
+ );
+ env.kafkaClient().prepareResponse(new
DeleteShareGroupOffsetsResponse(data));
+
+ final DeleteShareGroupOffsetsResult result =
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Collections.emptySet());
+ assertDoesNotThrow(() -> result.all().get());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithErrorInGroup() throws Exception
{
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ DeleteShareGroupOffsetsResponseData data = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+ .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message());
+
+ TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+ TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+ TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+
+ env.kafkaClient().prepareResponse(new
DeleteShareGroupOffsetsResponse(data));
+ final DeleteShareGroupOffsetsResult result =
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0,
fooTopicPartition1, barTopicPartition0));
+
+
TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(),
result.all());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithErrorInOnePartition() throws
Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ DeleteShareGroupOffsetsResponseData data = new
DeleteShareGroupOffsetsResponseData().setResponses(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()).setErrorMessage(Errors.KAFKA_STORAGE_ERROR.message()))),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+ )
+ );
+
+ TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+ TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+ TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+
+ env.kafkaClient().prepareResponse(new
DeleteShareGroupOffsetsResponse(data));
+ final DeleteShareGroupOffsetsResult result =
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0,
fooTopicPartition1, barTopicPartition0));
+
+
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(),
result.all());
+ assertNull(result.partitionResult(fooTopicPartition0).get());
+
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(),
result.partitionResult(fooTopicPartition1));
+ assertNull(result.partitionResult(barTopicPartition0).get());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithPartitionNotPresentInResult()
throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ DeleteShareGroupOffsetsResponseData data = new
DeleteShareGroupOffsetsResponseData().setResponses(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0),
new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
+ )
+ );
+
+ TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+ TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+ TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
+ TopicPartition barTopicPartition1 = new TopicPartition("bar", 1);
+
+ env.kafkaClient().prepareResponse(new
DeleteShareGroupOffsetsResponse(data));
+ final DeleteShareGroupOffsetsResult result =
env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0,
fooTopicPartition1, barTopicPartition0));
+
+ assertDoesNotThrow(() -> result.all().get());
+ assertThrows(IllegalArgumentException.class, () ->
result.partitionResult(barTopicPartition1));
+ assertNull(result.partitionResult(barTopicPartition0).get());
+ }
+ }
+
private static StreamsGroupDescribeResponseData
makeFullStreamsGroupDescribeResponse() {
StreamsGroupDescribeResponseData data;
StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new
StreamsGroupDescribeResponseData.TaskIds()
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index f58548465d5..f9c2be19b07 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public synchronized DeleteShareGroupOffsetsResult
deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions,
DeleteShareGroupOffsetsOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public synchronized DeleteShareGroupsResult
deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions
options) {
throw new UnsupportedOperationException("Not implemented yet");
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 30f53f3d26b..5894d3a40d3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,6 +32,8 @@ import
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult,
AddPartitionsToTxnResultCollection}
import
org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult,
DeleteRecordsTopicResult}
+import
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
+import
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
import
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
import
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
ListOffsetsTopicResponse}
@@ -75,6 +77,7 @@ import java.util.concurrent.{CompletableFuture,
ConcurrentHashMap}
import java.util.stream.Collectors
import java.util.{Collections, Optional}
import scala.annotation.nowarn
+import scala.collection.convert.ImplicitConversions.`collection
AsScalaIterable`
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -3625,8 +3628,64 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request):
Unit = {
val deleteShareGroupOffsetsRequest =
request.body[DeleteShareGroupOffsetsRequest]
- requestHelper.sendMaybeThrottle(request,
deleteShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ val groupId = deleteShareGroupOffsetsRequest.data.groupId
+
+ if (!isShareGroupProtocolEnabled) {
+ requestHelper.sendMaybeThrottle(request,
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.UNSUPPORTED_VERSION.exception))
+ return
+ } else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId))
{
+ requestHelper.sendMaybeThrottle(request,
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ return
+ }
+
+ val deleteShareGroupOffsetsResponseTopics:
util.List[DeleteShareGroupOffsetsResponseTopic] = new
util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
+
+ val authorizedTopics: util.List[DeleteShareGroupOffsetsRequestTopic] =
+ new util.ArrayList[DeleteShareGroupOffsetsRequestTopic]
+
+ deleteShareGroupOffsetsRequest.data.topics.forEach{ topic =>
+ if (!authHelper.authorize(request.context, READ, TOPIC,
topic.topicName)) {
+ deleteShareGroupOffsetsResponseTopics.add(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName)
+ .setPartitions(topic.partitions.map(partition => {
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+ }).toList.asJava)
+ )
+ } else {
+ authorizedTopics.add(topic)
+ }
+ }
+
+ if (authorizedTopics.isEmpty) {
+ requestHelper.sendMaybeThrottle(request, new
DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()))
+ return
+ }
+
+ groupCoordinator.deleteShareGroupOffsets(
+ request.context,
+ new
DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
+ ).handle[Unit] {(responseData, exception) => {
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
exception))
+ } else if (responseData.errorCode() != Errors.NONE.code) {
+ requestHelper.sendMaybeThrottle(
+ request,
+
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
responseData.errorCode(), responseData.errorMessage())
+ )
+ } else {
+ responseData.responses.forEach { topic => {
+ deleteShareGroupOffsetsResponseTopics.add(topic)
+ }}
+ val deleteShareGroupStateResponse = new
DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
+ .setResponses(deleteShareGroupOffsetsResponseTopics))
+ requestHelper.sendMaybeThrottle(request, deleteShareGroupStateResponse)
+ }
+ }}
}
// Visible for Testing
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e490be540e2..e933b1985c7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -43,6 +43,8 @@ import
org.apache.kafka.common.message.ApiMessageType.ListenerType
import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup,
TopicPartitions}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
+import
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.{DeleteShareGroupOffsetsResponsePartition,
DeleteShareGroupOffsetsResponseTopic}
import
org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup,
DescribeShareGroupOffsetsRequestTopic}
import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponseGroup,
DescribeShareGroupOffsetsResponsePartition,
DescribeShareGroupOffsetsResponseTopic}
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource, AlterConfigsResourceCollection =>
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig,
AlterableConfigCollection => IAlterableConfigCollection}
@@ -11809,6 +11811,358 @@ class KafkaApisTest extends Logging {
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
+ @Test
+ def testDeleteShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
+ val deleteShareGroupOffsetsRequest = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+ .setTopics(util.List.of(new
DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest,
true).build())
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ response.data.responses.forEach(topic =>
topic.partitions.forEach(partition =>
assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode)))
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestsGroupAuthorizationFailed(): Unit = {
+ val deleteShareGroupOffsetsRequest = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+ .setTopics(util.List.of(new
DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest,
true).build)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(util.List.of(AuthorizationResult.DENIED))
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ authorizer = Some(authorizer),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestsTopicAuthorizationFailed(): Unit = {
+
+ def buildExpectedActionsTopic(topic: String): util.List[Action] = {
+ val pattern = new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL)
+ val action = new Action(AclOperation.READ, pattern, 1, true, true)
+ Collections.singletonList(action)
+ }
+
+ def buildExpectedActionsGroup(topic: String): util.List[Action] = {
+ val pattern = new ResourcePattern(ResourceType.GROUP, topic,
PatternType.LITERAL)
+ val action = new Action(AclOperation.DELETE, pattern, 1, true, true)
+ Collections.singletonList(action)
+ }
+
+ val groupId = "group"
+
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+ val deleteShareGroupOffsetsRequestTopic1 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(util.List.of(0, 1))
+
+ val deleteShareGroupOffsetsRequestTopic2 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(util.List.of(0, 1))
+
+ val deleteShareGroupOffsetsRequestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1,
deleteShareGroupOffsetsRequestTopic2))
+
+ val deleteShareGroupOffsetsGroupCoordinatorRequestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic2))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData,
true).build)
+
+ val resultFuture = new
CompletableFuture[DeleteShareGroupOffsetsResponseData]
+ when(groupCoordinator.deleteShareGroupOffsets(
+ requestChannelRequest.context,
+ deleteShareGroupOffsetsGroupCoordinatorRequestData
+ )).thenReturn(resultFuture)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(buildExpectedActionsGroup(groupId))))
+ .thenReturn(util.List.of(AuthorizationResult.ALLOWED))
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(buildExpectedActionsTopic(topicName1))))
+ .thenReturn(util.List.of(AuthorizationResult.DENIED))
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(buildExpectedActionsTopic(topicName2))))
+ .thenReturn(util.List.of(AuthorizationResult.ALLOWED))
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ authorizer = Some(authorizer)
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val deleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ .setResponses(util.List.of(
+ new DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(util.List.of(
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code()),
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ ))
+
+ val expectedResponseTopics:
util.List[DeleteShareGroupOffsetsResponseTopic] = new
util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
+
+ expectedResponseTopics.add(
+ new DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setPartitions(util.List.of(
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message()),
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+ ))
+ )
+
+ deleteShareGroupOffsetsResponseData.responses.forEach{ topic => {
+ expectedResponseTopics.add(topic)
+ }}
+
+ val expectedResponseData: DeleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ .setResponses(expectedResponseTopics)
+
+ resultFuture.complete(deleteShareGroupOffsetsResponseData)
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(expectedResponseData, response.data)
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestSuccess(): Unit = {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ val topicName3 = "topic-3"
+ val topicId3 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+ addTopicToMetadataCache(topicName3, 3, topicId = topicId3)
+
+ val deleteShareGroupOffsetsRequestTopic1 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(util.List.of(0))
+
+ val deleteShareGroupOffsetsRequestTopic2 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(util.List.of(0, 1))
+
+ val deleteShareGroupOffsetsRequestTopic3 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName3)
+ .setPartitions(util.List.of(0, 1, 2))
+
+ val deleteShareGroupOffsetsRequestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+ .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1,
deleteShareGroupOffsetsRequestTopic2, deleteShareGroupOffsetsRequestTopic3))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData,
true).build)
+
+ val resultFuture = new
CompletableFuture[DeleteShareGroupOffsetsResponseData]
+ when(groupCoordinator.deleteShareGroupOffsets(
+ requestChannelRequest.context,
+ deleteShareGroupOffsetsRequestData
+ )).thenReturn(resultFuture)
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val deleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ .setResponses(util.List.of(
+ new DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ )),
+ new DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(util.List.of(
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code()),
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ )),
+ new DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName3)
+ .setTopicId(topicId3)
+ .setPartitions(util.List.of(
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code()),
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code()),
+ new DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setErrorMessage(null)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ ))
+
+ resultFuture.complete(deleteShareGroupOffsetsResponseData)
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestGroupCoordinatorThrowsError(): Unit = {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+ val deleteShareGroupOffsetsRequestTopic1 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(util.List.of(0))
+
+ val deleteShareGroupOffsetsRequestTopic2 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(util.List.of(0, 1))
+
+ val deleteShareGroupOffsetsRequestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+ .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1,
deleteShareGroupOffsetsRequestTopic2))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData,
true).build)
+
+ when(groupCoordinator.deleteShareGroupOffsets(
+ requestChannelRequest.context,
+ deleteShareGroupOffsetsRequestData
+
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception))
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val deleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestGroupCoordinatorErrorResponse(): Unit
= {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
+
+ val deleteShareGroupOffsetsRequestTopic1 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(util.List.of(0))
+
+ val deleteShareGroupOffsetsRequestTopic2 = new
DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(util.List.of(0, 1))
+
+ val deleteShareGroupOffsetsRequestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+ .setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1,
deleteShareGroupOffsetsRequestTopic2))
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData,
true).build)
+
+ val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+
+ when(groupCoordinator.deleteShareGroupOffsets(
+ requestChannelRequest.context,
+ deleteShareGroupOffsetsRequestData
+ )).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse))
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val deleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(deleteShareGroupOffsetsResponseData, response.data)
+ }
+
+ @Test
+ def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+
+ val deleteShareGroupOffsetsRequest = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest,
true).build)
+
+ val resultFuture = new
CompletableFuture[DeleteShareGroupOffsetsResponseData]
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val deleteShareGroupOffsetsResponse = new
DeleteShareGroupOffsetsResponseData()
+
+ resultFuture.complete(deleteShareGroupOffsetsResponse)
+ val response =
verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(deleteShareGroupOffsetsResponse, response.data)
+ }
+
@Test
def testWriteShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index c1072aa8d86..98d33e9f254 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -21,6 +21,8 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -311,6 +313,20 @@ public interface GroupCoordinator {
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
request
);
+ /**
+ * Delete the Share Group Offsets for a given group.
+ *
+ * @param context The request context
+ * @param request The DeleteShareGroupOffsetsRequestGroup
request.
+ *
+ * @return A future yielding the results.
+ * The error codes of the response are set to indicate the errors
occurred during the execution.
+ */
+ CompletableFuture<DeleteShareGroupOffsetsResponseData>
deleteShareGroupOffsets(
+ RequestContext context,
+ DeleteShareGroupOffsetsRequestData request
+ );
+
/**
* Commit offsets for a given Group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 310bb64bde8..97bc7e69622 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -26,6 +26,9 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -59,6 +62,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -932,7 +936,8 @@ public class GroupCoordinatorService implements
GroupCoordinator {
@Override
public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
RequestContext context,
- List<String> groupIds) {
+ List<String> groupIds
+ ) {
if (!isActive.get()) {
return
CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(
groupIds,
@@ -1243,6 +1248,48 @@ public class GroupCoordinatorService implements
GroupCoordinator {
});
}
+ private void populateDeleteShareGroupOffsetsFuture(
+ DeleteShareGroupOffsetsRequestData requestData,
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
+ Map<Uuid, String> requestTopicIdToNameMapping,
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData,
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList
+
+ ) {
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData =
new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData);
+
+
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("Failed to delete share group state");
+ future.completeExceptionally(error);
+ return;
+ }
+ if (result == null || result.topicsData() == null) {
+ log.error("Result is null for the delete share group
state");
+ future.completeExceptionally(new
IllegalStateException("Result is null for the delete share group state"));
+ return;
+ }
+ result.topicsData().forEach(topicData ->
+ deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicId(topicData.topicId())
+
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
+ .setPartitions(topicData.partitions().stream().map(
+ partitionData -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partitionData.partition())
+ .setErrorMessage(partitionData.errorCode() ==
Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
+ .setErrorCode(partitionData.errorCode())
+ ).toList())
+ ));
+
+ future.complete(
+ new DeleteShareGroupOffsetsResponseData()
+
.setResponses(deleteShareGroupOffsetsResponseTopicList));
+ });
+ }
+
/**
* See {@link GroupCoordinator#fetchOffsets(RequestContext,
OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@@ -1508,6 +1555,110 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return future;
}
+ /**
+ * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext,
DeleteShareGroupOffsetsRequestData)}.
+ */
+ @Override
+ public CompletableFuture<DeleteShareGroupOffsetsResponseData>
deleteShareGroupOffsets(
+ RequestContext context,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+ }
+
+ if (metadataImage == null) {
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+ }
+
+ String groupId = requestData.groupId();
+
+ if (!isGroupIdNotEmpty(groupId)) {
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
+ }
+
+ Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new
ArrayList<>(requestData.topics().size());
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList = new
ArrayList<>(requestData.topics().size());
+
+ requestData.topics().forEach(topic -> {
+ Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+ if (topicId != null) {
+ requestTopicIdToNameMapping.put(topicId, topic.topicName());
+ deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(
+ topic.partitions().stream().map(
+ partitionIndex -> new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
+ ).toList()
+ ));
+ } else {
+ deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName())
+ .setPartitions(topic.partitions().stream().map(
+ partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ).toList()));
+ }
+ });
+
+ // If the request for the persister is empty, just complete the
operation right away.
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return CompletableFuture.completedFuture(
+ new DeleteShareGroupOffsetsResponseData()
+ .setResponses(deleteShareGroupOffsetsResponseTopicList));
+ }
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new
CompletableFuture<>();
+
+ TopicPartition topicPartition = topicPartitionFor(groupId);
+
+ // This is done to make sure the provided group is empty. Offsets can
be deleted only for an empty share group.
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
describeGroupFuture =
+ runtime.scheduleReadOperation(
+ "share-group-describe",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
+ ).exceptionally(exception -> handleOperationException(
+ "share-group-describe",
+ List.of(groupId),
+ exception,
+ (error, __) ->
ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
+ log
+ ));
+
+ describeGroupFuture.whenComplete((groups, throwable) -> {
+ if (throwable != null) {
+ log.error("Failed to describe the share group {}", groupId,
throwable);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
+ } else if (groups == null || groups.isEmpty()) {
+ log.error("Describe share group resulted in empty response for
group {}", groupId);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
+ } else if (groups.get(0).errorCode() != Errors.NONE.code()) {
+ log.error("Failed to describe the share group {}", groupId);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(),
groups.get(0).errorMessage()));
+ } else if (groups.get(0).members() != null &&
!groups.get(0).members().isEmpty()) {
+ log.error("Provided group {} is not empty", groupId);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
+ } else {
+ populateDeleteShareGroupOffsetsFuture(
+ requestData,
+ future,
+ requestTopicIdToNameMapping,
+ deleteShareGroupStateRequestTopicsData,
+ deleteShareGroupOffsetsResponseTopicList
+ );
+ }
+ });
+
+ return future;
+ }
+
/**
* See {@link GroupCoordinator#commitOffsets(RequestContext,
OffsetCommitRequestData, BufferSupplier)}.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 26e3cbd9469..6b117a0ec45 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -42,6 +42,10 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -117,6 +121,7 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -2789,6 +2794,7 @@ public class GroupCoordinatorServiceTest {
List.of(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ .setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message())
),
future.get()
);
@@ -2816,6 +2822,7 @@ public class GroupCoordinatorServiceTest {
List.of(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())
),
future.get()
);
@@ -3282,6 +3289,531 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
+ @Test
+ public void testDeleteShareGroupOffsetsWithNoOpPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+ .setErrorMessage(null))))
+ );
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithDefaultPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData =
new DeleteShareGroupStateRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null))))
+ );
+
+ DeleteShareGroupStateResponseData deleteShareGroupStateResponseData =
new DeleteShareGroupStateResponseData()
+ .setResults(
+ List.of(new
DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)))
+ )
+ );
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ DeleteShareGroupStateParameters deleteShareGroupStateParameters =
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
+ DeleteShareGroupStateResult deleteShareGroupStateResult =
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
+ when(persister.deleteState(
+ ArgumentMatchers.eq(deleteShareGroupStateParameters)
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void
testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName("badtopic")
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName("badtopic")
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
+ );
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(new Exception("Unable
to validate delete share group state request")));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(Exception.class, future, "Unable to validate delete
share group state request");
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(IllegalStateException.class, future, "Result is
null for the delete share group state");
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData()
{
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ DeleteShareGroupStateResult deleteShareGroupStateResult =
+ new
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(IllegalStateException.class, future, "Result is
null for the delete share group state");
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+
+ // Forcing a null Metadata Image
+ service.onNewMetadataImage(null, null);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsInvalidGroupId() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setErrorMessage(Errors.INVALID_GROUP_ID.message());
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsDescribeThrowsError() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsDescribeReturnsError() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.NON_EMPTY_GROUP.code())
+ .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1")
+ .setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
@Test
public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
index 8ca27760709..fa6ae7da934 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DeleteShareGroupStateParameters.java
@@ -20,6 +20,7 @@ package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -64,4 +65,16 @@ public class DeleteShareGroupStateParameters implements
PersisterParameters {
return new
DeleteShareGroupStateParameters(groupTopicPartitionData);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ DeleteShareGroupStateParameters that =
(DeleteShareGroupStateParameters) o;
+ return Objects.equals(groupTopicPartitionData,
that.groupTopicPartitionData);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(groupTopicPartitionData);
+ }
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 2005da2d2ce..d6684810c47 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -57,6 +57,8 @@ import
org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
@@ -473,6 +475,11 @@ public class TestingMetricsInterceptingAdminClient extends
AdminClient {
return adminDelegate.listShareGroupOffsets(groupSpecs, options);
}
+ @Override
+ public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String
groupId, final Set<TopicPartition> partitions, final
DeleteShareGroupOffsetsOptions options) {
+ return adminDelegate.deleteShareGroupOffsets(groupId, partitions,
options);
+ }
+
@Override
public DeleteShareGroupsResult deleteShareGroups(final Collection<String>
groupIds, final DeleteShareGroupsOptions options) {
return adminDelegate.deleteShareGroups(groupIds, options);