This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4b7966dde8b KAFKA-19439 OffsetFetch API does not return group level errors correctly with version 1 (#21063)
4b7966dde8b is described below

commit 4b7966dde8b77ecf229fe10d9e10df274acd915a
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Dec 3 17:56:41 2025 +0800

    KAFKA-19439 OffsetFetch API does not return group level errors correctly with version 1 (#21063)
    
    The OffsetFetch API does not support top level errors in version 1.
    Hence, the top level error must be returned at the partition level.
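    
    As a rough illustration (not part of this patch; requestGroup, version and
    the NOT_COORDINATOR error below are placeholder values), callers build the
    version-aware error response via the new helper added in this change:
    
        // Version 1: the group error is fanned out to every requested partition
        // (with INVALID_OFFSET and NO_METADATA). Version 2+: it is set once at
        // the group level.
        OffsetFetchResponseData.OffsetFetchResponseGroup response =
            OffsetFetchResponse.groupError(requestGroup, Errors.NOT_COORDINATOR, version);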
    
    Side note: It is a tad annoying that we create error responses in
    multiple places (e.g. KafkaApis, GroupCoordinatorService). There was a
    reason for this, but I cannot remember it.
    
    Reviewers: PoAn Yang <[email protected]>, Dongnuo Lyu
    <[email protected]>, Sean Quah <[email protected]>, Ken Huang
    <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai
    <[email protected]>
    
    ---------
    
    Co-authored-by: David Jacot <[email protected]>
---
 .../kafka/common/requests/OffsetFetchRequest.java  |  2 +
 .../kafka/common/requests/OffsetFetchResponse.java | 52 ++++++++++++-------
 .../common/requests/OffsetFetchResponseTest.java   | 45 ++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 60 ++++++++++++----------
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 39 ++++++++++++++
 .../coordinator/group/GroupCoordinatorService.java | 37 +++++++------
 6 files changed, 173 insertions(+), 62 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 7ece0700bfa..57de0ee16ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -41,6 +41,8 @@ import java.util.stream.Collectors;
 
 public class OffsetFetchRequest extends AbstractRequest {
 
+    public static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
+
     private static final Logger log = LoggerFactory.getLogger(OffsetFetchRequest.class);
 
     private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 82b6cdb0979..e07d3550b86 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -40,6 +41,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+import static org.apache.kafka.common.requests.OffsetFetchRequest.TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION;
 
 /**
  * Possible error codes:
@@ -231,29 +233,14 @@ public class OffsetFetchResponse extends AbstractResponse {
                 OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
                 data.topics().add(newTopic);
 
-                topic.partitions().forEach(partition -> {
-                    OffsetFetchResponsePartition newPartition;
-
-                    if (version < 2 && group.errorCode() != Errors.NONE.code()) {
-                        // Versions prior to version 2 do not support a top level error. Therefore,
-                        // we put it at the partition level.
-                        newPartition = new OffsetFetchResponsePartition()
-                            .setPartitionIndex(partition.partitionIndex())
-                            .setErrorCode(group.errorCode())
-                            .setCommittedOffset(INVALID_OFFSET)
-                            .setMetadata(NO_METADATA)
-                            .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
-                    } else {
-                        newPartition = new OffsetFetchResponsePartition()
+                topic.partitions().forEach(partition ->
+                    newTopic.partitions().add(new OffsetFetchResponsePartition()
                             .setPartitionIndex(partition.partitionIndex())
                             .setErrorCode(partition.errorCode())
                             .setCommittedOffset(partition.committedOffset())
                             .setMetadata(partition.metadata())
-                            .setCommittedLeaderEpoch(partition.committedLeaderEpoch());
-                    }
-
-                    newTopic.partitions().add(newPartition);
-                });
+                            .setCommittedLeaderEpoch(partition.committedLeaderEpoch()))
+                );
             });
         }
     }
@@ -403,4 +390,31 @@ public class OffsetFetchResponse extends AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static OffsetFetchResponseData.OffsetFetchResponseGroup groupError(
+        OffsetFetchRequestData.OffsetFetchRequestGroup group,
+        Errors error,
+        int version
+    ) {
+        if (version >= TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+            return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                .setGroupId(group.groupId())
+                .setErrorCode(error.code());
+        } else {
+            return new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                .setGroupId(group.groupId())
+                .setTopics(group.topics().stream().map(topic ->
+                    new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                        .setName(topic.name())
+                        .setPartitions(topic.partitionIndexes().stream().map(partition ->
+                            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                                .setPartitionIndex(partition)
+                                .setErrorCode(error.code())
+                                .setCommittedOffset(INVALID_OFFSET)
+                                .setMetadata(NO_METADATA)
+                                .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+                        ).collect(Collectors.toList()))
+                ).collect(Collectors.toList()));
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index c85d26dac57..3992c6a58ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
@@ -29,16 +30,21 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
 import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
+import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static org.apache.kafka.common.requests.OffsetFetchResponse.NO_METADATA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -439,4 +445,43 @@ public class OffsetFetchResponseTest {
                 .setThrottleTimeMs(throttleTimeMs);
         assertEquals(expectedData, response.data());
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+    public void testSingleGroupWithError(short version) {
+        OffsetFetchRequestData.OffsetFetchRequestGroup group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("group1")
+            .setTopics(Collections.singletonList(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName("foo")
+                    .setPartitionIndexes(Collections.singletonList(0))
+            ));
+
+        if (version < 2) {
+            assertEquals(
+                new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                    .setGroupId("group1")
+                    .setTopics(Collections.singletonList(
+                        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                            .setName("foo")
+                            .setPartitions(Collections.singletonList(
+                                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                                    .setPartitionIndex(0)
+                                    .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                                    .setCommittedOffset(INVALID_OFFSET)
+                                    .setMetadata(NO_METADATA)
+                                    .setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH)
+                            ))
+                    )),
+                OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, version)
+            );
+        } else {
+            assertEquals(
+                new OffsetFetchResponseData.OffsetFetchResponseGroup()
+                    .setGroupId("group1")
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+                OffsetFetchResponse.groupError(group, Errors.INVALID_GROUP_ID, version)
+            );
+        }
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a813b90942..44df00510b6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1527,9 +1527,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     groups.forEach { groupOffsetFetch =>
       val isAllPartitions = groupOffsetFetch.topics == null
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupOffsetFetch.groupId)) {
-        futures += CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(groupOffsetFetch.groupId)
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+        futures += CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+          groupOffsetFetch,
+          Errors.GROUP_AUTHORIZATION_FAILED,
+          request.header.apiVersion()
+        ))
       } else if (isAllPartitions) {
         futures += fetchAllOffsetsForGroup(
           request.context,
@@ -1554,36 +1556,38 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def fetchAllOffsetsForGroup(
     requestContext: RequestContext,
-    offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
     groupCoordinator.fetchAllOffsets(
       requestContext,
-      offsetFetchRequest,
+      groupFetchRequest,
       requireStable
-    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) =>
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (groupFetchResponse, exception) =>
       if (exception != null) {
-        new OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(offsetFetchRequest.groupId)
-          .setErrorCode(Errors.forException(exception).code)
-      } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
-        offsetFetchResponse
+        OffsetFetchResponse.groupError(
+          groupFetchRequest,
+          Errors.forException(exception),
+          requestContext.apiVersion()
+        )
+      } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+        groupFetchResponse
       } else {
         // Clients are not allowed to see offsets for topics that are not authorized for Describe.
         val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
           requestContext,
           DESCRIBE,
           TOPIC,
-          offsetFetchResponse.topics.asScala
+          groupFetchResponse.topics.asScala
         )(_.name)
-        offsetFetchResponse.setTopics(authorizedOffsets.asJava)
+        groupFetchResponse.setTopics(authorizedOffsets.asJava)
       }
     }
   }
 
   private def fetchOffsetsForGroup(
     requestContext: RequestContext,
-    offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    groupFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
     // Clients are not allowed to see offsets for topics that are not authorized for Describe.
@@ -1591,29 +1595,31 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestContext,
       DESCRIBE,
       TOPIC,
-      offsetFetchRequest.topics.asScala
+      groupFetchRequest.topics.asScala
     )(_.name)
 
     groupCoordinator.fetchOffsets(
       requestContext,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
-        .setGroupId(offsetFetchRequest.groupId)
-        .setMemberId(offsetFetchRequest.memberId)
-        .setMemberEpoch(offsetFetchRequest.memberEpoch)
+        .setGroupId(groupFetchRequest.groupId)
+        .setMemberId(groupFetchRequest.memberId)
+        .setMemberEpoch(groupFetchRequest.memberEpoch)
         .setTopics(authorizedTopics.asJava),
       requireStable
-    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (offsetFetchResponse, exception) =>
+    ).handle[OffsetFetchResponseData.OffsetFetchResponseGroup] { (groupFetchResponse, exception) =>
       if (exception != null) {
-        new OffsetFetchResponseData.OffsetFetchResponseGroup()
-          .setGroupId(offsetFetchRequest.groupId)
-          .setErrorCode(Errors.forException(exception).code)
-      } else if (offsetFetchResponse.errorCode() != Errors.NONE.code) {
-        offsetFetchResponse
+        OffsetFetchResponse.groupError(
+          groupFetchRequest,
+          Errors.forException(exception),
+          requestContext.apiVersion()
+        )
+      } else if (groupFetchResponse.errorCode() != Errors.NONE.code) {
+        groupFetchResponse
       } else {
         val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
-          offsetFetchResponse.topics.size + unauthorizedTopics.size
+          groupFetchResponse.topics.size + unauthorizedTopics.size
         )
-        topics.addAll(offsetFetchResponse.topics)
+        topics.addAll(groupFetchResponse.topics)
         unauthorizedTopics.foreach { topic =>
           val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
           topic.partitionIndexes.forEach { partitionIndex =>
@@ -1624,7 +1630,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
           topics.add(topicResponse)
         }
-        offsetFetchResponse.setTopics(topics)
+        groupFetchResponse.setTopics(topics)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 67f5cfb5492..fc43dc31c53 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -561,4 +561,43 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
       )
     }
   }
+
+  @ClusterTest
+  def testGroupErrors(): Unit = {
+    // Start from version 1 because version 0 goes to ZK.
+    for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+      assertEquals(
+        if (version >= 2) {
+          new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("unknown")
+            .setErrorCode(Errors.NOT_COORDINATOR.code)
+        } else {
+          // Version 1 does not support group level errors. Hence, the error is
+          // returned at the partition level.
+          new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("unknown")
+            .setTopics(List(
+              new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setPartitions(List(
+                  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(0)
+                    .setErrorCode(Errors.NOT_COORDINATOR.code)
+                    .setCommittedOffset(-1)
+                    .setCommittedLeaderEpoch(-1)
+                    .setMetadata("")
+                ).asJava)
+            ).asJava)
+        },
+        fetchOffsets(
+          groupId = "unknown",
+          memberId = null,
+          memberEpoch = -1,
+          partitions = List(new TopicPartition("foo", 0)),
+          requireStable = false,
+          version = version.toShort
+        )
+      )
+    }
+  }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a7f7375f089..20ffb390f71 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -753,18 +754,20 @@ public class GroupCoordinatorService implements GroupCoordinator {
         boolean requireStable
     ) {
         if (!isActive.get()) {
-            return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
+            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                context.requestVersion()
+            ));
         }
 
         // For backwards compatibility, we support fetch commits for the empty group id.
         if (request.groupId() == null) {
-            return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.INVALID_GROUP_ID.code())
-            );
+            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.INVALID_GROUP_ID,
+                context.requestVersion()
+            ));
         }
 
         // The require stable flag when set tells the broker to hold on returning unstable
@@ -804,18 +807,20 @@ public class GroupCoordinatorService implements GroupCoordinator {
         boolean requireStable
     ) {
         if (!isActive.get()) {
-            return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
+            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                context.requestVersion()
+            ));
         }
 
         // For backwards compatibility, we support fetch commits for the empty group id.
         if (request.groupId() == null) {
-            return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
-                .setGroupId(request.groupId())
-                .setErrorCode(Errors.INVALID_GROUP_ID.code())
-            );
+            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
+                request,
+                Errors.INVALID_GROUP_ID,
+                context.requestVersion()
+            ));
         }
 
         // The require stable flag when set tells the broker to hold on returning unstable
