This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be194f5dbae MINOR: Simplify OffsetFetchRequest (#19572)
be194f5dbae is described below
commit be194f5dbaeabf7c8992312fd0b8bcf596a1284a
Author: David Jacot <[email protected]>
AuthorDate: Sun Apr 27 18:58:30 2025 +0200
MINOR: Simplify OffsetFetchRequest (#19572)
While working on https://github.com/apache/kafka/pull/19515, I came to
the conclusion that the OffsetFetchRequest is quite messy and overall
too complicated. This patch rationalizes the constructors.
OffsetFetchRequest has a single constructor accepting the
OffsetFetchRequestData. This will also simplify adding the topic ids.
All the changes are mechanical, replacing data structures with others.
Reviewers: PoAn Yang <[email protected]>, TengYao Chi
<[email protected]>, Lianet Magran <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../internals/ListConsumerGroupOffsetsHandler.java | 36 +-
.../consumer/internals/CommitRequestManager.java | 39 +-
.../consumer/internals/ConsumerCoordinator.java | 23 +-
.../kafka/common/requests/OffsetFetchRequest.java | 215 +++--------
.../kafka/clients/admin/KafkaAdminClientTest.java | 6 +-
.../common/requests/OffsetFetchRequestTest.java | 395 +++++++++++----------
.../kafka/common/requests/RequestResponseTest.java | 167 ++++-----
.../kafka/api/AuthorizerIntegrationTest.scala | 46 ++-
.../server/GroupCoordinatorBaseRequestTest.scala | 42 ++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 154 +++++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 14 +-
11 files changed, 607 insertions(+), 530 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 4c0e3db9254..36ff25ebb79 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -86,15 +87,32 @@ public class ListConsumerGroupOffsetsHandler implements
AdminApiHandler<Coordina
}
public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey>
groupIds) {
- // Create a map that only contains the consumer groups owned by the
coordinator.
- Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions
= new HashMap<>(groupIds.size());
- groupIds.forEach(g -> {
- ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
- List<TopicPartition> partitions = spec.topicPartitions() != null ?
new ArrayList<>(spec.topicPartitions()) : null;
- coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
- });
-
- return new
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable,
false);
+ // Create a request that only contains the consumer groups owned by
the coordinator.
+ return new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(groupIds.stream().map(groupId -> {
+ ListConsumerGroupOffsetsSpec spec =
groupSpecs.get(groupId.idValue);
+
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics>
topics = null;
+ if (spec.topicPartitions() != null) {
+ topics = spec.topicPartitions().stream()
+
.collect(Collectors.groupingBy(TopicPartition::topic))
+ .entrySet()
+ .stream()
+ .map(entry -> new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(entry.getKey())
+ .setPartitionIndexes(entry.getValue().stream()
+ .map(TopicPartition::partition)
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
+ }
+ return new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId.idValue)
+ .setTopics(topics);
+ }).collect(Collectors.toList())),
+ false
+ );
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 62d1fe3a866..4b22a5711b3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -970,21 +971,37 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
requestedPartitions.stream()
+ .collect(Collectors.groupingBy(TopicPartition::topic))
+ .entrySet()
+ .stream()
+ .map(entry -> new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(entry.getKey())
+ .setPartitionIndexes(entry.getValue().stream()
+ .map(TopicPartition::partition)
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
- OffsetFetchRequest.Builder builder = memberInfo.memberEpoch.
- map(epoch -> new OffsetFetchRequest.Builder(
- groupId,
- memberInfo.memberId,
- epoch,
- true,
- new ArrayList<>(this.requestedPartitions),
- throwOnFetchStableOffsetUnsupported))
+ OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
+ .map(epoch -> new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(true)
+ .setGroups(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setMemberId(memberInfo.memberId)
+ .setMemberEpoch(epoch)
+ .setTopics(topics))),
+ throwOnFetchStableOffsetUnsupported))
// Building request without passing member ID/epoch to leave
the logic to choose
// default values when not present on the request builder.
.orElseGet(() -> new OffsetFetchRequest.Builder(
- groupId,
- true,
- new ArrayList<>(this.requestedPartitions),
+ new OffsetFetchRequestData()
+ .setRequireStable(true)
+ .setGroups(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(topics))),
throwOnFetchStableOffsetUnsupported));
return buildRequestWithResponseHandling(builder);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 1cba10ef15d..8829654c7a8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -1477,9 +1478,27 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
return RequestFuture.coordinatorNotAvailable();
log.debug("Fetching committed offsets for partitions: {}", partitions);
+
// construct the request
- OffsetFetchRequest.Builder requestBuilder =
- new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, true,
new ArrayList<>(partitions), throwOnFetchStableOffsetsUnsupported);
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
partitions.stream()
+ .collect(Collectors.groupingBy(TopicPartition::topic))
+ .entrySet()
+ .stream()
+ .map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(entry.getKey())
+ .setPartitionIndexes(entry.getValue().stream()
+ .map(TopicPartition::partition)
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
+
+ OffsetFetchRequest.Builder requestBuilder = new
OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(true)
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(this.rebalanceConfig.groupId)
+ .setTopics(topics))),
+ throwOnFetchStableOffsetsUnsupported);
// send the request with a callback
return client.send(coordinator, requestBuilder)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 907cba953fb..2a26e7940d3 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -22,9 +22,11 @@ import
org.apache.kafka.common.message.OffsetFetchRequestData;
import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,113 +36,35 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
import java.util.stream.Collectors;
public class OffsetFetchRequest extends AbstractRequest {
private static final Logger log =
LoggerFactory.getLogger(OffsetFetchRequest.class);
+ private static final short TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION = 2;
+ private static final short REQUIRE_STABLE_OFFSET_MIN_VERSION = 7;
+ private static final short BATCH_MIN_VERSION = 8;
- private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS =
null;
- private static final List<OffsetFetchRequestTopics>
ALL_TOPIC_PARTITIONS_BATCH = null;
private final OffsetFetchRequestData data;
public static class Builder extends
AbstractRequest.Builder<OffsetFetchRequest> {
- public final OffsetFetchRequestData data;
+ private final OffsetFetchRequestData data;
private final boolean throwOnFetchStableOffsetsUnsupported;
- public Builder(String groupId,
- boolean requireStable,
- List<TopicPartition> partitions,
- boolean throwOnFetchStableOffsetsUnsupported) {
- this(
- groupId,
- null,
- -1,
- requireStable,
- partitions,
- throwOnFetchStableOffsetsUnsupported
- );
- }
-
- public Builder(String groupId,
- String memberId,
- int memberEpoch,
- boolean requireStable,
- List<TopicPartition> partitions,
- boolean throwOnFetchStableOffsetsUnsupported) {
+ public Builder(OffsetFetchRequestData data, boolean
throwOnFetchStableOffsetsUnsupported) {
super(ApiKeys.OFFSET_FETCH);
-
- OffsetFetchRequestData.OffsetFetchRequestGroup group =
- new OffsetFetchRequestData.OffsetFetchRequestGroup()
- .setGroupId(groupId)
- .setMemberId(memberId)
- .setMemberEpoch(memberEpoch);
-
- if (partitions != null) {
- Map<String, OffsetFetchRequestTopics>
offsetFetchRequestTopicMap = new HashMap<>();
- for (TopicPartition topicPartition : partitions) {
- String topicName = topicPartition.topic();
- OffsetFetchRequestTopics topic =
offsetFetchRequestTopicMap.getOrDefault(
- topicName, new
OffsetFetchRequestTopics().setName(topicName));
- topic.partitionIndexes().add(topicPartition.partition());
- offsetFetchRequestTopicMap.put(topicName, topic);
- }
- group.setTopics(new
ArrayList<>(offsetFetchRequestTopicMap.values()));
- } else {
- // If passed in partition list is null, it is requesting
offsets for all topic partitions.
- group.setTopics(ALL_TOPIC_PARTITIONS_BATCH);
- }
-
- this.data = new OffsetFetchRequestData()
- .setRequireStable(requireStable)
- .setGroups(Collections.singletonList(group));
- this.throwOnFetchStableOffsetsUnsupported =
throwOnFetchStableOffsetsUnsupported;
- }
-
- public Builder(Map<String, List<TopicPartition>>
groupIdToTopicPartitionMap,
- boolean requireStable,
- boolean throwOnFetchStableOffsetsUnsupported) {
- super(ApiKeys.OFFSET_FETCH);
-
- List<OffsetFetchRequestGroup> groups = new ArrayList<>();
- for (Entry<String, List<TopicPartition>> entry :
groupIdToTopicPartitionMap.entrySet()) {
- String groupName = entry.getKey();
- List<TopicPartition> tpList = entry.getValue();
- final List<OffsetFetchRequestTopics> topics;
- if (tpList != null) {
- Map<String, OffsetFetchRequestTopics>
offsetFetchRequestTopicMap =
- new HashMap<>();
- for (TopicPartition topicPartition : tpList) {
- String topicName = topicPartition.topic();
- OffsetFetchRequestTopics topic =
offsetFetchRequestTopicMap.getOrDefault(
- topicName, new
OffsetFetchRequestTopics().setName(topicName));
-
topic.partitionIndexes().add(topicPartition.partition());
- offsetFetchRequestTopicMap.put(topicName, topic);
- }
- topics = new
ArrayList<>(offsetFetchRequestTopicMap.values());
- } else {
- topics = ALL_TOPIC_PARTITIONS_BATCH;
- }
- groups.add(new OffsetFetchRequestGroup()
- .setGroupId(groupName)
- .setTopics(topics));
- }
- this.data = new OffsetFetchRequestData()
- .setGroups(groups)
- .setRequireStable(requireStable);
+ this.data = data;
this.throwOnFetchStableOffsetsUnsupported =
throwOnFetchStableOffsetsUnsupported;
}
@Override
public OffsetFetchRequest build(short version) {
- if (data.groups().size() > 1 && version < 8) {
+ if (data.groups().size() > 1 && version < BATCH_MIN_VERSION) {
throw new NoBatchedOffsetFetchRequestException("Broker does
not support"
+ " batching groups for fetch offset request on version "
+ version);
}
- if (data.requireStable() && version < 7) {
+ if (data.requireStable() && version <
REQUIRE_STABLE_OFFSET_MIN_VERSION) {
if (throwOnFetchStableOffsetsUnsupported) {
throw new UnsupportedVersionException("Broker unexpectedly
" +
"doesn't support requireStable flag on version " +
version);
@@ -152,7 +76,7 @@ public class OffsetFetchRequest extends AbstractRequest {
}
}
// convert data to use the appropriate version since version 8
uses different format
- if (version < 8) {
+ if (version < BATCH_MIN_VERSION) {
OffsetFetchRequestData normalizedData;
if (!data.groups().isEmpty()) {
OffsetFetchRequestGroup group = data.groups().get(0);
@@ -175,7 +99,7 @@ public class OffsetFetchRequest extends AbstractRequest {
} else {
normalizedData = data;
}
- if (normalizedData.topics() == null && version < 2) {
+ if (normalizedData.topics() == null && version <
TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
throw new UnsupportedVersionException("The broker only
supports OffsetFetchRequest " +
"v" + version + ", but we need v2 or newer to request
all topic partitions.");
}
@@ -202,19 +126,6 @@ public class OffsetFetchRequest extends AbstractRequest {
}
}
- public List<TopicPartition> partitions() {
- if (isAllPartitions()) {
- return null;
- }
- List<TopicPartition> partitions = new ArrayList<>();
- for (OffsetFetchRequestTopic topic : data.topics()) {
- for (Integer partitionIndex : topic.partitionIndexes()) {
- partitions.add(new TopicPartition(topic.name(),
partitionIndex));
- }
- }
- return partitions;
- }
-
public String groupId() {
return data.groupId();
}
@@ -224,7 +135,7 @@ public class OffsetFetchRequest extends AbstractRequest {
}
public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
- if (version() >= 8) {
+ if (version() >= BATCH_MIN_VERSION) {
return data.groups();
} else {
OffsetFetchRequestData.OffsetFetchRequestGroup group =
@@ -253,7 +164,7 @@ public class OffsetFetchRequest extends AbstractRequest {
Map<String, List<TopicPartition>> groupIdsToPartitions = new
HashMap<>();
for (OffsetFetchRequestGroup group : data.groups()) {
List<TopicPartition> tpList = null;
- if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) {
+ if (group.topics() != null) {
tpList = new ArrayList<>();
for (OffsetFetchRequestTopics topic : group.topics()) {
for (Integer partitionIndex : topic.partitionIndexes()) {
@@ -285,67 +196,59 @@ public class OffsetFetchRequest extends AbstractRequest {
this.data = data;
}
- public OffsetFetchResponse getErrorResponse(Errors error) {
- return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, error);
- }
-
- public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Errors
error) {
- Map<TopicPartition, OffsetFetchResponse.PartitionData>
responsePartitions = new HashMap<>();
- if (version() < 2) {
- OffsetFetchResponse.PartitionData partitionError = new
OffsetFetchResponse.PartitionData(
- OffsetFetchResponse.INVALID_OFFSET,
- Optional.empty(),
- OffsetFetchResponse.NO_METADATA,
- error);
-
- for (OffsetFetchRequestTopic topic : this.data.topics()) {
- for (int partitionIndex : topic.partitionIndexes()) {
- responsePartitions.put(
- new TopicPartition(topic.name(), partitionIndex),
partitionError);
- }
- }
- return new OffsetFetchResponse(error, responsePartitions);
- }
- if (version() == 2) {
- return new OffsetFetchResponse(error, responsePartitions);
- }
- if (version() >= 3 && version() < 8) {
- return new OffsetFetchResponse(throttleTimeMs, error,
responsePartitions);
- }
- List<String> groupIds = groupIds();
- Map<String, Errors> errorsMap = new HashMap<>(groupIds.size());
- Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>>
partitionMap =
- new HashMap<>(groupIds.size());
- for (String g : groupIds) {
- errorsMap.put(g, error);
- partitionMap.put(g, responsePartitions);
- }
- return new OffsetFetchResponse(throttleTimeMs, errorsMap,
partitionMap);
- }
-
@Override
public OffsetFetchResponse getErrorResponse(int throttleTimeMs, Throwable
e) {
- return getErrorResponse(throttleTimeMs, Errors.forException(e));
+ Errors error = Errors.forException(e);
+
+ if (version() < TOP_LEVEL_ERROR_AND_NULL_TOPICS_MIN_VERSION) {
+ // The response does not support top level error so we return each
+ // partition with the error.
+ return new OffsetFetchResponse(
+ new OffsetFetchResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setTopics(data.topics().stream().map(topic ->
+ new OffsetFetchResponseData.OffsetFetchResponseTopic()
+ .setName(topic.name())
+
.setPartitions(topic.partitionIndexes().stream().map(partition ->
+ new
OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+
.setMetadata(OffsetFetchResponse.NO_METADATA)
+
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ ).collect(Collectors.toList()))
+ ).collect(Collectors.toList())),
+ version()
+ );
+ } else if (version() < BATCH_MIN_VERSION) {
+ // The response does not support multiple groups but it does
support
+ // top level error.
+ return new OffsetFetchResponse(
+ new OffsetFetchResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code()),
+ version()
+ );
+ } else {
+ // The response does support multiple groups so we provide a top
level
+ // error per group.
+ return new OffsetFetchResponse(
+ new OffsetFetchResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setGroups(data.groups().stream().map(group ->
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId(group.groupId())
+ .setErrorCode(error.code())
+ ).collect(Collectors.toList())),
+ version()
+ );
+ }
}
public static OffsetFetchRequest parse(Readable readable, short version) {
return new OffsetFetchRequest(new OffsetFetchRequestData(readable,
version), version);
}
- public boolean isAllPartitions() {
- return data.topics() == ALL_TOPIC_PARTITIONS;
- }
-
- public boolean isAllPartitionsForGroup(String groupId) {
- OffsetFetchRequestGroup group = data
- .groups()
- .stream()
- .filter(g -> g.groupId().equals(groupId))
- .collect(Collectors.toList())
- .get(0);
- return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
- }
-
@Override
public OffsetFetchRequestData data() {
return data;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0fa00dbea20..319a91d7407 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4396,7 +4396,7 @@ public class KafkaAdminClientTest {
ClientRequest clientRequest = mockClient.requests().peek();
assertNotNull(clientRequest);
assertEquals(300, clientRequest.requestTimeoutMs());
- OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).data;
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).build().data();
assertTrue(data.requireStable());
assertEquals(Collections.singletonList(GROUP_ID),
data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList()));
@@ -4791,7 +4791,7 @@ public class KafkaAdminClientTest {
waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
ClientRequest clientRequest = mockClient.requests().peek();
- OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).data;
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).build().data();
Map<String, Map<TopicPartition, PartitionData>> results = new
HashMap<>();
Map<String, Errors> errors = new HashMap<>();
data.groups().forEach(group -> {
@@ -4813,7 +4813,7 @@ public class KafkaAdminClientTest {
waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
ClientRequest clientRequest = mockClient.requests().peek();
- OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).data;
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).build().data();
Map<String, Map<TopicPartition, PartitionData>> results = new
HashMap<>();
Map<String, Errors> errors = new HashMap<>();
data.groups().forEach(group -> {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
index 1098925e42a..c4671ac1588 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -16,220 +16,239 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.OffsetFetchRequest.Builder;
-import
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import static
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class OffsetFetchRequestTest {
-
- private final String topicOne = "topic1";
- private final int partitionOne = 1;
- private final String topicTwo = "topic2";
- private final int partitionTwo = 2;
- private final String topicThree = "topic3";
- private final String group1 = "group1";
- private final String group2 = "group2";
- private final String group3 = "group3";
- private final String group4 = "group4";
- private final String group5 = "group5";
- private final List<String> groups = Arrays.asList(group1, group2, group3,
group4, group5);
-
- private final List<Integer> listOfVersionsNonBatchOffsetFetch =
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
-
-
- private OffsetFetchRequest.Builder builder;
-
- @Test
- public void testConstructor() {
- List<TopicPartition> partitions = Arrays.asList(
- new TopicPartition(topicOne, partitionOne),
- new TopicPartition(topicTwo, partitionTwo));
- int throttleTimeMs = 10;
-
- Map<TopicPartition, PartitionData> expectedData = new HashMap<>();
- for (TopicPartition partition : partitions) {
- expectedData.put(partition, new PartitionData(
- OffsetFetchResponse.INVALID_OFFSET,
- Optional.empty(),
- OffsetFetchResponse.NO_METADATA,
- Errors.NONE
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testWithMultipleGroups(short version) {
+ var data = new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ )),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp2")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ))
));
+ var builder = new OffsetFetchRequest.Builder(data, false);
+
+ if (version < 8) {
+
assertThrows(OffsetFetchRequest.NoBatchedOffsetFetchRequestException.class, ()
-> builder.build(version));
+ } else {
+ assertEquals(data, builder.build(version).data());
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testThrowOnFetchStableOffsetsUnsupported(short version) {
+ var builder = new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(true)
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ))
+ )),
+ true
+ );
+
+ if (version < 7) {
+ assertThrows(UnsupportedVersionException.class, () ->
builder.build(version));
+ } else {
+ builder.build(version);
}
+ }
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- if (version < 8) {
- builder = new OffsetFetchRequest.Builder(
- group1,
- false,
- partitions,
- false);
- OffsetFetchRequest request = builder.build(version);
- assertFalse(request.isAllPartitions());
- assertEquals(group1, request.groupId());
- assertEquals(partitions, request.partitions());
-
- OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
- assertEquals(Errors.NONE, response.error());
- assertFalse(response.hasError());
- assertEquals(Collections.singletonMap(Errors.NONE, version <=
(short) 1 ? 3 : 1), response.errorCounts(),
- "Incorrect error count for version " + version);
-
- if (version <= 1) {
- assertEquals(expectedData, response.responseDataV0ToV7());
- }
-
- if (version >= 3) {
- assertEquals(throttleTimeMs, response.throttleTimeMs());
- } else {
- assertEquals(DEFAULT_THROTTLE_TIME,
response.throttleTimeMs());
- }
- } else {
- builder = new Builder(Collections.singletonMap(group1,
partitions), false, false);
- OffsetFetchRequest request = builder.build(version);
- Map<String, List<TopicPartition>> groupToPartitionMap =
- request.groupIdsToPartitions();
- Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
- request.groupIdsToTopics();
- assertFalse(request.isAllPartitionsForGroup(group1));
- assertTrue(groupToPartitionMap.containsKey(group1) &&
groupToTopicMap.containsKey(
- group1));
- assertEquals(partitions, groupToPartitionMap.get(group1));
- OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
- assertEquals(Errors.NONE, response.groupLevelError(group1));
- assertFalse(response.groupHasError(group1));
- assertEquals(Collections.singletonMap(Errors.NONE, 1),
response.errorCounts(),
- "Incorrect error count for version " + version);
- assertEquals(throttleTimeMs, response.throttleTimeMs());
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testSingleGroup(short version) {
+ var data = new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ))
+ ));
+ var builder = new OffsetFetchRequest.Builder(data, false);
+
+ if (version < 8) {
+ var expectedRequest = new OffsetFetchRequestData()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopic()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ));
+ assertEquals(expectedRequest, builder.build(version).data());
+ } else {
+ assertEquals(data, builder.build(version).data());
}
}
- @Test
- public void testConstructorWithMultipleGroups() {
- List<TopicPartition> topic1Partitions = Arrays.asList(
- new TopicPartition(topicOne, partitionOne),
- new TopicPartition(topicOne, partitionTwo));
- List<TopicPartition> topic2Partitions = Arrays.asList(
- new TopicPartition(topicTwo, partitionOne),
- new TopicPartition(topicTwo, partitionTwo));
- List<TopicPartition> topic3Partitions = Arrays.asList(
- new TopicPartition(topicThree, partitionOne),
- new TopicPartition(topicThree, partitionTwo));
- Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
- groupToTp.put(group1, topic1Partitions);
- groupToTp.put(group2, topic2Partitions);
- groupToTp.put(group3, topic3Partitions);
- groupToTp.put(group4, null);
- groupToTp.put(group5, null);
- int throttleTimeMs = 10;
-
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- if (version >= 8) {
- builder = new Builder(groupToTp, false, false);
- OffsetFetchRequest request = builder.build(version);
- Map<String, List<TopicPartition>> groupToPartitionMap =
- request.groupIdsToPartitions();
- Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
- request.groupIdsToTopics();
- assertEquals(groupToTp.keySet(), groupToTopicMap.keySet());
- assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet());
- assertFalse(request.isAllPartitionsForGroup(group1));
- assertFalse(request.isAllPartitionsForGroup(group2));
- assertFalse(request.isAllPartitionsForGroup(group3));
- assertTrue(request.isAllPartitionsForGroup(group4));
- assertTrue(request.isAllPartitionsForGroup(group5));
- OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
- for (String group : groups) {
- assertEquals(Errors.NONE, response.groupLevelError(group));
- assertFalse(response.groupHasError(group));
- }
- assertEquals(Collections.singletonMap(Errors.NONE, 5),
response.errorCounts(),
- "Incorrect error count for version " + version);
- assertEquals(throttleTimeMs, response.throttleTimeMs());
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testSingleGroupWithAllTopics(short version) {
+ var data = new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(null)
+ ));
+ var builder = new OffsetFetchRequest.Builder(data, false);
+
+ if (version < 2) {
+ assertThrows(UnsupportedVersionException.class, () ->
builder.build(version));
+ } else if (version < 8) {
+ var expectedRequest = new OffsetFetchRequestData()
+ .setGroupId("grp1")
+ .setTopics(null);
+ assertEquals(expectedRequest, builder.build(version).data());
+ } else {
+ assertEquals(data, builder.build(version).data());
}
}
- @Test
- public void testBuildThrowForUnsupportedBatchRequest() {
- for (int version : listOfVersionsNonBatchOffsetFetch) {
- Map<String, List<TopicPartition>> groupPartitionMap = new
HashMap<>();
- groupPartitionMap.put(group1, null);
- groupPartitionMap.put(group2, null);
- builder = new Builder(groupPartitionMap, true, false);
- final short finalVersion = (short) version;
- assertThrows(NoBatchedOffsetFetchRequestException.class, () ->
builder.build(finalVersion));
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testGetErrorResponse(short version) {
+ var request = new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1))
+ ))
+ )),
+ false
+ ).build(version);
+
+ if (version < 2) {
+ var expectedResponse = new OffsetFetchResponseData()
+ .setThrottleTimeMs(1000)
+ .setTopics(List.of(
+ new OffsetFetchResponseData.OffsetFetchResponseTopic()
+ .setName("foo")
+ .setPartitions(List.of(
+ new
OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+ .setMetadata(OffsetFetchResponse.NO_METADATA)
+
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH),
+ new
OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+
.setCommittedOffset(OffsetFetchResponse.INVALID_OFFSET)
+ .setMetadata(OffsetFetchResponse.NO_METADATA)
+
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ ))
+ ));
+ assertEquals(expectedResponse, request.getErrorResponse(1000,
Errors.INVALID_GROUP_ID.exception()).data());
+ } else if (version < 8) {
+ var expectedResponse = new OffsetFetchResponseData()
+ .setThrottleTimeMs(1000)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+ assertEquals(expectedResponse, request.getErrorResponse(1000,
Errors.INVALID_GROUP_ID.exception()).data());
+ } else {
+ var expectedResponse = new OffsetFetchResponseData()
+ .setThrottleTimeMs(1000)
+ .setGroups(List.of(
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp1")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ ));
+ assertEquals(expectedResponse, request.getErrorResponse(1000,
Errors.INVALID_GROUP_ID.exception()).data());
}
}
- @Test
- public void testConstructorFailForUnsupportedRequireStable() {
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- if (version < 8) {
- // The builder needs to be initialized every cycle as the
internal data `requireStable` flag is flipped.
- builder = new OffsetFetchRequest.Builder(group1, true, null,
false);
- final short finalVersion = version;
- if (version < 2) {
- assertThrows(UnsupportedVersionException.class, () ->
builder.build(finalVersion));
- } else {
- OffsetFetchRequest request = builder.build(finalVersion);
- assertEquals(group1, request.groupId());
- assertNull(request.partitions());
- assertTrue(request.isAllPartitions());
- if (version < 7) {
- assertFalse(request.requireStable());
- } else {
- assertTrue(request.requireStable());
- }
- }
- } else {
- builder = new Builder(Collections.singletonMap(group1, null),
true, false);
- OffsetFetchRequest request = builder.build(version);
- Map<String, List<TopicPartition>> groupToPartitionMap =
- request.groupIdsToPartitions();
- Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
- request.groupIdsToTopics();
- assertTrue(groupToPartitionMap.containsKey(group1) &&
groupToTopicMap.containsKey(
- group1));
- assertNull(groupToPartitionMap.get(group1));
- assertTrue(request.isAllPartitionsForGroup(group1));
- assertTrue(request.requireStable());
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ public void testGroups(short version) {
+ var request = new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ))
+ )),
+ false
+ ).build(version);
+
+ if (version < 8) {
+ var expectedGroups = List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ ))
+ );
+ assertEquals(expectedGroups, request.groups());
+ } else {
+ assertEquals(request.data().groups(), request.groups());
}
}
- @Test
- public void testBuildThrowForUnsupportedRequireStable() {
- for (int version : listOfVersionsNonBatchOffsetFetch) {
- builder = new OffsetFetchRequest.Builder(group1, true, null, true);
- if (version < 7) {
- final short finalVersion = (short) version;
- assertThrows(UnsupportedVersionException.class, () ->
builder.build(finalVersion));
- } else {
- OffsetFetchRequest request = builder.build((short) version);
- assertTrue(request.requireStable());
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2)
+ public void testGroupsWithAllTopics(short version) {
+ var request = new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(null)
+ )),
+ false
+ ).build(version);
+
+ if (version < 8) {
+ var expectedGroups = List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp1")
+ .setTopics(null)
+ );
+ assertEquals(expectedGroups, request.groups());
+ } else {
+ assertEquals(request.data().groups(), request.groups());
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ce379f512f0..4a1b2d7f9b9 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -199,6 +199,7 @@ import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
@@ -276,8 +277,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@@ -772,55 +771,6 @@ public class RequestResponseTest {
assertThrows(IllegalArgumentException.class, () ->
createTopicsRequest.serializeWithHeader(requestHeader));
}
- @Test
- public void testOffsetFetchRequestBuilderToStringV0ToV7() {
- List<Boolean> stableFlags = asList(true, false);
- for (Boolean requireStable : stableFlags) {
- String allTopicPartitionsString = new OffsetFetchRequest.Builder(
- "someGroup",
- requireStable,
- null,
- false
- ).toString();
-
- assertTrue(allTopicPartitionsString.contains("groupId='',
topics=[],"
- + " groups=[OffsetFetchRequestGroup(groupId='someGroup',
memberId=null, memberEpoch=-1, topics=null)], requireStable=" + requireStable));
- String string = new OffsetFetchRequest.Builder(
- "group1",
- requireStable,
- singletonList(
- new TopicPartition("test11", 1)),
- false
- ).toString();
- assertTrue(string.contains("test11"));
- assertTrue(string.contains("group1"));
- assertTrue(string.contains("requireStable=" + requireStable));
- }
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean
requireStable) {
- String allTopicPartitionsString = new OffsetFetchRequest.Builder(
- Collections.singletonMap("someGroup", null),
- requireStable,
- false
- ).toString();
-
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
- + "(groupId='someGroup', memberId=null, memberEpoch=-1,
topics=null)], requireStable=" + requireStable));
-
- String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
- Collections.singletonMap(
- "group1",
- singletonList(new TopicPartition("test11", 1))),
- requireStable,
- false
- ).toString();
- assertTrue(subsetTopicPartitionsString.contains("test11"));
- assertTrue(subsetTopicPartitionsString.contains("group1"));
- assertTrue(subsetTopicPartitionsString.contains("requireStable=" +
requireStable));
- }
-
@Test
public void testApiVersionsRequestBeforeV3Validation() {
for (short version = 0; version < 3; version++) {
@@ -2446,66 +2396,83 @@ public class RequestResponseTest {
}
private OffsetFetchRequest createOffsetFetchRequest(short version, boolean
requireStable) {
- if (version < 8) {
- return new OffsetFetchRequest.Builder(
- "group1",
- requireStable,
- singletonList(new TopicPartition("test11", 1)),
- false)
- .build(version);
- }
return new OffsetFetchRequest.Builder(
- Collections.singletonMap(
- "group1",
- singletonList(new TopicPartition("test11", 1))),
- requireStable,
- false)
- .build(version);
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group1")
+ .setMemberId(version >= 9 ? "memberid" : null)
+ .setMemberEpoch(version >= 9 ? 10 : -1)
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("test11")
+ .setPartitionIndexes(List.of(1))
+ ))
+ )),
+ false
+ ).build(version);
}
private OffsetFetchRequest
createOffsetFetchRequestWithMultipleGroups(short version, boolean
requireStable) {
- Map<String, List<TopicPartition>> groupToPartitionMap = new
HashMap<>();
- List<TopicPartition> topic1 = singletonList(
- new TopicPartition("topic1", 0));
- List<TopicPartition> topic2 = asList(
- new TopicPartition("topic1", 0),
- new TopicPartition("topic2", 0),
- new TopicPartition("topic2", 1));
- List<TopicPartition> topic3 = asList(
- new TopicPartition("topic1", 0),
- new TopicPartition("topic2", 0),
- new TopicPartition("topic2", 1),
- new TopicPartition("topic3", 0),
- new TopicPartition("topic3", 1),
- new TopicPartition("topic3", 2));
- groupToPartitionMap.put("group1", topic1);
- groupToPartitionMap.put("group2", topic2);
- groupToPartitionMap.put("group3", topic3);
- groupToPartitionMap.put("group4", null);
- groupToPartitionMap.put("group5", null);
-
return new OffsetFetchRequest.Builder(
- groupToPartitionMap,
- requireStable,
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group1")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic1")
+ .setPartitionIndexes(List.of(0))
+ )),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group2")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic1")
+ .setPartitionIndexes(List.of(0)),
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic2")
+ .setPartitionIndexes(List.of(0, 1))
+ )),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group3")
+ .setTopics(List.of(
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic1")
+ .setPartitionIndexes(List.of(0)),
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic2")
+ .setPartitionIndexes(List.of(0, 1)),
+ new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("topic3")
+ .setPartitionIndexes(List.of(0, 1, 2))
+ )),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group4")
+ .setTopics(null),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group5")
+ .setTopics(null)
+ )),
false
).build(version);
}
private OffsetFetchRequest createOffsetFetchRequestForAllPartition(short
version, boolean requireStable) {
- if (version < 8) {
- return new OffsetFetchRequest.Builder(
- "group1",
- requireStable,
- null,
- false)
- .build(version);
- }
return new OffsetFetchRequest.Builder(
- Collections.singletonMap(
- "group1", null),
- requireStable,
- false)
- .build(version);
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group1")
+ .setMemberId(version >= 9 ? "memberid" : null)
+ .setMemberEpoch(version >= 9 ? 10 : -1)
+ .setTopics(null)
+ )),
+ false
+ ).build(version);
}
private OffsetFetchResponse createOffsetFetchResponse(short version) {
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e9e476fcd12..5286e719784 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData,
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData,
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData,
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData,
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData,
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData,
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData,
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData,
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch,
SimpleRecord}
@@ -318,15 +318,53 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
private def createOffsetFetchRequest: OffsetFetchRequest = {
- new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava,
false).build()
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(false)
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(group)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(tp.topic)
+ .setPartitionIndexes(List[Integer](tp.partition).asJava)
+ ).asJava)
+ ).asJava),
+ false
+ ).build()
}
private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = {
- new requests.OffsetFetchRequest.Builder(group, false, null, false).build()
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setRequireStable(false)
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(group)
+ .setTopics(null)
+ ).asJava),
+ false
+ ).build()
}
private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String,
util.List[TopicPartition]]): OffsetFetchRequest = {
- new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false,
false).build()
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(groupToPartitionMap.asScala.map { case (groupId,
partitions) =>
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(
+ if (partitions == null)
+ null
+ else
+ partitions.asScala.groupBy(_.topic).map { case (topic,
partitions) =>
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(topic)
+
.setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+ }.toList.asJava)
+ }.toList.asJava),
+ false
+ ).build()
}
private def createFindCoordinatorRequest = {
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index be96826858a..df939c29ffb 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -24,7 +24,7 @@ import
org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupR
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData,
DeleteGroupsResponseData, DescribeGroupsRequestData,
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData,
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData,
JoinGroupResponseData, LeaveGroupRes [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData,
DeleteGroupsResponseData, DescribeGroupsRequestData,
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData,
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData,
JoinGroupResponseData, LeaveGroupRes [...]
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest,
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest,
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse,
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest,
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
import org.apache.kafka.common.serialization.StringSerializer
@@ -333,11 +333,23 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
version: Short
): OffsetFetchResponseData.OffsetFetchResponseGroup = {
val request = new OffsetFetchRequest.Builder(
- groupId,
- memberId,
- memberEpoch,
- requireStable,
- if (partitions == null) null else partitions.asJava,
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
+ .setTopics(
+ if (partitions == null)
+ null
+ else
+ partitions.groupBy(_.topic).map { case (topic, partitions) =>
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(topic)
+
.setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+ }.toList.asJava)
+ ).asJava),
false
).build(version)
@@ -383,8 +395,22 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
}
val request = new OffsetFetchRequest.Builder(
- groups.map { case (k, v) => (k, v.asJava) }.asJava,
- requireStable,
+ new OffsetFetchRequestData()
+ .setRequireStable(requireStable)
+ .setGroups(groups.map { case (groupId, partitions) =>
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(
+ if (partitions == null)
+ null
+ else
+ partitions.groupBy(_.topic).map { case (topic, partitions) =>
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(topic)
+
.setPartitionIndexes(partitions.map(_.partition).map(Int.box).asJava)
+ }.toList.asJava
+ )
+ }.toList.asJava),
false
).build(version)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 0b51fd886d8..e3ffce710a6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -8242,16 +8242,30 @@ class KafkaApisTest extends Logging {
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
def makeRequest(version: Short): RequestChannel.Request = {
- val groups = Map(
- "group-1" -> List(
- new TopicPartition("foo", 0),
- new TopicPartition("foo", 1)
- ).asJava,
- "group-2" -> null,
- "group-3" -> null,
- "group-4" -> null,
- ).asJava
- buildRequest(new OffsetFetchRequest.Builder(groups, false,
false).build(version))
+ buildRequest(
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-2")
+ .setTopics(null),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-3")
+ .setTopics(null),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-4")
+ .setTopics(null),
+ ).asJava),
+ false
+ ).build(version)
+ )
}
if (version < 8) {
@@ -8364,12 +8378,17 @@ class KafkaApisTest extends Logging {
def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
def makeRequest(version: Short): RequestChannel.Request = {
buildRequest(new OffsetFetchRequest.Builder(
- "group-1",
- false,
- List(
- new TopicPartition("foo", 0),
- new TopicPartition("foo", 1)
- ).asJava,
+ new OffsetFetchRequestData()
+ .setRequireStable(false)
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava)
+ ).asJava),
false
).build(version))
}
@@ -8434,17 +8453,18 @@ class KafkaApisTest extends Logging {
}
@ParameterizedTest
- @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ // Version 1 does not support fetching offsets for all topics.
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH, fromVersion = 2)
def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
- // Version 0 gets offsets from Zookeeper. Version 1 does not support
fetching all
- // offsets request. We are not interested in testing these here.
- if (version < 2) return
-
def makeRequest(version: Short): RequestChannel.Request = {
buildRequest(new OffsetFetchRequest.Builder(
- "group-1",
- false,
- null, // all offsets.
+ new OffsetFetchRequestData()
+ .setRequireStable(false)
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(null) // all offsets.
+ ).asJava),
false
).build(version))
}
@@ -8509,19 +8529,40 @@ class KafkaApisTest extends Logging {
@Test
def testHandleOffsetFetchAuthorization(): Unit = {
def makeRequest(version: Short): RequestChannel.Request = {
- val groups = Map(
- "group-1" -> List(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 0)
- ).asJava,
- "group-2" -> List(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 0)
- ).asJava,
- "group-3" -> null,
- "group-4" -> null,
- ).asJava
- buildRequest(new OffsetFetchRequest.Builder(groups, false,
false).build(version))
+ buildRequest(
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-2")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-3")
+ .setTopics(null),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-4")
+ .setTopics(null),
+ ).asJava),
+ false
+ ).build(version)
+ )
}
val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
@@ -8662,17 +8703,34 @@ class KafkaApisTest extends Logging {
@Test
def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
def makeRequest(version: Short): RequestChannel.Request = {
- val groups = Map(
- "group-1" -> List(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 0)
- ).asJava,
- "group-2" -> List(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 0)
- ).asJava
- ).asJava
- buildRequest(new OffsetFetchRequest.Builder(groups, false,
false).build(version))
+ buildRequest(
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-2")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava)
+ ).asJava),
+ false
+ ).build(version)
+ )
}
val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7610e466207..2f001d9baf8 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -292,7 +292,19 @@ class RequestQuotaTest extends BaseRequestTest {
)
)
case ApiKeys.OFFSET_FETCH =>
- new OffsetFetchRequest.Builder(Map("test-group"->
List(tp).asJava).asJava, false, false)
+ new OffsetFetchRequest.Builder(
+ new OffsetFetchRequestData()
+ .setGroups(List(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("test-group")
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(tp.topic)
+ .setPartitionIndexes(List[Integer](tp.partition).asJava)
+ ).asJava)
+ ).asJava),
+ false
+ )
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorRequest.Builder(