This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 729f9ccf065 KAFKA-19440: Handle top-level errors in
AlterShareGroupOffsets RPC (#20049)
729f9ccf065 is described below
commit 729f9ccf065b35ce2e4e5b5fe1e9ce129d2ace08
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Jul 3 11:00:56 2025 +0100
KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049)
While testing the code in https://github.com/apache/kafka/pull/19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.
Reviewers: Apoorv Mittal <[email protected]>, Lan Ding
<[email protected]>, TaiJuWu <[email protected]>
---
.../admin/AlterShareGroupOffsetsResult.java | 23 +++---
.../kafka/clients/admin/KafkaAdminClient.java | 10 ++-
.../internals/AlterShareGroupOffsetsHandler.java | 93 ++++++++++++----------
.../requests/AlterShareGroupOffsetsRequest.java | 31 +++++---
.../requests/DeleteShareGroupOffsetsRequest.java | 10 ++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 30 ++++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 31 +++-----
.../kafka/api/AuthorizerIntegrationTest.scala | 14 ++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 12 ++-
.../coordinator/group/GroupCoordinatorService.java | 53 ++++++------
.../group/GroupCoordinatorServiceTest.java | 61 ++++++++++++--
11 files changed, 238 insertions(+), 130 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
index 7c41852231d..293daaadbb9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java
@@ -20,6 +20,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
@@ -35,9 +36,9 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
- private final KafkaFuture<Map<TopicPartition, Errors>> future;
+ private final KafkaFuture<Map<TopicPartition, ApiException>> future;
- AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>>
future) {
+ AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition,
ApiException>> future) {
this.future = future;
}
@@ -54,11 +55,11 @@ public class AlterShareGroupOffsetsResult {
result.completeExceptionally(new IllegalArgumentException(
"Alter offset for partition \"" + partition + "\" was not
attempted"));
} else {
- final Errors error = topicPartitions.get(partition);
- if (error == Errors.NONE) {
+ final ApiException exception = topicPartitions.get(partition);
+ if (exception == null) {
result.complete(null);
} else {
- result.completeExceptionally(error.exception());
+ result.completeExceptionally(exception);
}
}
});
@@ -68,22 +69,22 @@ public class AlterShareGroupOffsetsResult {
/**
* Return a future which succeeds if all the alter offsets succeed.
+ * If not, the first topic error shall be returned.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed =
topicPartitionErrorsMap.entrySet()
.stream()
- .filter(e -> e.getValue() != Errors.NONE)
+ .filter(e -> e.getValue() != null)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
- for (Errors error : topicPartitionErrorsMap.values()) {
- if (error != Errors.NONE) {
- throw error.exception(
- "Failed altering share group offsets for the following
partitions: " + partitionsFailed);
+ for (ApiException exception : topicPartitionErrorsMap.values()) {
+ if (exception != null) {
+ throw Errors.forException(exception).exception(
+ "Failed altering group offsets for the following
partitions: " + partitionsFailed);
}
}
return null;
});
}
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b283d65cbee..e1be4304950 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3804,8 +3804,10 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
- SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>>
future = AlterShareGroupOffsetsHandler.newFuture(groupId);
+ public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String
groupId,
+ final
Map<TopicPartition, Long> offsets,
+ final
AlterShareGroupOffsetsOptions options) {
+ SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
ApiException>> future = AlterShareGroupOffsetsHandler.newFuture(groupId);
AlterShareGroupOffsetsHandler handler = new
AlterShareGroupOffsetsHandler(groupId, offsets, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new
AlterShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
@@ -3821,7 +3823,9 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String
groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
+ public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String
groupId,
+ final
Set<String> topics,
+ final
DeleteShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future
= DeleteShareGroupOffsetsHandler.newFuture(groupId);
DeleteShareGroupOffsetsHandler handler = new
DeleteShareGroupOffsetsHandler(groupId, topics, logContext);
invokeDriver(handler, future, options.timeoutMs);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
index f66f5972836..ef21be6b6d2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterShareGroupOffsetsHandler.java
@@ -21,8 +21,8 @@ import
org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
-import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -42,7 +41,7 @@ import java.util.Set;
/**
* This class is the handler for {@link
KafkaAdminClient#alterShareGroupOffsets(String, Map,
AlterShareGroupOffsetsOptions)} call
*/
-public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
+public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
private final CoordinatorKey groupId;
@@ -52,7 +51,6 @@ public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coord
private final CoordinatorStrategy lookupStrategy;
-
public AlterShareGroupOffsetsHandler(String groupId, Map<TopicPartition,
Long> offsets, LogContext logContext) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.offsets = offsets;
@@ -60,8 +58,15 @@ public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coord
this.lookupStrategy = new
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, Errors>> newFuture(String groupId) {
- return
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, ApiException>> newFuture(String groupId) {
+ return
AdminApiFuture.forKeys(Set.of(CoordinatorKey.byGroupId(groupId)));
+ }
+
+ private void validateKeys(Set<CoordinatorKey> groupIds) {
+ if (!groupIds.equals(Set.of(groupId))) {
+ throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
+ " (expected only " + Set.of(groupId) + ")");
+ }
}
@Override
@@ -87,30 +92,38 @@ public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coord
}
@Override
- public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>>
handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse
abstractResponse) {
+ public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>>
handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse
abstractResponse) {
+ validateKeys(keys);
+
AlterShareGroupOffsetsResponse response =
(AlterShareGroupOffsetsResponse) abstractResponse;
- final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
-
- for
(AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topic :
response.data().responses()) {
- for
(AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition
partition : topic.partitions()) {
- TopicPartition topicPartition = new
TopicPartition(topic.topicName(), partition.partitionIndex());
- Errors error = Errors.forCode(partition.errorCode());
-
- if (error != Errors.NONE) {
- handleError(
- groupId,
- topicPartition,
- error,
- partitionResults,
- groupsToUnmap,
- groupsToRetry
- );
- } else {
- partitionResults.put(topicPartition, error);
+ final Map<TopicPartition, ApiException> partitionResults = new
HashMap<>();
+
+ if (response.data().errorCode() != Errors.NONE.code()) {
+ final Errors topLevelError =
Errors.forCode(response.data().errorCode());
+ final String topLevelErrorMessage = response.data().errorMessage();
+
+ offsets.forEach((topicPartition, offset) ->
+ handleError(
+ groupId,
+ topicPartition,
+ topLevelError,
+ topLevelErrorMessage,
+ partitionResults,
+ groupsToUnmap,
+ groupsToRetry
+ ));
+ } else {
+ response.data().responses().forEach(topic ->
topic.partitions().forEach(partition -> {
+ if (partition.errorCode() != Errors.NONE.code()) {
+ final Errors partitionError =
Errors.forCode(partition.errorCode());
+ final String partitionErrorMessage =
partition.errorMessage();
+ log.debug("AlterShareGroupOffsets request for group id {}
and topic-partition {}-{} failed and returned error {}." +
partitionErrorMessage,
+ groupId.idValue, topic.topicName(),
partition.partitionIndex(), partitionError);
}
- }
+ partitionResults.put(new TopicPartition(topic.topicName(),
partition.partitionIndex()),
Errors.forCode(partition.errorCode()).exception(partition.errorMessage()));
+ }));
}
if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
@@ -121,23 +134,23 @@ public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coord
}
private void handleError(
- CoordinatorKey groupId,
- TopicPartition topicPartition,
- Errors error,
- Map<TopicPartition, Errors> partitionResults,
- Set<CoordinatorKey> groupsToUnmap,
- Set<CoordinatorKey> groupsToRetry
+ CoordinatorKey groupId,
+ TopicPartition topicPartition,
+ Errors error,
+ String errorMessage,
+ Map<TopicPartition, ApiException> partitionResults,
+ Set<CoordinatorKey> groupsToUnmap,
+ Set<CoordinatorKey> groupsToRetry
) {
switch (error) {
case COORDINATOR_LOAD_IN_PROGRESS:
case REBALANCE_IN_PROGRESS:
- log.debug("AlterShareGroupOffsets request for group id {}
returned error {}. Will retry.",
- groupId.idValue, error);
+ log.debug("AlterShareGroupOffsets request for group id {}
returned error {}. Will retry." + errorMessage, groupId.idValue, error);
groupsToRetry.add(groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
- log.debug("AlterShareGroupOffsets request for group id {}
returned error {}. Will rediscover the coordinator and retry.",
+ log.debug("AlterShareGroupOffsets request for group id {}
returned error {}. Will rediscover the coordinator and retry." + errorMessage,
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
@@ -147,14 +160,12 @@ public class AlterShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coord
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR:
case GROUP_AUTHORIZATION_FAILED:
- log.debug("AlterShareGroupOffsets request for group id {} and
partition {} failed due" +
- " to error {}.", groupId.idValue, topicPartition,
error);
- partitionResults.put(topicPartition, error);
+ log.debug("AlterShareGroupOffsets request for group id {}
failed due to error {}." + errorMessage, groupId.idValue, error);
+ partitionResults.put(topicPartition,
error.exception(errorMessage));
break;
default:
- log.error("AlterShareGroupOffsets request for group id {} and
partition {} failed due" +
- " to unexpected error {}.", groupId.idValue,
topicPartition, error);
- partitionResults.put(topicPartition, error);
+ log.error("AlterShareGroupOffsets request for group id {}
failed due to unexpected error {}." + errorMessage, groupId.idValue, error);
+ partitionResults.put(topicPartition,
error.exception(errorMessage));
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
index 2eb9e37bc50..be04568e1a3 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
@@ -53,26 +53,31 @@ public class AlterShareGroupOffsetsRequest extends
AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Errors error = Errors.forException(e);
- return new
AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error));
+ public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
+ return getErrorResponse(throttleTimeMs, Errors.forException(e));
}
- public static AlterShareGroupOffsetsResponseData getErrorResponse(int
throttleTimeMs, Errors error) {
- return new AlterShareGroupOffsetsResponseData()
- .setThrottleTimeMs(throttleTimeMs)
- .setErrorCode(error.code())
- .setErrorMessage(error.message());
+ public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs,
Errors error) {
+ return getErrorResponse(throttleTimeMs, error.code(), error.message());
+ }
+
+ public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs,
short errorCode, String message) {
+ return new AlterShareGroupOffsetsResponse(
+ new AlterShareGroupOffsetsResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(errorCode)
+ .setErrorMessage(message)
+ );
}
- public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors
error) {
- return getErrorResponse(error.code(), error.message());
+ public static AlterShareGroupOffsetsResponseData
getErrorResponseData(Errors error) {
+ return getErrorResponseData(error, null);
}
- public static AlterShareGroupOffsetsResponseData getErrorResponse(short
errorCode, String errorMessage) {
+ public static AlterShareGroupOffsetsResponseData
getErrorResponseData(Errors error, String errorMessage) {
return new AlterShareGroupOffsetsResponseData()
- .setErrorCode(errorCode)
- .setErrorMessage(errorMessage);
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage == null ? error.message() :
errorMessage);
}
public static AlterShareGroupOffsetsRequest parse(Readable readable, short
version) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
index bec0077b9b3..1e28115bada 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
@@ -80,12 +80,18 @@ public class DeleteShareGroupOffsetsRequest extends
AbstractRequest {
}
public static DeleteShareGroupOffsetsResponseData
getErrorDeleteResponseData(Errors error) {
- return getErrorDeleteResponseData(error.code(), error.message());
+ return getErrorDeleteResponseData(error, null);
}
public static DeleteShareGroupOffsetsResponseData
getErrorDeleteResponseData(short errorCode, String errorMessage) {
return new DeleteShareGroupOffsetsResponseData()
.setErrorCode(errorCode)
- .setErrorMessage(errorMessage);
+ .setErrorMessage(errorMessage == null ?
Errors.forCode(errorCode).message() : errorMessage);
+ }
+
+ public static DeleteShareGroupOffsetsResponseData
getErrorDeleteResponseData(Errors error, String errorMessage) {
+ return new DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(error.code())
+ .setErrorMessage(errorMessage == null ? error.message() :
errorMessage);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1d516cf6648..1098078b582 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -56,7 +56,6 @@ import
org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.DuplicateVoterException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
@@ -11351,6 +11350,28 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testAlterShareGroupOffsetsWithTopLevelError() throws Exception
{
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData().setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()).setErrorMessage("Group
authorization failed.");
+
+ TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
+ TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
+ TopicPartition barPartition0 = new TopicPartition("bar", 0);
+ TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
+
+ env.kafkaClient().prepareResponse(new
AlterShareGroupOffsetsResponse(data));
+ final AlterShareGroupOffsetsResult result =
env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0,
1L, fooTopicPartition1, 2L, barPartition0, 1L));
+
+ TestUtils.assertFutureThrows(GroupAuthorizationException.class,
result.all());
+ TestUtils.assertFutureThrows(GroupAuthorizationException.class,
result.partitionResult(fooTopicPartition1));
+ TestUtils.assertFutureThrows(IllegalArgumentException.class,
result.partitionResult(zooTopicPartition0));
+ }
+ }
+
@Test
public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws
Exception {
try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -11359,7 +11380,8 @@ public class KafkaAdminClientTest {
AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData().setResponses(
new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
- new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0),
new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The
group is not empty"))),
+ new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0),
+ new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()).setErrorMessage("Topic
authorization failed."))),
new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
).iterator())
);
@@ -11371,9 +11393,9 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
AlterShareGroupOffsetsResponse(data));
final AlterShareGroupOffsetsResult result =
env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0,
1L, fooTopicPartition1, 2L, barPartition0, 1L));
- TestUtils.assertFutureThrows(GroupNotEmptyException.class,
result.all());
+ TestUtils.assertFutureThrows(TopicAuthorizationException.class,
result.all());
assertNull(result.partitionResult(fooTopicPartition0).get());
- TestUtils.assertFutureThrows(GroupNotEmptyException.class,
result.partitionResult(fooTopicPartition1));
+ TestUtils.assertFutureThrows(TopicAuthorizationException.class,
result.partitionResult(fooTopicPartition1));
assertNull(result.partitionResult(barPartition0).get());
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5eb249c54d6..21edc36c13d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3756,7 +3756,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupId = alterShareGroupOffsetsRequest.data.groupId
if (!isShareGroupProtocolEnabled) {
- requestHelper.sendMaybeThrottle(request,
alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.UNSUPPORTED_VERSION.exception))
+ requestHelper.sendMaybeThrottle(request,
alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
return CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request,
alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
@@ -3766,9 +3766,9 @@ class KafkaApis(val requestChannel: RequestChannel,
alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
val topicError = {
- if (!authHelper.authorize(request.context, READ, TOPIC,
topic.topicName())) {
+ if (!authHelper.authorize(request.context, READ, TOPIC,
topic.topicName)) {
Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
- } else if (!metadataCache.contains(topic.topicName())) {
+ } else if (!metadataCache.contains(topic.topicName)) {
Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
} else {
None
@@ -3776,9 +3776,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
topicError match {
case Some(error) =>
- topic.partitions().forEach(partition =>
responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(),
metadataCache.topicNamesToIds(), error.error))
+ topic.partitions.forEach(partition =>
responseBuilder.addPartition(topic.topicName, partition.partitionIndex,
metadataCache.topicNamesToIds, error.error))
case None =>
- authorizedTopicPartitions.add(topic)
+ authorizedTopicPartitions.add(topic.duplicate)
}
})
@@ -3792,8 +3792,10 @@ class KafkaApis(val requestChannel: RequestChannel,
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request,
alterShareGroupOffsetsRequest.getErrorResponse(exception))
+ } else if (response.errorCode != Errors.NONE.code) {
+ requestHelper.sendMaybeThrottle(request,
alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.errorCode, response.errorMessage))
} else {
- requestHelper.sendMaybeThrottle(request,
responseBuilder.merge(response, metadataCache.topicNamesToIds()).build())
+ requestHelper.sendMaybeThrottle(request,
responseBuilder.merge(response, metadataCache.topicNamesToIds).build())
}
}
}
@@ -3824,22 +3826,13 @@ class KafkaApis(val requestChannel: RequestChannel,
new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
)
} else {
authorizedTopics.add(topic)
}
}
- if (authorizedTopics.isEmpty) {
- requestHelper.sendMaybeThrottle(
- request,
- new DeleteShareGroupOffsetsResponse(
- new DeleteShareGroupOffsetsResponseData()
- .setResponses(deleteShareGroupOffsetsResponseTopics)))
- return CompletableFuture.completedFuture[Unit](())
- }
-
groupCoordinator.deleteShareGroupOffsets(
request.context,
new
DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
@@ -3847,12 +3840,12 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request,
deleteShareGroupOffsetsRequest.getErrorResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
- Errors.forException(exception).code(),
- exception.getMessage()))
+ Errors.forException(exception).code,
+ exception.getMessage))
} else if (responseData.errorCode() != Errors.NONE.code) {
requestHelper.sendMaybeThrottle(
request,
-
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
responseData.errorCode(), responseData.errorMessage())
+
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
responseData.errorCode, responseData.errorMessage)
)
} else {
responseData.responses.forEach { topic => {
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 424772275ea..f950362354c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3254,6 +3254,18 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
removeAllClientAcls()
}
+ private def createEmptyShareGroup(): Unit = {
+ createTopicWithBrokerPrincipal(topic)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), topicResource)
+ shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
+ val consumer = createShareConsumer()
+ consumer.subscribe(util.Set.of(topic))
+ consumer.poll(Duration.ofMillis(500L))
+ consumer.close()
+ removeAllClientAcls()
+ }
+
@Test
def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(): Unit = {
createShareGroupToDescribe()
@@ -3614,6 +3626,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
@Test
def testDeleteShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
+ createEmptyShareGroup()
addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource),
shareGroupResource)
val request = deleteShareGroupOffsetsRequest
@@ -3663,6 +3676,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
@Test
def testAlterShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
+ createEmptyShareGroup()
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
val request = alterShareGroupOffsetsRequest
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a6c26589635..e3a396ba9d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -12863,10 +12863,18 @@ class KafkaApisTest extends Logging {
def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
- val deleteShareGroupOffsetsRequest = new
DeleteShareGroupOffsetsRequestData()
+ val deleteShareGroupOffsetsRequestData = new
DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
- val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build)
+ val requestChannelRequest = buildRequest(new
DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData).build)
+
+ val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.NONE.code())
+
+ when(groupCoordinator.deleteShareGroupOffsets(
+ requestChannelRequest.context,
+ deleteShareGroupOffsetsRequestData
+ )).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse))
val resultFuture = new
CompletableFuture[DeleteShareGroupOffsetsResponseData]
kafkaApis = createKafkaApis()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 099201ecabb..812853a263f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -698,6 +698,10 @@ public class GroupCoordinatorService implements
GroupCoordinator {
InitializeShareGroupStateParameters request,
AlterShareGroupOffsetsResponseData response
) {
+ if (request.groupTopicPartitionData().topicsData().isEmpty()) {
+ return CompletableFuture.completedFuture(response);
+ }
+
return persister.initializeState(request)
.handle((result, exp) -> {
if (exp == null) {
@@ -1233,16 +1237,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
* See {@link
GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String,
AlterShareGroupOffsetsRequestData)}.
*/
@Override
- public CompletableFuture<AlterShareGroupOffsetsResponseData>
alterShareGroupOffsets(AuthorizableRequestContext context, String groupId,
AlterShareGroupOffsetsRequestData request) {
+ public CompletableFuture<AlterShareGroupOffsetsResponseData>
alterShareGroupOffsets(
+ AuthorizableRequestContext context,
+ String groupId,
+ AlterShareGroupOffsetsRequestData request
+ ) {
if (!isActive.get() || metadataImage == null) {
- return
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE));
+ return
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
}
if (groupId == null || groupId.isEmpty()) {
- return
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID));
+ return
CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.INVALID_GROUP_ID));
}
- if (request.topics() == null || request.topics().isEmpty()) {
+ if (request.topics() == null) {
return CompletableFuture.completedFuture(new
AlterShareGroupOffsetsResponseData());
}
@@ -1257,7 +1265,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
"share-group-offsets-alter",
request,
exception,
- (error, message) ->
AlterShareGroupOffsetsRequest.getErrorResponse(error),
+ (error, message) ->
AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
log
));
}
@@ -1822,26 +1830,18 @@ public class GroupCoordinatorService implements
GroupCoordinator {
AuthorizableRequestContext context,
DeleteShareGroupOffsetsRequestData requestData
) {
- if (!isActive.get()) {
- return CompletableFuture.completedFuture(
-
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
- }
-
- if (metadataImage == null) {
- return CompletableFuture.completedFuture(
-
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+ if (!isActive.get() || metadataImage == null) {
+ return
CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
}
String groupId = requestData.groupId();
if (!isGroupIdNotEmpty(groupId)) {
- return CompletableFuture.completedFuture(
-
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
+ return
CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
- if (requestData.topics() == null || requestData.topics().isEmpty()) {
- return CompletableFuture.completedFuture(
- new DeleteShareGroupOffsetsResponseData()
+ if (requestData.topics() == null) {
+ return CompletableFuture.completedFuture(new
DeleteShareGroupOffsetsResponseData()
);
}
@@ -1850,15 +1850,14 @@ public class GroupCoordinatorService implements
GroupCoordinator {
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator ->
coordinator.initiateDeleteShareGroupOffsets(groupId, requestData)
- )
- .thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId,
resultHolder))
- .exceptionally(exception -> handleOperationException(
- "initiate-delete-share-group-offsets",
- groupId,
- exception,
- (error, __) ->
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
- log
- ));
+ ).thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId,
resultHolder)
+ ).exceptionally(exception -> handleOperationException(
+ "initiate-delete-share-group-offsets",
+ groupId,
+ exception,
+ (error, message) ->
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
+ log
+ ));
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData>
deleteShareGroupOffsetsState(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 01c87696053..4517d0cb8f3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -4280,10 +4280,26 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1);
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id");
+ .setGroupId("share-group-id")
+ .setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData();
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ null,
+ null
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -4291,7 +4307,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsNullTopicsInRequest() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsEmptyTopicsInRequest() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -4303,10 +4319,25 @@ public class GroupCoordinatorServiceTest {
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
- .setTopics(null);
+ .setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData();
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ null,
+ null
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -4400,9 +4431,7 @@ public class GroupCoordinatorServiceTest {
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
- .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
- .setTopicName(TOPIC_NAME)
- ));
+ .setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
@@ -5376,13 +5405,29 @@ public class GroupCoordinatorServiceTest {
AlterShareGroupOffsetsRequestData request = new
AlterShareGroupOffsetsRequestData()
.setGroupId(groupId);
+ AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData();
+
+ Map.Entry<AlterShareGroupOffsetsResponseData,
InitializeShareGroupStateParameters> alterShareGroupOffsetsIntermediate =
+ Map.entry(
+ new AlterShareGroupOffsetsResponseData()
+ .setResponses(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection()),
+ new InitializeShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new
GroupTopicPartitionData<>("share-group", List.of()))
+ .build());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("share-group-offsets-alter"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(alterShareGroupOffsetsIntermediate));
+
CompletableFuture<AlterShareGroupOffsetsResponseData> future =
service.alterShareGroupOffsets(
requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
groupId,
request
);
- AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData();
assertEquals(data, future.get());
}
@@ -5416,7 +5461,7 @@ public class GroupCoordinatorServiceTest {
AlterShareGroupOffsetsResponseData response = new
AlterShareGroupOffsetsResponseData()
.setErrorCode(Errors.NON_EMPTY_GROUP.code())
- .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+ .setErrorMessage("bad stuff");
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-offsets-alter"),