This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 4b7966dde8b KAFKA-19439 OffsetFetch API does not return group level
errors correctly with version 1 (#21063)
4b7966dde8b is described below
commit 4b7966dde8b77ecf229fe10d9e10df274acd915a
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Dec 3 17:56:41 2025 +0800
KAFKA-19439 OffsetFetch API does not return group level errors correctly
with version 1 (#21063)
The OffsetFetch API does not support top-level errors in version 1.
Hence, the top-level error must be returned at the partition level.
Side note: It is a tad annoying that we create error responses in
multiple places (e.g. KafkaApis, GroupCoordinatorService). There was a
reason for this, but I cannot remember it.
Reviewers: PoAn Yang <[email protected]>, Dongnuo Lyu
<[email protected]>, Sean Quah <[email protected]>, Ken Huang
<[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---------
Co-authored-by: David Jacot <[email protected]>
---
.../kafka/common/requests/OffsetFetchRequest.java | 2 +
.../kafka/common/requests/OffsetFetchResponse.java | 52 ++++++++++++-------
.../common/requests/OffsetFetchResponseTest.java | 45 ++++++++++++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 60 ++++++++++++----------
.../unit/kafka/server/OffsetFetchRequestTest.scala | 39 ++++++++++++++
.../coordinator/group/GroupCoordinatorService.java | 37 +++++++------
6 files changed, 173 insertions(+), 62 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 7ece0700bfa..57de0ee16ae 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -41,6 +41,8 @@ import java.util.stream.Collectors;
public class OffsetFetchRequest extends AbstractRequest {
+ public static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
+
private static final Logger log =
LoggerFactory.getLogger(OffsetFetchRequest.class);
private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS =
null;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 82b6cdb0979..e07d3550b86 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -40,6 +41,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+import static
org.apache.kafka.common.requests.OffsetFetchRequest.TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION;
/**
* Possible error codes:
@@ -231,29 +233,14 @@ public class OffsetFetchResponse extends AbstractResponse
{
OffsetFetchResponseTopic newTopic = new
OffsetFetchResponseTopic().setName(topic.name());
data.topics().add(newTopic);
- topic.partitions().forEach(partition -> {
- OffsetFetchResponsePartition newPartition;
-
- if (version < 2 && group.errorCode() !=
Errors.NONE.code()) {
- // Versions prior to version 2 do not support a top
level error. Therefore,
- // we put it at the partition level.
- newPartition = new OffsetFetchResponsePartition()
- .setPartitionIndex(partition.partitionIndex())
- .setErrorCode(group.errorCode())
- .setCommittedOffset(INVALID_OFFSET)
- .setMetadata(NO_METADATA)
-
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
- } else {
- newPartition = new OffsetFetchResponsePartition()
+ topic.partitions().forEach(partition ->
+ newTopic.partitions().add(new
OffsetFetchResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(partition.errorCode())
.setCommittedOffset(partition.committedOffset())
.setMetadata(partition.metadata())
-
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
- }
-
- newTopic.partitions().add(newPartition);
- });
+
.setCommittedLeaderEpoch(partition.committedLeaderEpoch()))
+ );
});
}
}
@@ -403,4 +390,31 @@ public class OffsetFetchResponse extends AbstractResponse {
public boolean shouldClientThrottle(short version) {
return version >= 4;
}
+
+ public static OffsetFetchResponseData.OffsetFetchResponseGroup groupError(
+ OffsetFetchRequestData.OffsetFetchRequestGroup group,
+ Errors error,
+ int version
+ ) {
+ if (version >= TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+ return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(group.groupId())
+ .setErrorCode(error.code());
+ } else {
+ return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(group.groupId())
+ .setTopics(group.topics().stream().map(topic ->
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName(topic.name())
+
.setPartitions(topic.partitionIndexes().stream().map(partition ->
+ new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setCommittedOffset(INVALID_OFFSET)
+ .setMetadata(NO_METADATA)
+
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+ ).collect(Collectors.toList()))
+ ).collect(Collectors.toList()));
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index c85d26dac57..3992c6a58ee 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import
org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -29,16 +30,21 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
import static
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
+import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static org.apache.kafka.common.requests.OffsetFetchResponse.NO_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -439,4 +445,43 @@ public class OffsetFetchResponseTest {
.setThrottleTimeMs(throttleTimeMs);
assertEquals(expectedData, response.data());
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testSingleGroupWithError(short version) {
+ OffsetFetchRequestData.OffsetFetchRequestGroup group = new
OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group1")
+ .setTopics(Collections.singletonList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))
+ ));
+
+ if (version < 2) {
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group1")
+ .setTopics(Collections.singletonList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+
.setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setCommittedOffset(INVALID_OFFSET)
+ .setMetadata(NO_METADATA)
+
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+ ))
+ )),
+ OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID,
version)
+ );
+ } else {
+ assertEquals(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group1")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID,
version)
+ );
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a813b90942..44df00510b6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1527,9 +1527,11 @@ class KafkaApis(val requestChannel: RequestChannel,
groups.forEach { groupOffsetFetch =>
val isAllPartitions = groupOffsetFetch.topics == null
if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
groupOffsetFetch.groupId)) {
- futures += CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(groupOffsetFetch.groupId)
- .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+ futures +=
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ groupOffsetFetch,
+ Errors.GROUP_AUTHORIZATION_FAILED,
+ request.header.apiVersion()
+ ))
} else if (isAllPartitions) {
futures += fetchAllOffsetsForGroup(
request.context,
@@ -1554,36 +1556,38 @@ class KafkaApis(val requestChannel: RequestChannel,
private def fetchAllOffsetsForGroup(
requestContext: RequestContext,
- offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
groupCoordinator.fetchAllOffsets(
requestContext,
- offsetFetchRequest,
+ groupFetchRequest,
requireStable
- ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(offsetFetchResponse, exception) =>
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(groupFetchResponse, exception) =>
if (exception != null) {
- new OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setErrorCode(Errors.forException(exception).code)
- } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
- offsetFetchResponse
+ OffsetFetchResponse.groupError(
+ groupFetchRequest,
+ Errors.forException(exception),
+ requestContext.apiVersion()
+ )
+ } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+ groupFetchResponse
} else {
// Clients are not allowed to see offsets for topics that are not
authorized for Describe.
val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
requestContext,
DESCRIBE,
TOPIC,
- offsetFetchResponse.topics.asScala
+ groupFetchResponse.topics.asScala
)(_.name)
- offsetFetchResponse.setTopics(authorizedOffsets.asJava)
+ groupFetchResponse.setTopics(authorizedOffsets.asJava)
}
}
}
private def fetchOffsetsForGroup(
requestContext: RequestContext,
- offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+ groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
// Clients are not allowed to see offsets for topics that are not
authorized for Describe.
@@ -1591,29 +1595,31 @@ class KafkaApis(val requestChannel: RequestChannel,
requestContext,
DESCRIBE,
TOPIC,
- offsetFetchRequest.topics.asScala
+ groupFetchRequest.topics.asScala
)(_.name)
groupCoordinator.fetchOffsets(
requestContext,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setMemberId(offsetFetchRequest.memberId)
- .setMemberEpoch(offsetFetchRequest.memberEpoch)
+ .setGroupId(groupFetchRequest.groupId)
+ .setMemberId(groupFetchRequest.memberId)
+ .setMemberEpoch(groupFetchRequest.memberEpoch)
.setTopics(authorizedTopics.asJava),
requireStable
- ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(offsetFetchResponse, exception) =>
+ ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] {
(groupFetchResponse, exception) =>
if (exception != null) {
- new OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(offsetFetchRequest.groupId)
- .setErrorCode(Errors.forException(exception).code)
- } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
- offsetFetchResponse
+ OffsetFetchResponse.groupError(
+ groupFetchRequest,
+ Errors.forException(exception),
+ requestContext.apiVersion()
+ )
+ } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+ groupFetchResponse
} else {
val topics = new
util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
- offsetFetchResponse.topics.size + unauthorizedTopics.size
+ groupFetchResponse.topics.size + unauthorizedTopics.size
)
- topics.addAll(offsetFetchResponse.topics)
+ topics.addAll(groupFetchResponse.topics)
unauthorizedTopics.foreach { topic =>
val topicResponse = new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
topic.partitionIndexes.forEach { partitionIndex =>
@@ -1624,7 +1630,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
topics.add(topicResponse)
}
- offsetFetchResponse.setTopics(topics)
+ groupFetchResponse.setTopics(topics)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 67f5cfb5492..fc43dc31c53 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -561,4 +561,43 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
)
}
}
+
+ @ClusterTest
+ def testGroupErrors(): Unit = {
+ // Start from version 1 because version 0 goes to ZK.
+ for (version <- 1 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ assertEquals(
+ if (version >= 2) {
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown")
+ .setErrorCode(Errors.NOT_COORDINATOR.code)
+ } else {
+ // Version 1 does not support group level errors. Hence, the error is
+ // returned at the partition level.
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("unknown")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NOT_COORDINATOR.code)
+ .setCommittedOffset(-1)
+ .setCommittedLeaderEpoch(-1)
+ .setMetadata("")
+ ).asJava)
+ ).asJava)
+ },
+ fetchOffsets(
+ groupId = "unknown",
+ memberId = null,
+ memberEpoch = -1,
+ partitions = List(new TopicPartition("foo", 0)),
+ requireStable = false,
+ version = version.toShort
+ )
+ )
+ }
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a7f7375f089..20ffb390f71 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import
org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -753,18 +754,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
boolean requireStable
) {
if (!isActive.get()) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ context.requestVersion()
+ ));
}
// For backwards compatibility, we support fetch commits for the empty
group id.
if (request.groupId() == null) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.INVALID_GROUP_ID.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.INVALID_GROUP_ID,
+ context.requestVersion()
+ ));
}
// The require stable flag when set tells the broker to hold on
returning unstable
@@ -804,18 +807,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
boolean requireStable
) {
if (!isActive.get()) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ context.requestVersion()
+ ));
}
// For backwards compatibility, we support fetch commits for the empty
group id.
if (request.groupId() == null) {
- return CompletableFuture.completedFuture(new
OffsetFetchResponseData.OffsetFetchResponseGroup()
- .setGroupId(request.groupId())
- .setErrorCode(Errors.INVALID_GROUP_ID.code())
- );
+ return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+ request,
+ Errors.INVALID_GROUP_ID,
+ context.requestVersion()
+ ));
}
// The require stable flag when set tells the broker to hold on
returning unstable