This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
     new 471a5e0  KAFKA-12234: Implement request/response for offsetFetch batching (KIP-709) (#10962)
471a5e0 is described below
commit 471a5e0e9c107e94031f368abe7672b142eca476
Author: Sanjana Kaundinya <[email protected]>
AuthorDate: Wed Jul 7 03:55:00 2021 -0700
    KAFKA-12234: Implement request/response for offsetFetch batching (KIP-709) (#10962)
    This implements the request and response portion of KIP-709. It updates the
    OffsetFetch request and response to support fetching offsets for multiple
    consumer groups in a single request. If the broker does not support the new
    OffsetFetch version, clients fall back to the previous behaviour and send one
    request per group coordinator.
    Reviewers: Rajini Sivaram <[email protected]>, Konstantine Karantasis <[email protected]>
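    For illustration only (not part of this patch): a minimal sketch of how a client
    might use the batched OffsetFetch API introduced by this commit, assuming a broker
    that already supports OffsetFetch v8. The group, topic, and partition names below
    are invented for the example.

        import java.util.Collections;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.requests.OffsetFetchRequest;

        // Build one OffsetFetch request that covers several consumer groups (KIP-709).
        Map<String, List<TopicPartition>> groupToPartitions = new HashMap<>();
        groupToPartitions.put("group1", Collections.singletonList(new TopicPartition("topic1", 0)));
        groupToPartitions.put("group2", null);  // a null topic list means "all partitions" for that group

        OffsetFetchRequest request =
            new OffsetFetchRequest.Builder(groupToPartitions, false, false).build((short) 8);

        // Once the broker replies, errors and offsets are read per group id, e.g.
        //   Errors error = response.groupLevelError("group1");
        //   Map<TopicPartition, OffsetFetchResponse.PartitionData> offsets =
        //       response.partitionDataMap("group1");

    On brokers that only speak versions older than v8, the same Builder throws
    NoBatchedOffsetFetchRequestException when more than one group is requested, which
    is the signal to fall back to one request per group.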
---
checkstyle/suppressions.xml | 2 +-
.../internals/ListConsumerGroupOffsetsHandler.java | 10 +-
.../consumer/internals/ConsumerCoordinator.java | 26 +-
.../kafka/common/requests/OffsetFetchRequest.java | 170 ++++++++++-
.../kafka/common/requests/OffsetFetchResponse.java | 131 +++++++-
.../common/message/OffsetFetchRequest.json | 26 +-
.../common/message/OffsetFetchResponse.json | 49 ++-
.../ListConsumerGroupOffsetsHandlerTest.java | 8 +-
.../apache/kafka/common/message/MessageTest.java | 242 ++++++++++++++-
.../common/requests/OffsetFetchRequestTest.java | 194 +++++++++---
.../common/requests/OffsetFetchResponseTest.java | 331 +++++++++++++++++----
.../kafka/common/requests/RequestResponseTest.java | 147 +++++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 173 +++++++----
.../kafka/api/AuthorizerIntegrationTest.scala | 215 ++++++++++++-
.../unit/kafka/server/OffsetFetchRequestTest.scala | 237 +++++++++++++++
15 files changed, 1701 insertions(+), 260 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cfa7272..0b1ccb0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -85,7 +85,7 @@
files="clients[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity"
- files="MessageTest.java"/>
+ files="MessageTest.java|OffsetFetchRequest.java"/>
<!-- Clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 4439bc3..240516d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -87,12 +87,14 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         List<CoordinatorKey> unmapped = new ArrayList<>();
-        if (response.error() != Errors.NONE) {
-            handleError(groupId, response.error(), failed, unmapped);
+        Errors responseError = response.groupLevelError(groupId.idValue);
+        if (responseError != Errors.NONE) {
+            handleError(groupId, responseError, failed, unmapped);
         } else {
             final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                response.responseData().entrySet()) {
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionDataMap =
+                response.partitionDataMap(groupId.idValue);
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : partitionDataMap.entrySet()) {
                 final TopicPartition topicPartition = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                 final Errors error = partitionData.error;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 39f4520..68cf8a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1308,29 +1308,31 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            if (response.hasError()) {
-                Errors error = response.error();
-                log.debug("Offset fetch failed: {}", error.message());
+            Errors responseError = response.groupLevelError(rebalanceConfig.groupId);
+            if (responseError != Errors.NONE) {
+                log.debug("Offset fetch failed: {}", responseError.message());
-                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
-                    future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR) {
+                    future.raise(responseError);
+                } else if (responseError == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    markCoordinatorUnknown(error);
-                    future.raise(error);
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    markCoordinatorUnknown(responseError);
+                    future.raise(responseError);
+                } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
-                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
+                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
                 }
                 return;
             }
             Set<String> unauthorizedTopics = null;
-            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+                response.partitionDataMap(rebalanceConfig.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
             Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                 if (partitionData.hasError()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index c35d479..c5c094a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.common.requests;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -38,6 +43,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     private static final Logger log = LoggerFactory.getLogger(OffsetFetchRequest.class);
     private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = null;
+    private static final List<OffsetFetchRequestTopics> ALL_TOPIC_PARTITIONS_BATCH = null;
     private final OffsetFetchRequestData data;
     public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
@@ -78,26 +84,107 @@ public class OffsetFetchRequest extends AbstractRequest {
             return this.data.topics() == ALL_TOPIC_PARTITIONS;
         }
+        public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+                       boolean requireStable,
+                       boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
+
+            List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+            for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
+                String groupName = entry.getKey();
+                List<TopicPartition> tpList = entry.getValue();
+                final List<OffsetFetchRequestTopics> topics;
+                if (tpList != null) {
+                    Map<String, OffsetFetchRequestTopics> offsetFetchRequestTopicMap =
+                        new HashMap<>();
+                    for (TopicPartition topicPartition : tpList) {
+                        String topicName = topicPartition.topic();
+                        OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault(
+                            topicName, new OffsetFetchRequestTopics().setName(topicName));
+                        topic.partitionIndexes().add(topicPartition.partition());
+                        offsetFetchRequestTopicMap.put(topicName, topic);
+                    }
+                    topics = new ArrayList<>(offsetFetchRequestTopicMap.values());
+                } else {
+                    topics = ALL_TOPIC_PARTITIONS_BATCH;
+                }
+                groups.add(new OffsetFetchRequestGroup()
+                    .setGroupId(groupName)
+                    .setTopics(topics));
+            }
+            this.data = new OffsetFetchRequestData()
+                .setGroups(groups)
+                .setRequireStable(requireStable);
+            this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
+        }
+
         @Override
         public OffsetFetchRequest build(short version) {
             if (isAllTopicPartitions() && version < 2) {
                 throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
                     "v" + version + ", but we need v2 or newer to request all topic partitions.");
             }
-
+            if (data.groups().size() > 1 && version < 8) {
+                throw new NoBatchedOffsetFetchRequestException("Broker does not support"
+                    + " batching groups for fetch offset request on version " + version);
+            }
             if (data.requireStable() && version < 7) {
                 if (throwOnFetchStableOffsetsUnsupported) {
                     throw new UnsupportedVersionException("Broker unexpectedly " +
                         "doesn't support requireStable flag on version " + version);
                 } else {
                     log.trace("Fallback the requireStable flag to false as broker " +
-                        "only supports OffsetFetchRequest version {}. Need " +
-                        "v7 or newer to enable this feature", version);
-
-                    return new OffsetFetchRequest(data.setRequireStable(false), version);
+                        "only supports OffsetFetchRequest version {}. Need " +
+                        "v7 or newer to enable this feature", version);
+                    data.setRequireStable(false);
+                }
+            }
+            // convert data to use the appropriate version since version 8 uses different format
+            if (version < 8) {
+                OffsetFetchRequestData oldDataFormat = null;
+                if (!data.groups().isEmpty()) {
+                    OffsetFetchRequestGroup group = data.groups().get(0);
+                    String groupName = group.groupId();
+                    List<OffsetFetchRequestTopics> topics = group.topics();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = null;
+                    if (topics != null) {
+                        oldFormatTopics = topics
+                            .stream()
+                            .map(t ->
+                                new OffsetFetchRequestTopic()
+                                    .setName(t.name())
+                                    .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    oldDataFormat = new OffsetFetchRequestData()
+                        .setGroupId(groupName)
+                        .setTopics(oldFormatTopics)
+                        .setRequireStable(data.requireStable());
+                }
+                return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version);
+            } else {
+                if (data.groups().isEmpty()) {
+                    String groupName = data.groupId();
+                    List<OffsetFetchRequestTopic> oldFormatTopics = data.topics();
+                    List<OffsetFetchRequestTopics> topics = null;
+                    if (oldFormatTopics != null) {
+                        topics = oldFormatTopics
+                            .stream()
+                            .map(t -> new OffsetFetchRequestTopics()
+                                .setName(t.name())
+                                .setPartitionIndexes(t.partitionIndexes()))
+                            .collect(Collectors.toList());
+                    }
+                    OffsetFetchRequestData convertedDataFormat =
+                        new OffsetFetchRequestData()
+                            .setGroups(Collections.singletonList(
+                                new OffsetFetchRequestGroup()
+                                    .setGroupId(groupName)
+                                    .setTopics(topics)))
+                            .setRequireStable(data.requireStable());
+                    return new OffsetFetchRequest(convertedDataFormat, version);
                 }
             }
-
             return new OffsetFetchRequest(data, version);
         }
@@ -107,6 +194,18 @@ public class OffsetFetchRequest extends AbstractRequest {
         }
     }
+    /**
+     * Indicates that it is not possible to fetch consumer groups in batches with FetchOffset.
+     * Instead consumer groups' offsets must be fetched one by one.
+     */
+    public static class NoBatchedOffsetFetchRequestException extends UnsupportedVersionException {
+        private static final long serialVersionUID = 1L;
+
+        public NoBatchedOffsetFetchRequestException(String message) {
+            super(message);
+        }
+    }
+
     public List<TopicPartition> partitions() {
         if (isAllPartitions()) {
             return null;
@@ -128,6 +227,37 @@ public class OffsetFetchRequest extends AbstractRequest {
         return data.requireStable();
     }
+    public Map<String, List<TopicPartition>> groupIdsToPartitions() {
+        Map<String, List<TopicPartition>> groupIdsToPartitions = new HashMap<>();
+        for (OffsetFetchRequestGroup group : data.groups()) {
+            List<TopicPartition> tpList = null;
+            if (group.topics() != ALL_TOPIC_PARTITIONS_BATCH) {
+                tpList = new ArrayList<>();
+                for (OffsetFetchRequestTopics topic : group.topics()) {
+                    for (Integer partitionIndex : topic.partitionIndexes()) {
+                        tpList.add(new TopicPartition(topic.name(), partitionIndex));
+                    }
+                }
+            }
+            groupIdsToPartitions.put(group.groupId(), tpList);
+        }
+        return groupIdsToPartitions;
+    }
+
+    public Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics() {
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics =
+            new HashMap<>(data.groups().size());
+        data.groups().forEach(g -> groupIdsToTopics.put(g.groupId(), g.topics()));
+        return groupIdsToTopics;
+    }
+
+    public List<String> groupIds() {
+        return data.groups()
+            .stream()
+            .map(OffsetFetchRequestGroup::groupId)
+            .collect(Collectors.toList());
+    }
+
     private OffsetFetchRequest(OffsetFetchRequestData data, short version) {
         super(ApiKeys.OFFSET_FETCH, version);
         this.data = data;
@@ -152,13 +282,23 @@ public class OffsetFetchRequest extends AbstractRequest {
                     new TopicPartition(topic.name(), partitionIndex), partitionError);
                 }
             }
+            return new OffsetFetchResponse(error, responsePartitions);
         }
-
-        if (version() >= 3) {
-            return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
-        } else {
+        if (version() == 2) {
             return new OffsetFetchResponse(error, responsePartitions);
         }
+        if (version() >= 3 && version() < 8) {
+            return new OffsetFetchResponse(throttleTimeMs, error, responsePartitions);
+        }
+        List<String> groupIds = groupIds();
+        Map<String, Errors> errorsMap = new HashMap<>(groupIds.size());
+        Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> partitionMap =
+            new HashMap<>(groupIds.size());
+        for (String g : groupIds) {
+            errorsMap.put(g, error);
+            partitionMap.put(g, responsePartitions);
+        }
+        return new OffsetFetchResponse(throttleTimeMs, errorsMap, partitionMap);
     }
@Override
@@ -174,6 +314,16 @@ public class OffsetFetchRequest extends AbstractRequest {
return data.topics() == ALL_TOPIC_PARTITIONS;
}
+ public boolean isAllPartitionsForGroup(String groupId) {
+ OffsetFetchRequestGroup group = data
+ .groups()
+ .stream()
+ .filter(g -> g.groupId().equals(groupId))
+ .collect(Collectors.toList())
+ .get(0);
+ return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+ }
+
@Override
public OffsetFetchRequestData data() {
return data;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 594eb0e..213182e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.common.requests;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
@@ -65,6 +70,7 @@ public class OffsetFetchResponse extends AbstractResponse {
private final OffsetFetchResponseData data;
private final Errors error;
+ private final Map<String, Errors> groupLevelErrors = new HashMap<>();
public static final class PartitionData {
public final long offset;
@@ -113,8 +119,14 @@ public class OffsetFetchResponse extends AbstractResponse {
}
}
+ public OffsetFetchResponse(OffsetFetchResponseData data) {
+ super(ApiKeys.OFFSET_FETCH);
+ this.data = data;
+ this.error = null;
+ }
+
/**
- * Constructor for all versions without throttle time.
+ * Constructor without throttle time.
     * @param error Potential coordinator or group level error code (for api version 2 and later)
     * @param responseData Fetched offset information grouped by topic-partition
     */
@@ -123,7 +135,7 @@ public class OffsetFetchResponse extends AbstractResponse {
}
/**
- * Constructor with throttle time
+ * Constructor with throttle time for version 0 to 7
     * @param throttleTimeMs The time in milliseconds that this response was throttled
     * @param error Potential coordinator or group level error code (for api version 2 and later)
     * @param responseData Fetched offset information grouped by topic-partition
@@ -154,6 +166,48 @@ public class OffsetFetchResponse extends AbstractResponse {
this.error = error;
}
+    /**
+     * Constructor with throttle time for version 8 and above.
+     * @param throttleTimeMs The time in milliseconds that this response was throttled
+     * @param errors Potential coordinator or group level error code
+     * @param responseData Fetched offset information grouped by topic-partition and by group
+     */
+    public OffsetFetchResponse(int throttleTimeMs,
+                               Map<String, Errors> errors,
+                               Map<String, Map<TopicPartition, PartitionData>> responseData) {
+        super(ApiKeys.OFFSET_FETCH);
+        List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
+        for (Entry<String, Map<TopicPartition, PartitionData>> entry : responseData.entrySet()) {
+            String groupName = entry.getKey();
+            Map<TopicPartition, PartitionData> partitionDataMap = entry.getValue();
+            Map<String, OffsetFetchResponseTopics> offsetFetchResponseTopicsMap = new HashMap<>();
+            for (Entry<TopicPartition, PartitionData> partitionEntry : partitionDataMap.entrySet()) {
+                String topicName = partitionEntry.getKey().topic();
+                OffsetFetchResponseTopics topic =
+                    offsetFetchResponseTopicsMap.getOrDefault(topicName,
+                        new OffsetFetchResponseTopics().setName(topicName));
+                PartitionData partitionData = partitionEntry.getValue();
+                topic.partitions().add(new OffsetFetchResponsePartitions()
+                    .setPartitionIndex(partitionEntry.getKey().partition())
+                    .setErrorCode(partitionData.error.code())
+                    .setCommittedOffset(partitionData.offset)
+                    .setCommittedLeaderEpoch(partitionData.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH))
+                    .setMetadata(partitionData.metadata));
+                offsetFetchResponseTopicsMap.put(topicName, topic);
+            }
+            groupList.add(new OffsetFetchResponseGroup()
+                .setGroupId(groupName)
+                .setTopics(new ArrayList<>(offsetFetchResponseTopicsMap.values()))
+                .setErrorCode(errors.get(groupName).code()));
+            groupLevelErrors.put(groupName, errors.get(groupName));
+        }
+        this.data = new OffsetFetchResponseData()
+            .setGroups(groupList)
+            .setThrottleTimeMs(throttleTimeMs);
+        this.error = null;
+    }
+
public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
super(ApiKeys.OFFSET_FETCH);
this.data = data;
@@ -161,7 +215,17 @@ public class OffsetFetchResponse extends AbstractResponse {
     // for older versions there is no top-level error in the response and all errors are partition errors,
     // so if there is a group or coordinator error at the partition level use that as the top-level error.
     // this way clients can depend on the top-level error regardless of the offset fetch version.
-    this.error = version >= 2 ? Errors.forCode(data.errorCode()) : topLevelError(data);
+    // we return the error differently starting with version 8, so we will only populate the
+    // error field if we are between version 2 and 7. if we are in version 8 or greater, then
+    // we will populate the map of group id to error codes.
+    if (version < 8) {
+        this.error = version >= 2 ? Errors.forCode(data.errorCode()) : topLevelError(data);
+    } else {
+        for (OffsetFetchResponseGroup group : data.groups()) {
+            this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+        }
+        this.error = null;
+    }
}
private static Errors topLevelError(OffsetFetchResponseData data) {
@@ -185,21 +249,46 @@ public class OffsetFetchResponse extends AbstractResponse {
return error != Errors.NONE;
}
+ public boolean groupHasError(String groupId) {
+ return groupLevelErrors.get(groupId) != Errors.NONE;
+ }
+
public Errors error() {
return error;
}
+ public Errors groupLevelError(String groupId) {
+ if (error != null) {
+ return error;
+ }
+ return groupLevelErrors.get(groupId);
+ }
+
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
-        updateErrorCounts(counts, error);
-        data.topics().forEach(topic ->
-            topic.partitions().forEach(partition ->
+        if (!groupLevelErrors.isEmpty()) {
+            // built response with v8 or above
+            for (Map.Entry<String, Errors> entry : groupLevelErrors.entrySet()) {
+                updateErrorCounts(counts, entry.getValue());
+            }
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                group.topics().forEach(topic ->
+                    topic.partitions().forEach(partition ->
+                        updateErrorCounts(counts, Errors.forCode(partition.errorCode()))));
+            }
+        } else {
+            // built response with v0-v7
+            updateErrorCounts(counts, error);
+            data.topics().forEach(topic ->
+                topic.partitions().forEach(partition ->
+                    updateErrorCounts(counts, Errors.forCode(partition.errorCode()))));
+        }
return counts;
}
-    public Map<TopicPartition, PartitionData> responseData() {
+    // package-private for testing purposes
+    Map<TopicPartition, PartitionData> responseDataV0ToV7() {
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
for (OffsetFetchResponseTopic topic : data.topics()) {
for (OffsetFetchResponsePartition partition : topic.partitions()) {
@@ -214,6 +303,34 @@ public class OffsetFetchResponse extends AbstractResponse {
return responseData;
}
+    private Map<TopicPartition, PartitionData> buildResponseData(String groupId) {
+        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+        OffsetFetchResponseGroup group = data
+            .groups()
+            .stream()
+            .filter(g -> g.groupId().equals(groupId))
+            .collect(Collectors.toList())
+            .get(0);
+        for (OffsetFetchResponseTopics topic : group.topics()) {
+            for (OffsetFetchResponsePartitions partition : topic.partitions()) {
+                responseData.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+                    new PartitionData(partition.committedOffset(),
+                        RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()),
+                        partition.metadata(),
+                        Errors.forCode(partition.errorCode()))
+                );
+            }
+        }
+        return responseData;
+    }
+
+    public Map<TopicPartition, PartitionData> partitionDataMap(String groupId) {
+        if (groupLevelErrors.isEmpty()) {
+            return responseDataV0ToV7();
+        }
+        return buildResponseData(groupId);
+    }
+
     public static OffsetFetchResponse parse(ByteBuffer buffer, short version) {
         return new OffsetFetchResponse(new OffsetFetchResponseData(new ByteBufferAccessor(buffer), version), version);
     }
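Again for illustration only (not applied by this patch): a sketch of how the new v8
response constructor added above might be exercised, with per-group errors and
per-group partition data; the values are invented. (Imports as in the earlier
sketch, plus java.util.Optional, org.apache.kafka.common.protocol.Errors, and
org.apache.kafka.common.requests.OffsetFetchResponse.)

    // Per-group error codes and per-group offset data feed the v8 constructor.
    Map<String, Errors> groupErrors = new HashMap<>();
    Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> groupOffsets = new HashMap<>();

    Map<TopicPartition, OffsetFetchResponse.PartitionData> offsets = new HashMap<>();
    offsets.put(new TopicPartition("topic1", 0),
        new OffsetFetchResponse.PartitionData(42L, Optional.of(5), "", Errors.NONE));

    groupErrors.put("group1", Errors.NONE);
    groupOffsets.put("group1", offsets);

    OffsetFetchResponse response = new OffsetFetchResponse(10, groupErrors, groupOffsets);

    // Reading the response back is keyed by group id rather than by one top-level error.
    if (!response.groupHasError("group1")) {
        long committed = response.partitionDataMap("group1")
            .get(new TopicPartition("topic1", 0)).offset;
    }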
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index d4a4d5f..8f3c414 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
+    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
       "about": "The group to fetch offsets for." },
-    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
+    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
       "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
         "about": "The topic name."},
-      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
         "about": "The partition indexes we would like to fetch offsets for." }
     ]},
+    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
+      "about": "Each group we would like to fetch offsets for", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
+        "about": "The group ID."},
+      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
+        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
+        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
+          "about": "The topic name."},
+        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
+          "about": "The partition indexes we would like to fetch offsets for." }
+      ]}
+    ]},
     {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
-     "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."}
+     "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
   ]
 }
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index b977701..dfad60e 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+",
+    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7",
       "about": "The responses per topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
+      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7",
         "about": "The responses per partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
           "about": "The partition index." },
-        { "name": "CommittedOffset", "type": "int64", "versions": "0+",
+        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
          "about": "The committed message offset." },
-        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1",
          "ignorable": true, "about": "The leader epoch." },
-        { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
+        { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7",
          "about": "The partition metadata." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
          "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-  { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true,
-    "about": "The top-level error code, or 0 if there was no error." }
+  { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
+    "about": "The top-level error code, or 0 if there was no error." },
+  {"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
+    "about": "The responses per group id.", "fields": [
+    { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
+      "about": "The group ID." },
+    { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
+      "about": "The responses per topic.", "fields": [
+      { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
+        "about": "The topic name." },
+      { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
+        "about": "The responses per partition", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "8+",
+          "about": "The partition index." },
+        { "name": "CommittedOffset", "type": "int64", "versions": "8+",
+          "about": "The committed message offset." },
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1",
+          "ignorable": true, "about": "The leader epoch." },
+        { "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+",
+          "about": "The partition metadata." },
+        { "name": "ErrorCode", "type": "int16", "versions": "8+",
+          "about": "The partition-level error code, or 0 if there was no error." }
+      ]}
+    ]},
+    { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0",
+      "about": "The group-level error code, or 0 if there was no error." }
+  ]}
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 5c98940..b461ea3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -55,10 +55,10 @@ public class ListConsumerGroupOffsetsHandlerTest {
     public void testBuildRequest() {
         ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
         OffsetFetchRequest request = handler.buildRequest(1, singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groupId());
-        assertEquals(2, request.data().topics().size());
-        assertEquals(2, request.data().topics().get(0).partitionIndexes().size());
-        assertEquals(2, request.data().topics().get(1).partitionIndexes().size());
+        assertEquals(groupId, request.data().groups().get(0).groupId());
+        assertEquals(2, request.data().groups().get(0).topics().size());
+        assertEquals(2, request.data().groups().get(0).topics().get(0).partitionIndexes().size());
+        assertEquals(2, request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index e191ad6..aa00c24 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -38,9 +38,14 @@ import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitReque
 import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
 import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
@@ -75,6 +80,7 @@ public final class MessageTest {
     private final String memberId = "memberId";
     private final String instanceId = "instanceId";
+    private final List<Integer> listOfVersionsNonBatchOffsetFetch = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
@Test
public void testAddOffsetsToTxnVersions() throws Exception {
@@ -607,7 +613,7 @@ public final class MessageTest {
}
@Test
- public void testOffsetFetchVersions() throws Exception {
+ public void testOffsetFetchV0ToV7() throws Exception {
String groupId = "groupId";
String topicName = "topic";
@@ -615,11 +621,11 @@ public final class MessageTest {
new OffsetFetchRequestTopic()
.setName(topicName)
.setPartitionIndexes(Collections.singletonList(5)));
- testAllMessageRoundTrips(new OffsetFetchRequestData()
+ testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
.setTopics(new ArrayList<>())
.setGroupId(groupId));
- testAllMessageRoundTrips(new OffsetFetchRequestData()
+ testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
.setGroupId(groupId)
.setTopics(topics));
@@ -632,18 +638,18 @@ public final class MessageTest {
.setTopics(topics)
.setRequireStable(true);
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- final short finalVersion = version;
+ for (int version : listOfVersionsNonBatchOffsetFetch) {
+ final short finalVersion = (short) version;
if (version < 2) {
-                assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData));
+                assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, allPartitionData));
             } else {
-                testAllMessageRoundTripsFromVersion(version, allPartitionData);
+                testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7((short) version, allPartitionData);
             }
             if (version < 7) {
-                assertThrows(UnsupportedVersionException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, requireStableData));
+                assertThrows(UnsupportedVersionException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, requireStableData));
             } else {
-                testAllMessageRoundTripsFromVersion(finalVersion, requireStableData);
+                testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(finalVersion, requireStableData);
             }
         }
@@ -661,7 +667,7 @@ public final class MessageTest {
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
.setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(10);
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ for (int version : listOfVersionsNonBatchOffsetFetch) {
OffsetFetchResponseData responseData = response.get();
if (version <= 1) {
responseData.setErrorCode(Errors.NONE.code());
@@ -675,7 +681,221 @@ public final class MessageTest {
responseData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
}
-            testAllMessageRoundTripsFromVersion(version, responseData);
+            testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7((short) version, responseData);
         }
     }
+
+    private void testAllMessageRoundTripsOffsetFetchV0ToV7(Message message) throws Exception {
+        testDuplication(message);
+        testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(message.lowestSupportedVersion(), message);
+    }
+
+    private void testAllMessageRoundTripsOffsetFetchFromVersionV0ToV7(short fromVersion, Message message) throws Exception {
+        for (short version = fromVersion; version <= 7; version++) {
+            testEquivalentMessageRoundTrip(version, message);
+        }
+    }
+
+ @Test
+ public void testOffsetFetchV8AndAboveSingleGroup() throws Exception {
+ String groupId = "groupId";
+ String topicName = "topic";
+
+ List<OffsetFetchRequestTopics> topic = Collections.singletonList(
+ new OffsetFetchRequestTopics()
+ .setName(topicName)
+ .setPartitionIndexes(Collections.singletonList(5)));
+
+ OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData()
+ .setGroups(Collections.singletonList(
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(null)));
+
+        OffsetFetchRequestData specifiedPartitionData = new OffsetFetchRequestData()
+ .setGroups(Collections.singletonList(
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(topic)))
+ .setRequireStable(true);
+
+ testAllMessageRoundTripsOffsetFetchV8AndAbove(allPartitionData);
+ testAllMessageRoundTripsOffsetFetchV8AndAbove(specifiedPartitionData);
+
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+                testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, specifiedPartitionData);
+                testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, allPartitionData);
+            }
+        }
+
+ Supplier<OffsetFetchResponseData> response =
+ () -> new OffsetFetchResponseData()
+ .setGroups(Collections.singletonList(
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupId)
+ .setTopics(Collections.singletonList(
+ new OffsetFetchResponseTopics()
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(5)
+ .setMetadata(null)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(3)
+                                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
+ .setErrorCode(Errors.NOT_COORDINATOR.code())))
+ .setThrottleTimeMs(10);
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+ OffsetFetchResponseData responseData = response.get();
+                testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
+ }
+ }
+ }
+
+ @Test
+ public void testOffsetFetchV8AndAbove() throws Exception {
+ String groupOne = "group1";
+ String groupTwo = "group2";
+ String groupThree = "group3";
+ String groupFour = "group4";
+ String groupFive = "group5";
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String topic3 = "topic3";
+
+ OffsetFetchRequestTopics topicOne = new OffsetFetchRequestTopics()
+ .setName(topic1)
+ .setPartitionIndexes(Collections.singletonList(5));
+ OffsetFetchRequestTopics topicTwo = new OffsetFetchRequestTopics()
+ .setName(topic2)
+ .setPartitionIndexes(Collections.singletonList(10));
+ OffsetFetchRequestTopics topicThree = new OffsetFetchRequestTopics()
+ .setName(topic3)
+ .setPartitionIndexes(Collections.singletonList(15));
+
+        List<OffsetFetchRequestTopics> groupOneTopics = singletonList(topicOne);
+ OffsetFetchRequestGroup group1 =
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupOne)
+ .setTopics(groupOneTopics);
+
+        List<OffsetFetchRequestTopics> groupTwoTopics = Arrays.asList(topicOne, topicTwo);
+ OffsetFetchRequestGroup group2 =
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupTwo)
+ .setTopics(groupTwoTopics);
+
+        List<OffsetFetchRequestTopics> groupThreeTopics = Arrays.asList(topicOne, topicTwo, topicThree);
+ OffsetFetchRequestGroup group3 =
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupThree)
+ .setTopics(groupThreeTopics);
+
+ OffsetFetchRequestGroup group4 =
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupFour)
+ .setTopics(null);
+
+ OffsetFetchRequestGroup group5 =
+ new OffsetFetchRequestGroup()
+ .setGroupId(groupFive)
+ .setTopics(null);
+
+ OffsetFetchRequestData requestData = new OffsetFetchRequestData()
+ .setGroups(Arrays.asList(group1, group2, group3, group4, group5))
+ .setRequireStable(true);
+
+ testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData);
+
+        testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData.setRequireStable(false));
+
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+                testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, requestData);
+ }
+ }
+
+ OffsetFetchResponseTopics responseTopic1 =
+ new OffsetFetchResponseTopics()
+ .setName(topic1)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(5)
+ .setMetadata(null)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(3)
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
+ OffsetFetchResponseTopics responseTopic2 =
+ new OffsetFetchResponseTopics()
+ .setName(topic2)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(10)
+ .setMetadata("foo")
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(2)
+                    .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())));
+ OffsetFetchResponseTopics responseTopic3 =
+ new OffsetFetchResponseTopics()
+ .setName(topic3)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(15)
+ .setMetadata("bar")
+ .setCommittedOffset(300)
+ .setCommittedLeaderEpoch(1)
+                    .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())));
+
+ OffsetFetchResponseGroup responseGroup1 =
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupOne)
+ .setTopics(Collections.singletonList(responseTopic1))
+ .setErrorCode(Errors.NOT_COORDINATOR.code());
+ OffsetFetchResponseGroup responseGroup2 =
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupTwo)
+ .setTopics(Arrays.asList(responseTopic1, responseTopic2))
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+ OffsetFetchResponseGroup responseGroup3 =
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupThree)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
+ .setErrorCode(Errors.NONE.code());
+ OffsetFetchResponseGroup responseGroup4 =
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupFour)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
+ .setErrorCode(Errors.NONE.code());
+ OffsetFetchResponseGroup responseGroup5 =
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupFive)
+                .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
+ .setErrorCode(Errors.NONE.code());
+
+ Supplier<OffsetFetchResponseData> response =
+ () -> new OffsetFetchResponseData()
+                .setGroups(Arrays.asList(responseGroup1, responseGroup2, responseGroup3,
+ responseGroup4, responseGroup5))
+ .setThrottleTimeMs(10);
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+ OffsetFetchResponseData responseData = response.get();
+                testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
+ }
+ }
+ }
+
+    private void testAllMessageRoundTripsOffsetFetchV8AndAbove(Message message) throws Exception {
+        testDuplication(message);
+        testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove((short) 8, message);
+    }
+
+    private void testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(short fromVersion, Message message) throws Exception {
+        for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) {
+            testEquivalentMessageRoundTrip(version, message);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
index ddb2cd9..37076d0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
@@ -18,10 +18,12 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetFetchRequest.Builder;
+import org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -44,25 +46,24 @@ public class OffsetFetchRequestTest {
private final int partitionOne = 1;
private final String topicTwo = "topic2";
private final int partitionTwo = 2;
- private final String groupId = "groupId";
+ private final String topicThree = "topic3";
+ private final String group1 = "group1";
+ private final String group2 = "group2";
+ private final String group3 = "group3";
+ private final String group4 = "group4";
+ private final String group5 = "group5";
+    private List<String> groups = Arrays.asList(group1, group2, group3, group4, group5);
+
+    private final List<Integer> listOfVersionsNonBatchOffsetFetch = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
+
private OffsetFetchRequest.Builder builder;
- private List<TopicPartition> partitions;
-
- @BeforeEach
- public void setUp() {
- partitions = Arrays.asList(new TopicPartition(topicOne, partitionOne),
- new TopicPartition(topicTwo, partitionTwo));
- builder = new OffsetFetchRequest.Builder(
- groupId,
- false,
- partitions,
- false);
- }
@Test
public void testConstructor() {
- assertFalse(builder.isAllTopicPartitions());
+ List<TopicPartition> partitions = Arrays.asList(
+ new TopicPartition(topicOne, partitionOne),
+ new TopicPartition(topicTwo, partitionTwo));
int throttleTimeMs = 10;
Map<TopicPartition, PartitionData> expectedData = new HashMap<>();
@@ -76,60 +77,157 @@ public class OffsetFetchRequestTest {
}
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- OffsetFetchRequest request = builder.build(version);
- assertFalse(request.isAllPartitions());
- assertEquals(groupId, request.groupId());
- assertEquals(partitions, request.partitions());
-
- OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
- assertEquals(Errors.NONE, response.error());
- assertFalse(response.hasError());
- assertEquals(Collections.singletonMap(Errors.NONE, version <=
(short) 1 ? 3 : 1), response.errorCounts(),
- "Incorrect error count for version " + version);
-
- if (version <= 1) {
- assertEquals(expectedData, response.responseData());
+ if (version < 8) {
+ builder = new OffsetFetchRequest.Builder(
+ group1,
+ false,
+ partitions,
+ false);
+ assertFalse(builder.isAllTopicPartitions());
+ OffsetFetchRequest request = builder.build(version);
+ assertFalse(request.isAllPartitions());
+ assertEquals(group1, request.groupId());
+ assertEquals(partitions, request.partitions());
+
+ OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+ assertEquals(Errors.NONE, response.error());
+ assertFalse(response.hasError());
+ assertEquals(Collections.singletonMap(Errors.NONE, version <=
(short) 1 ? 3 : 1), response.errorCounts(),
+ "Incorrect error count for version " + version);
+
+ if (version <= 1) {
+ assertEquals(expectedData, response.responseDataV0ToV7());
+ }
+
+ if (version >= 3) {
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
+ } else {
+ assertEquals(DEFAULT_THROTTLE_TIME,
response.throttleTimeMs());
+ }
+ } else {
+ builder = new Builder(Collections.singletonMap(group1,
partitions), false, false);
+ OffsetFetchRequest request = builder.build(version);
+ Map<String, List<TopicPartition>> groupToPartitionMap =
+ request.groupIdsToPartitions();
+ Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+ request.groupIdsToTopics();
+ assertFalse(request.isAllPartitionsForGroup(group1));
+ assertTrue(groupToPartitionMap.containsKey(group1) &&
groupToTopicMap.containsKey(
+ group1));
+ assertEquals(partitions, groupToPartitionMap.get(group1));
+ OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+ assertEquals(Errors.NONE, response.groupLevelError(group1));
+ assertFalse(response.groupHasError(group1));
+ assertEquals(Collections.singletonMap(Errors.NONE, 1),
response.errorCounts(),
+ "Incorrect error count for version " + version);
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
}
+ }
+ }
+
+ @Test
+ public void testConstructorWithMultipleGroups() {
+ List<TopicPartition> topic1Partitions = Arrays.asList(
+ new TopicPartition(topicOne, partitionOne),
+ new TopicPartition(topicOne, partitionTwo));
+ List<TopicPartition> topic2Partitions = Arrays.asList(
+ new TopicPartition(topicTwo, partitionOne),
+ new TopicPartition(topicTwo, partitionTwo));
+ List<TopicPartition> topic3Partitions = Arrays.asList(
+ new TopicPartition(topicThree, partitionOne),
+ new TopicPartition(topicThree, partitionTwo));
+ Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
+ groupToTp.put(group1, topic1Partitions);
+ groupToTp.put(group2, topic2Partitions);
+ groupToTp.put(group3, topic3Partitions);
+ groupToTp.put(group4, null);
+ groupToTp.put(group5, null);
+ int throttleTimeMs = 10;
- if (version >= 3) {
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+ builder = new Builder(groupToTp, false, false);
+ OffsetFetchRequest request = builder.build(version);
+ Map<String, List<TopicPartition>> groupToPartitionMap =
+ request.groupIdsToPartitions();
+ Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+ request.groupIdsToTopics();
+ assertEquals(groupToTp.keySet(), groupToTopicMap.keySet());
+ assertEquals(groupToTp.keySet(), groupToPartitionMap.keySet());
+ assertFalse(request.isAllPartitionsForGroup(group1));
+ assertFalse(request.isAllPartitionsForGroup(group2));
+ assertFalse(request.isAllPartitionsForGroup(group3));
+ assertTrue(request.isAllPartitionsForGroup(group4));
+ assertTrue(request.isAllPartitionsForGroup(group5));
+ OffsetFetchResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+ for (String group : groups) {
+ assertEquals(Errors.NONE, response.groupLevelError(group));
+ assertFalse(response.groupHasError(group));
+ }
+ assertEquals(Collections.singletonMap(Errors.NONE, 5),
response.errorCounts(),
+ "Incorrect error count for version " + version);
assertEquals(throttleTimeMs, response.throttleTimeMs());
- } else {
- assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
}
}
}
@Test
+ public void testBuildThrowForUnsupportedBatchRequest() {
+ for (int version : listOfVersionsNonBatchOffsetFetch) {
+ Map<String, List<TopicPartition>> groupPartitionMap = new
HashMap<>();
+ groupPartitionMap.put(group1, null);
+ groupPartitionMap.put(group2, null);
+ builder = new Builder(groupPartitionMap, true, false);
+ final short finalVersion = (short) version;
+ assertThrows(NoBatchedOffsetFetchRequestException.class, () ->
builder.build(finalVersion));
+ }
+ }
+
+ @Test
public void testConstructorFailForUnsupportedRequireStable() {
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- // The builder needs to be initialized every cycle as the internal
data `requireStable` flag is flipped.
- builder = new OffsetFetchRequest.Builder(groupId, true, null,
false);
- final short finalVersion = version;
- if (version < 2) {
- assertThrows(UnsupportedVersionException.class, () ->
builder.build(finalVersion));
- } else {
- OffsetFetchRequest request = builder.build(finalVersion);
- assertEquals(groupId, request.groupId());
- assertNull(request.partitions());
- assertTrue(request.isAllPartitions());
- if (version < 7) {
- assertFalse(request.requireStable());
+ if (version < 8) {
+ // The builder needs to be initialized every cycle as the
internal data `requireStable` flag is flipped.
+ builder = new OffsetFetchRequest.Builder(group1, true, null,
false);
+ final short finalVersion = version;
+ if (version < 2) {
+ assertThrows(UnsupportedVersionException.class, () ->
builder.build(finalVersion));
} else {
- assertTrue(request.requireStable());
+ OffsetFetchRequest request = builder.build(finalVersion);
+ assertEquals(group1, request.groupId());
+ assertNull(request.partitions());
+ assertTrue(request.isAllPartitions());
+ if (version < 7) {
+ assertFalse(request.requireStable());
+ } else {
+ assertTrue(request.requireStable());
+ }
}
+ } else {
+ builder = new Builder(Collections.singletonMap(group1, null),
true, false);
+ OffsetFetchRequest request = builder.build(version);
+ Map<String, List<TopicPartition>> groupToPartitionMap =
+ request.groupIdsToPartitions();
+ Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+ request.groupIdsToTopics();
+ assertTrue(groupToPartitionMap.containsKey(group1) &&
groupToTopicMap.containsKey(
+ group1));
+ assertNull(groupToPartitionMap.get(group1));
+ assertTrue(request.isAllPartitionsForGroup(group1));
+ assertTrue(request.requireStable());
}
}
}
@Test
public void testBuildThrowForUnsupportedRequireStable() {
- for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- builder = new OffsetFetchRequest.Builder(groupId, true, null,
true);
+ for (int version : listOfVersionsNonBatchOffsetFetch) {
+ builder = new OffsetFetchRequest.Builder(group1, true, null, true);
if (version < 7) {
- final short finalVersion = version;
+ final short finalVersion = (short) version;
assertThrows(UnsupportedVersionException.class, () ->
builder.build(finalVersion));
} else {
- OffsetFetchRequest request = builder.build(version);
+ OffsetFetchRequest request = builder.build((short) version);
assertTrue(request.requireStable());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index e202fc2..c73ea2a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -18,8 +18,11 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartitions;
 import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
+import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
@@ -44,12 +47,19 @@ public class OffsetFetchResponseTest {
private final int offset = 100;
private final String metadata = "metadata";
+ private final String groupOne = "group1";
+ private final String groupTwo = "group2";
+ private final String groupThree = "group3";
private final String topicOne = "topic1";
private final int partitionOne = 1;
private final Optional<Integer> leaderEpochOne = Optional.of(1);
private final String topicTwo = "topic2";
private final int partitionTwo = 2;
private final Optional<Integer> leaderEpochTwo = Optional.of(2);
+ private final String topicThree = "topic3";
+ private final int partitionThree = 3;
+ private final Optional<Integer> leaderEpochThree = Optional.of(3);
+
private Map<TopicPartition, PartitionData> partitionDataMap;
@@ -72,99 +82,228 @@ public class OffsetFetchResponseTest {
@Test
public void testConstructor() {
- OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs,
Errors.NOT_COORDINATOR, partitionDataMap);
- assertEquals(Errors.NOT_COORDINATOR, response.error());
- assertEquals(3, response.errorCounts().size());
- assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR, 1),
- Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
- Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
- response.errorCounts());
-
- assertEquals(throttleTimeMs, response.throttleTimeMs());
-
- Map<TopicPartition, PartitionData> responseData =
response.responseData();
- assertEquals(partitionDataMap, responseData);
- responseData.forEach(
- (tp, data) -> assertTrue(data.hasError())
- );
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version < 8) {
+ OffsetFetchResponse response = new
OffsetFetchResponse(throttleTimeMs, Errors.NOT_COORDINATOR, partitionDataMap);
+ assertEquals(Errors.NOT_COORDINATOR, response.error());
+ assertEquals(3, response.errorCounts().size());
+ assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR,
1),
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+ Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
+ response.errorCounts());
+
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
+
+ Map<TopicPartition, PartitionData> responseData =
response.responseDataV0ToV7();
+ assertEquals(partitionDataMap, responseData);
+ responseData.forEach((tp, data) ->
assertTrue(data.hasError()));
+ } else {
+ OffsetFetchResponse response = new OffsetFetchResponse(
+ throttleTimeMs,
+ Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+ Collections.singletonMap(groupOne, partitionDataMap));
+ assertEquals(Errors.NOT_COORDINATOR,
response.groupLevelError(groupOne));
+ assertEquals(3, response.errorCounts().size());
+ assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR,
1),
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+ Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1)),
+ response.errorCounts());
+
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
+
+ Map<TopicPartition, PartitionData> responseData =
response.partitionDataMap(groupOne);
+ assertEquals(partitionDataMap, responseData);
+ responseData.forEach((tp, data) ->
assertTrue(data.hasError()));
+ }
+ }
}
- /**
- * Test behavior changes over the versions. Refer to
resources.common.messages.OffsetFetchResponse.json
- */
@Test
- public void testStructBuild() {
- partitionDataMap.put(new TopicPartition(topicTwo, partitionTwo), new
PartitionData(
+ public void testConstructorWithMultipleGroups() {
+ Map<String, Map<TopicPartition, PartitionData>> responseData = new
HashMap<>();
+ Map<String, Errors> errorMap = new HashMap<>();
+ Map<TopicPartition, PartitionData> pd1 = new HashMap<>();
+ Map<TopicPartition, PartitionData> pd2 = new HashMap<>();
+ Map<TopicPartition, PartitionData> pd3 = new HashMap<>();
+ pd1.put(new TopicPartition(topicOne, partitionOne), new PartitionData(
+ offset,
+ leaderEpochOne,
+ metadata,
+ Errors.TOPIC_AUTHORIZATION_FAILED));
+ pd2.put(new TopicPartition(topicTwo, partitionTwo), new PartitionData(
offset,
leaderEpochTwo,
metadata,
- Errors.GROUP_AUTHORIZATION_FAILED
- ));
+ Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ pd3.put(new TopicPartition(topicThree, partitionThree), new
PartitionData(
+ offset,
+ leaderEpochThree,
+ metadata,
+ Errors.NONE));
+ responseData.put(groupOne, pd1);
+ responseData.put(groupTwo, pd2);
+ responseData.put(groupThree, pd3);
+ errorMap.put(groupOne, Errors.NOT_COORDINATOR);
+ errorMap.put(groupTwo, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ errorMap.put(groupThree, Errors.NONE);
+ for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+ if (version >= 8) {
+ OffsetFetchResponse response = new OffsetFetchResponse(
+ throttleTimeMs, errorMap, responseData);
+
+ assertEquals(Errors.NOT_COORDINATOR,
response.groupLevelError(groupOne));
+ assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS,
response.groupLevelError(groupTwo));
+ assertEquals(Errors.NONE,
response.groupLevelError(groupThree));
+ assertTrue(response.groupHasError(groupOne));
+ assertTrue(response.groupHasError(groupTwo));
+ assertFalse(response.groupHasError(groupThree));
+ assertEquals(5, response.errorCounts().size());
+ assertEquals(Utils.mkMap(Utils.mkEntry(Errors.NOT_COORDINATOR,
1),
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1),
+ Utils.mkEntry(Errors.UNKNOWN_TOPIC_OR_PARTITION, 1),
+ Utils.mkEntry(Errors.COORDINATOR_LOAD_IN_PROGRESS, 1),
+ Utils.mkEntry(Errors.NONE, 2)),
+ response.errorCounts());
- OffsetFetchResponse latestResponse = new
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
+ Map<TopicPartition, PartitionData> responseData1 =
response.partitionDataMap(groupOne);
+ assertEquals(pd1, responseData1);
+ responseData1.forEach((tp, data) ->
assertTrue(data.hasError()));
+ Map<TopicPartition, PartitionData> responseData2 =
response.partitionDataMap(groupTwo);
+ assertEquals(pd2, responseData2);
+ responseData2.forEach((tp, data) ->
assertTrue(data.hasError()));
+ Map<TopicPartition, PartitionData> responseData3 =
response.partitionDataMap(groupThree);
+ assertEquals(pd3, responseData3);
+ responseData3.forEach((tp, data) ->
assertFalse(data.hasError()));
+ }
+ }
+ }
+
+ /**
+ * Test behavior changes over the versions. Refer to
resources.common.messages.OffsetFetchResponse.json
+ */
+ @Test
+ public void testStructBuild() {
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- OffsetFetchResponseData data = new OffsetFetchResponseData(
- new ByteBufferAccessor(latestResponse.serialize(version)),
version);
+ if (version < 8) {
+ partitionDataMap.put(new TopicPartition(topicTwo,
partitionTwo), new PartitionData(
+ offset,
+ leaderEpochTwo,
+ metadata,
+ Errors.GROUP_AUTHORIZATION_FAILED
+ ));
- OffsetFetchResponse oldResponse = new OffsetFetchResponse(data,
version);
+ OffsetFetchResponse latestResponse = new
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+ OffsetFetchResponseData data = new OffsetFetchResponseData(
+ new ByteBufferAccessor(latestResponse.serialize(version)),
version);
- if (version <= 1) {
- assertEquals(Errors.NONE.code(), data.errorCode());
+ OffsetFetchResponse oldResponse = new
OffsetFetchResponse(data, version);
- // Partition level error populated in older versions.
- assertEquals(Errors.GROUP_AUTHORIZATION_FAILED,
oldResponse.error());
-
assertEquals(Utils.mkMap(Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 2),
- Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
oldResponse.errorCounts());
+ if (version <= 1) {
+ assertEquals(Errors.NONE.code(), data.errorCode());
- } else {
- assertEquals(Errors.NONE.code(), data.errorCode());
+ // Partition level error populated in older versions.
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED,
oldResponse.error());
+
assertEquals(Utils.mkMap(Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 2),
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+ oldResponse.errorCounts());
+ } else {
+ assertEquals(Errors.NONE.code(), data.errorCode());
- assertEquals(Errors.NONE, oldResponse.error());
- assertEquals(Utils.mkMap(
+ assertEquals(Errors.NONE, oldResponse.error());
+ assertEquals(Utils.mkMap(
Utils.mkEntry(Errors.NONE, 1),
Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 1),
- Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
oldResponse.errorCounts());
- }
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+ oldResponse.errorCounts());
+ }
+
+ if (version <= 2) {
+ assertEquals(DEFAULT_THROTTLE_TIME,
oldResponse.throttleTimeMs());
+ } else {
+ assertEquals(throttleTimeMs, oldResponse.throttleTimeMs());
+ }
- if (version <= 2) {
- assertEquals(DEFAULT_THROTTLE_TIME,
oldResponse.throttleTimeMs());
+ Map<TopicPartition, PartitionData> expectedDataMap = new
HashMap<>();
+ for (Map.Entry<TopicPartition, PartitionData> entry :
partitionDataMap.entrySet()) {
+ PartitionData partitionData = entry.getValue();
+ expectedDataMap.put(entry.getKey(), new PartitionData(
+ partitionData.offset,
+ version <= 4 ? Optional.empty() :
partitionData.leaderEpoch,
+ partitionData.metadata,
+ partitionData.error
+ ));
+ }
+
+ Map<TopicPartition, PartitionData> responseData =
oldResponse.responseDataV0ToV7();
+ assertEquals(expectedDataMap, responseData);
+
+ responseData.forEach((tp, rdata) ->
assertTrue(rdata.hasError()));
} else {
+ partitionDataMap.put(new TopicPartition(topicTwo,
partitionTwo), new PartitionData(
+ offset,
+ leaderEpochTwo,
+ metadata,
+ Errors.GROUP_AUTHORIZATION_FAILED));
+ OffsetFetchResponse latestResponse = new OffsetFetchResponse(
+ throttleTimeMs,
+ Collections.singletonMap(groupOne, Errors.NONE),
+ Collections.singletonMap(groupOne, partitionDataMap));
+ OffsetFetchResponseData data = new OffsetFetchResponseData(
+ new ByteBufferAccessor(latestResponse.serialize(version)),
version);
+ OffsetFetchResponse oldResponse = new
OffsetFetchResponse(data, version);
+ assertEquals(Errors.NONE.code(),
data.groups().get(0).errorCode());
+
+ assertEquals(Errors.NONE,
oldResponse.groupLevelError(groupOne));
+ assertEquals(Utils.mkMap(
+ Utils.mkEntry(Errors.NONE, 1),
+ Utils.mkEntry(Errors.GROUP_AUTHORIZATION_FAILED, 1),
+ Utils.mkEntry(Errors.TOPIC_AUTHORIZATION_FAILED, 1)),
+ oldResponse.errorCounts());
assertEquals(throttleTimeMs, oldResponse.throttleTimeMs());
- }
- Map<TopicPartition, PartitionData> expectedDataMap = new
HashMap<>();
- for (Map.Entry<TopicPartition, PartitionData> entry :
partitionDataMap.entrySet()) {
- PartitionData partitionData = entry.getValue();
- expectedDataMap.put(entry.getKey(), new PartitionData(
- partitionData.offset,
- version <= 4 ? Optional.empty() :
partitionData.leaderEpoch,
- partitionData.metadata,
- partitionData.error
- ));
- }
+ Map<TopicPartition, PartitionData> expectedDataMap = new
HashMap<>();
+ for (Map.Entry<TopicPartition, PartitionData> entry :
partitionDataMap.entrySet()) {
+ PartitionData partitionData = entry.getValue();
+ expectedDataMap.put(entry.getKey(), new PartitionData(
+ partitionData.offset,
+ partitionData.leaderEpoch,
+ partitionData.metadata,
+ partitionData.error
+ ));
+ }
- Map<TopicPartition, PartitionData> responseData =
oldResponse.responseData();
- assertEquals(expectedDataMap, responseData);
+ Map<TopicPartition, PartitionData> responseData =
oldResponse.partitionDataMap(groupOne);
+ assertEquals(expectedDataMap, responseData);
- responseData.forEach((tp, rdata) -> assertTrue(rdata.hasError()));
+ responseData.forEach((tp, rdata) ->
assertTrue(rdata.hasError()));
+ }
}
}
@Test
public void testShouldThrottle() {
- OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs,
Errors.NONE, partitionDataMap);
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
- if (version >= 4) {
- assertTrue(response.shouldClientThrottle(version));
+ if (version < 8) {
+ OffsetFetchResponse response = new
OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
+ if (version >= 4) {
+ assertTrue(response.shouldClientThrottle(version));
+ } else {
+ assertFalse(response.shouldClientThrottle(version));
+ }
} else {
- assertFalse(response.shouldClientThrottle(version));
+ OffsetFetchResponse response = new OffsetFetchResponse(
+ throttleTimeMs,
+ Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+ Collections.singletonMap(groupOne, partitionDataMap));
+ assertTrue(response.shouldClientThrottle(version));
}
}
}
@Test
- public void testNullableMetadata() {
+ public void testNullableMetadataV0ToV7() {
PartitionData pd = new PartitionData(
offset,
leaderEpochOne,
@@ -196,7 +335,43 @@ public class OffsetFetchResponseTest {
}
@Test
- public void testUseDefaultLeaderEpoch() {
+ public void testNullableMetadataV8AndAbove() {
+ PartitionData pd = new PartitionData(
+ offset,
+ leaderEpochOne,
+ null,
+ Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ // test PartitionData.equals with null metadata
+ assertEquals(pd, pd);
+ partitionDataMap.clear();
+ partitionDataMap.put(new TopicPartition(topicOne, partitionOne), pd);
+
+ OffsetFetchResponse response = new OffsetFetchResponse(
+ throttleTimeMs,
+ Collections.singletonMap(groupOne,
Errors.GROUP_AUTHORIZATION_FAILED),
+ Collections.singletonMap(groupOne, partitionDataMap));
+ OffsetFetchResponseData expectedData =
+ new OffsetFetchResponseData()
+ .setGroups(Collections.singletonList(
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupOne)
+ .setTopics(Collections.singletonList(
+ new OffsetFetchResponseTopics()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(partitionOne)
+ .setCommittedOffset(offset)
+
.setCommittedLeaderEpoch(leaderEpochOne.orElse(-1))
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ .setMetadata(null)))))
+
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+ .setThrottleTimeMs(throttleTimeMs);
+ assertEquals(expectedData, response.data());
+ }
+
+ @Test
+ public void testUseDefaultLeaderEpochV0ToV7() {
final Optional<Integer> emptyLeaderEpoch = Optional.empty();
partitionDataMap.clear();
@@ -227,4 +402,40 @@ public class OffsetFetchResponseTest {
);
assertEquals(expectedData, response.data());
}
+
+ @Test
+ public void testUseDefaultLeaderEpochV8() {
+ final Optional<Integer> emptyLeaderEpoch = Optional.empty();
+ partitionDataMap.clear();
+
+ partitionDataMap.put(new TopicPartition(topicOne, partitionOne),
+ new PartitionData(
+ offset,
+ emptyLeaderEpoch,
+ metadata,
+ Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ );
+ OffsetFetchResponse response = new OffsetFetchResponse(
+ throttleTimeMs,
+ Collections.singletonMap(groupOne, Errors.NOT_COORDINATOR),
+ Collections.singletonMap(groupOne, partitionDataMap));
+ OffsetFetchResponseData expectedData =
+ new OffsetFetchResponseData()
+ .setGroups(Collections.singletonList(
+ new OffsetFetchResponseGroup()
+ .setGroupId(groupOne)
+ .setTopics(Collections.singletonList(
+ new OffsetFetchResponseTopics()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartitions()
+ .setPartitionIndex(partitionOne)
+ .setCommittedOffset(offset)
+
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ .setMetadata(metadata)))))
+ .setErrorCode(Errors.NOT_COORDINATOR.code())))
+ .setThrottleTimeMs(throttleTimeMs);
+ assertEquals(expectedData, response.data());
+ }
}
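
To complement the response tests above, here is a minimal sketch of constructing a v8+ multi-group OffsetFetchResponse and reading it back per group. The constructor and accessors mirror the test code in this file; the throttle time and the group/topic names are placeholders.

    // Sketch only -- not part of this commit.
    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.OffsetFetchResponse;
    import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;

    public class MultiGroupOffsetFetchResponseSketch {
        public static void main(String[] args) {
            Map<TopicPartition, PartitionData> offsets = Collections.singletonMap(
                new TopicPartition("topic1", 0),
                new PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
            OffsetFetchResponse response = new OffsetFetchResponse(
                10,                                                // throttle time ms
                Collections.singletonMap("group1", Errors.NONE),   // group-level errors
                Collections.singletonMap("group1", offsets));      // per-group offsets
            // v8+ responses are read per group instead of via a single error()/responseData()
            System.out.println(response.groupLevelError("group1"));
            System.out.println(response.partitionDataMap("group1"));
        }
    }
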
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 4d8e69e..887d15c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -233,6 +233,7 @@ import static
org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP;
import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
+import static org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA;
import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
import static
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -311,21 +312,30 @@ public class RequestResponseTest {
checkErrorResponse(createMetadataRequest(3,
Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 4, true);
checkErrorResponse(createMetadataRequest(4,
Collections.singletonList("topic1")), unknownServerException, true);
- checkRequest(createOffsetFetchRequestForAllPartition("group1", false),
true);
- checkRequest(createOffsetFetchRequestForAllPartition("group1", true),
true);
- checkErrorResponse(createOffsetFetchRequestForAllPartition("group1",
false), new NotCoordinatorException("Not Coordinator"), true);
- checkErrorResponse(createOffsetFetchRequestForAllPartition("group1",
true), new NotCoordinatorException("Not Coordinator"), true);
checkRequest(createOffsetFetchRequest(0, false), true);
checkRequest(createOffsetFetchRequest(1, false), true);
checkRequest(createOffsetFetchRequest(2, false), true);
checkRequest(createOffsetFetchRequest(7, true), true);
- checkRequest(createOffsetFetchRequestForAllPartition("group1", false),
true);
- checkRequest(createOffsetFetchRequestForAllPartition("group1", true),
true);
+ checkRequest(createOffsetFetchRequest(8, true), true);
+ checkRequest(createOffsetFetchRequestWithMultipleGroups(8, true),
true);
+ checkRequest(createOffsetFetchRequestWithMultipleGroups(8, false),
true);
+ checkRequest(createOffsetFetchRequestForAllPartition(7, true), true);
+ checkRequest(createOffsetFetchRequestForAllPartition(8, true), true);
checkErrorResponse(createOffsetFetchRequest(0, false),
unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(1, false),
unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(2, false),
unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(7, true),
unknownServerException, true);
- checkResponse(createOffsetFetchResponse(), 0, true);
+ checkErrorResponse(createOffsetFetchRequest(8, true),
unknownServerException, true);
+ checkErrorResponse(createOffsetFetchRequestWithMultipleGroups(8,
true), unknownServerException, true);
+ checkErrorResponse(createOffsetFetchRequestForAllPartition(7, true),
+ new NotCoordinatorException("Not Coordinator"), true);
+ checkErrorResponse(createOffsetFetchRequestForAllPartition(8, true),
+ new NotCoordinatorException("Not Coordinator"), true);
+ checkErrorResponse(createOffsetFetchRequestWithMultipleGroups(8, true),
+ new NotCoordinatorException("Not Coordinator"), true);
+ checkResponse(createOffsetFetchResponse(0), 0, true);
+ checkResponse(createOffsetFetchResponse(7), 7, true);
+ checkResponse(createOffsetFetchResponse(8), 8, true);
checkRequest(createProduceRequest(2), true);
checkErrorResponse(createProduceRequest(2), unknownServerException,
true);
checkRequest(createProduceRequest(3), true);
@@ -1051,18 +1061,51 @@ public class RequestResponseTest {
}
@Test
- public void testOffsetFetchRequestBuilderToString() {
+ public void testOffsetFetchRequestBuilderToStringV0ToV7() {
List<Boolean> stableFlags = Arrays.asList(true, false);
for (Boolean requireStable : stableFlags) {
- String allTopicPartitionsString = new
OffsetFetchRequest.Builder("someGroup", requireStable, null, false).toString();
-
- assertTrue(allTopicPartitionsString.contains("groupId='someGroup',
topics=null, requireStable="
- +
requireStable.toString()));
+ String allTopicPartitionsString = new
OffsetFetchRequest.Builder("someGroup",
+ requireStable,
+ null,
+ false)
+ .toString();
+
+ assertTrue(allTopicPartitionsString.contains("groupId='someGroup',
topics=null,"
+ + " groups=[], requireStable=" + requireStable));
String string = new OffsetFetchRequest.Builder("group1",
- requireStable, Collections.singletonList(new
TopicPartition("test11", 1)), false).toString();
+ requireStable,
+ Collections.singletonList(
+ new TopicPartition("test11", 1)),
+ false)
+ .toString();
assertTrue(string.contains("test11"));
assertTrue(string.contains("group1"));
- assertTrue(string.contains("requireStable=" +
requireStable.toString()));
+ assertTrue(string.contains("requireStable=" + requireStable));
+ }
+ }
+
+ @Test
+ public void testOffsetFetchRequestBuilderToStringV8AndAbove() {
+ List<Boolean> stableFlags = Arrays.asList(true, false);
+ for (Boolean requireStable : stableFlags) {
+ String allTopicPartitionsString = new OffsetFetchRequest.Builder(
+ Collections.singletonMap("someGroup", null),
+ requireStable,
+ false)
+ .toString();
+
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
+ + "(groupId='someGroup', topics=null)], requireStable=" +
requireStable));
+
+ String subsetTopicPartitionsString = new
OffsetFetchRequest.Builder(
+ Collections.singletonMap(
+ "group1",
+ Collections.singletonList(new TopicPartition("test11",
1))),
+ requireStable,
+ false)
+ .toString();
+ assertTrue(subsetTopicPartitionsString.contains("test11"));
+ assertTrue(subsetTopicPartitionsString.contains("group1"));
+ assertTrue(subsetTopicPartitionsString.contains("requireStable=" +
requireStable));
}
}
@@ -1603,21 +1646,81 @@ public class RequestResponseTest {
}
private OffsetFetchRequest createOffsetFetchRequest(int version, boolean
requireStable) {
- return new OffsetFetchRequest.Builder("group1", requireStable,
Collections.singletonList(new TopicPartition("test11", 1)), false)
+ if (version < 8) {
+ return new OffsetFetchRequest.Builder(
+ "group1",
+ requireStable,
+ Collections.singletonList(new TopicPartition("test11", 1)),
+ false)
.build((short) version);
+ }
+ return new OffsetFetchRequest.Builder(
+ Collections.singletonMap(
+ "group1",
+ Collections.singletonList(new TopicPartition("test11", 1))),
+ requireStable,
+ false)
+ .build((short) version);
+ }
+
+ private OffsetFetchRequest createOffsetFetchRequestWithMultipleGroups(int
version,
+ boolean requireStable) {
+ Map<String, List<TopicPartition>> groupToPartitionMap = new
HashMap<>();
+ List<TopicPartition> topic1 = singletonList(
+ new TopicPartition("topic1", 0));
+ List<TopicPartition> topic2 = Arrays.asList(
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic2", 0),
+ new TopicPartition("topic2", 1));
+ List<TopicPartition> topic3 = Arrays.asList(
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic2", 0),
+ new TopicPartition("topic2", 1),
+ new TopicPartition("topic3", 0),
+ new TopicPartition("topic3", 1),
+ new TopicPartition("topic3", 2));
+ groupToPartitionMap.put("group1", topic1);
+ groupToPartitionMap.put("group2", topic2);
+ groupToPartitionMap.put("group3", topic3);
+ groupToPartitionMap.put("group4", null);
+ groupToPartitionMap.put("group5", null);
+
+ return new OffsetFetchRequest.Builder(
+ groupToPartitionMap,
+ requireStable,
+ false
+ ).build((short) version);
}
- private OffsetFetchRequest createOffsetFetchRequestForAllPartition(String
groupId, boolean requireStable) {
- return new OffsetFetchRequest.Builder(groupId, requireStable, null,
false).build();
+ private OffsetFetchRequest createOffsetFetchRequestForAllPartition(int
version, boolean requireStable) {
+ if (version < 8) {
+ return new OffsetFetchRequest.Builder(
+ "group1",
+ requireStable,
+ null,
+ false)
+ .build((short) version);
+ }
+ return new OffsetFetchRequest.Builder(
+ Collections.singletonMap(
+ "group1", null),
+ requireStable,
+ false)
+ .build((short) version);
}
- private OffsetFetchResponse createOffsetFetchResponse() {
+ private OffsetFetchResponse createOffsetFetchResponse(int version) {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
new HashMap<>();
responseData.put(new TopicPartition("test", 0), new
OffsetFetchResponse.PartitionData(
- 100L, Optional.empty(), "", Errors.NONE));
+ 100L, Optional.empty(), "", Errors.NONE));
responseData.put(new TopicPartition("test", 1), new
OffsetFetchResponse.PartitionData(
- 100L, Optional.of(10), null, Errors.NONE));
- return new OffsetFetchResponse(Errors.NONE, responseData);
+ 100L, Optional.of(10), null, Errors.NONE));
+ if (version < 8) {
+ return new OffsetFetchResponse(Errors.NONE, responseData);
+ }
+ int throttleMs = 10;
+ return new OffsetFetchResponse(throttleMs,
Collections.singletonMap("group1", Errors.NONE),
+ Collections.singletonMap("group1", responseData));
}
@SuppressWarnings("deprecation")
@@ -2864,7 +2967,7 @@ public class RequestResponseTest {
assertEquals(Integer.valueOf(3),
createMetadataResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createOffsetCommitResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(2),
createOffsetDeleteResponse().errorCounts().get(Errors.NONE));
- assertEquals(Integer.valueOf(3),
createOffsetFetchResponse().errorCounts().get(Errors.NONE));
+ assertEquals(Integer.valueOf(3),
createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createProduceResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createRenewTokenResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1),
createSaslAuthenticateResponse().errorCounts().get(Errors.NONE));
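
The serialization checks above round-trip requests and responses through each supported version. Below is a minimal sketch of the same round trip for a v8+ response, following the pattern used in OffsetFetchResponseTest.testStructBuild; the payload values are placeholders, and the helper assumes version >= 8 because it uses the multi-group constructor.

    // Sketch only -- not part of this commit.
    import java.util.Collections;
    import java.util.Optional;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.message.OffsetFetchResponseData;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.OffsetFetchResponse;
    import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;

    public class OffsetFetchResponseRoundTripSketch {
        // assumes version >= 8, since the multi-group constructor is used
        static OffsetFetchResponse roundTrip(short version) {
            OffsetFetchResponse original = new OffsetFetchResponse(
                10,
                Collections.singletonMap("group1", Errors.NONE),
                Collections.singletonMap("group1", Collections.singletonMap(
                    new TopicPartition("topic1", 0),
                    new PartitionData(100L, Optional.of(1), "metadata", Errors.NONE))));
            // serialize for the requested version, then parse the bytes back
            OffsetFetchResponseData data = new OffsetFetchResponseData(
                new ByteBufferAccessor(original.serialize(version)), version);
            return new OffsetFetchResponse(data, version);
        }
    }
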
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7f7b506..6bf1a0e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -61,6 +61,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
@@ -415,7 +416,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new OffsetCommitResponse(requestThrottleMs,
combinedCommitStatus.asJava))
}
- // reject the request if not authorized to the group
+ // reject the request if not authorized to the group
if (!authHelper.authorize(request.context, READ, GROUP,
offsetCommitRequest.data.groupId)) {
val error = Errors.GROUP_AUTHORIZATION_FAILED
val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
@@ -1254,74 +1255,102 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset fetch request
*/
def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+ val version = request.header.apiVersion
+ if (version == 0) {
+ // reading offsets from ZK
+ handleOffsetFetchRequestV0(request)
+ } else if (version >= 1 && version <= 7) {
+ // reading offsets from Kafka
+ handleOffsetFetchRequestBetweenV1AndV7(request)
+ } else {
+ // batching offset reads for multiple groups starts with version 8 and above
+ handleOffsetFetchRequestV8AndAbove(request)
+ }
+ }
+
+ private def handleOffsetFetchRequestV0(request: RequestChannel.Request):
Unit = {
val header = request.header
val offsetFetchRequest = request.body[OffsetFetchRequest]
- def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition],
Seq[TopicPartition]) =
- authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC,
seq)(_.topic)
-
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
- // reject the request if not authorized to the group
+ // reject the request if not authorized to the group
if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
offsetFetchRequest.groupId))
offsetFetchRequest.getErrorResponse(requestThrottleMs,
Errors.GROUP_AUTHORIZATION_FAILED)
else {
- if (header.apiVersion == 0) {
- val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch
requests"))
- val (authorizedPartitions, unauthorizedPartitions) =
partitionByAuthorized(
- offsetFetchRequest.partitions.asScala)
-
- // version 0 reads offsets from ZK
- val authorizedPartitionData = authorizedPartitions.map {
topicPartition =>
- try {
- if (!metadataCache.contains(topicPartition))
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
- else {
- val payloadOpt =
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
- payloadOpt match {
- case Some(payload) =>
- (topicPartition, new
OffsetFetchResponse.PartitionData(payload.toLong,
- Optional.empty(), OffsetFetchResponse.NO_METADATA,
Errors.NONE))
- case None =>
- (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
- }
- }
- } catch {
- case e: Throwable =>
- (topicPartition, new
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
- Optional.empty(), OffsetFetchResponse.NO_METADATA,
Errors.forException(e)))
- }
- }.toMap
+ val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch
requests"))
+ val (authorizedPartitions, unauthorizedPartitions) =
partitionByAuthorized(
+ offsetFetchRequest.partitions.asScala, request.context)
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ ->
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
- new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
- } else {
- // versions 1 and above read offsets from Kafka
- if (offsetFetchRequest.isAllPartitions) {
- val (error, allPartitionData) =
groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
offsetFetchRequest.requireStable)
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
- else {
- // clients are not allowed to see offsets for topics that are
not authorized for Describe
- val (authorizedPartitionData, _) =
authHelper.partitionMapByAuthorized(request.context,
- DESCRIBE, TOPIC, allPartitionData)(_.topic)
- new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
authorizedPartitionData.asJava)
- }
- } else {
- val (authorizedPartitions, unauthorizedPartitions) =
partitionByAuthorized(
- offsetFetchRequest.partitions.asScala)
- val (error, authorizedPartitionData) =
groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
- offsetFetchRequest.requireStable, Some(authorizedPartitions))
- if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
+ // version 0 reads offsets from ZK
+ val authorizedPartitionData = authorizedPartitions.map {
topicPartition =>
+ try {
+ if (!metadataCache.contains(topicPartition))
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
- val unauthorizedPartitionData = unauthorizedPartitions.map(_
-> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
- new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ val payloadOpt =
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
+ payloadOpt match {
+ case Some(payload) =>
+ (topicPartition, new
OffsetFetchResponse.PartitionData(payload.toLong,
+ Optional.empty(), OffsetFetchResponse.NO_METADATA,
Errors.NONE))
+ case None =>
+ (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
+ }
}
+ } catch {
+ case e: Throwable =>
+ (topicPartition, new
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+ Optional.empty(), OffsetFetchResponse.NO_METADATA,
Errors.forException(e)))
}
- }
+ }.toMap
+
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ ->
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
+ new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
}
+ trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
+ offsetFetchResponse
+ }
+ requestHelper.sendResponseMaybeThrottle(request, createResponse)
+ }
+ private def handleOffsetFetchRequestBetweenV1AndV7(request:
RequestChannel.Request): Unit = {
+ val header = request.header
+ val offsetFetchRequest = request.body[OffsetFetchRequest]
+ val groupId = offsetFetchRequest.groupId()
+ val (error, partitionData) = fetchOffsets(groupId,
offsetFetchRequest.isAllPartitions,
+ offsetFetchRequest.requireStable, offsetFetchRequest.partitions,
request.context)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val offsetFetchResponse =
+ if (error != Errors.NONE) {
+ offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
+ } else {
+ new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
partitionData.asJava)
+ }
+ trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
+ offsetFetchResponse
+ }
+ requestHelper.sendResponseMaybeThrottle(request, createResponse)
+ }
+
+ private def handleOffsetFetchRequestV8AndAbove(request:
RequestChannel.Request): Unit = {
+ val header = request.header
+ val offsetFetchRequest = request.body[OffsetFetchRequest]
+ val groupIds = offsetFetchRequest.groupIds().asScala
+ val groupToErrorMap = mutable.Map.empty[String, Errors]
+ val groupToPartitionData = mutable.Map.empty[String,
util.Map[TopicPartition, PartitionData]]
+ val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
+ groupIds.foreach(g => {
+ val (error, partitionData) = fetchOffsets(g,
+ offsetFetchRequest.isAllPartitionsForGroup(g),
+ offsetFetchRequest.requireStable(),
+ groupToTopicPartitions.get(g), request.context)
+ groupToErrorMap += (g -> error)
+ groupToPartitionData += (g -> partitionData.asJava)
+ })
+
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
+ groupToErrorMap.asJava, groupToPartitionData.asJava)
trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
offsetFetchResponse
}
@@ -1329,6 +1358,40 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
+ private def fetchOffsets(groupId: String, isAllPartitions: Boolean,
requireStable: Boolean,
+ partitions: util.List[TopicPartition], context:
RequestContext): (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData]) = {
+ if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
+ (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
+ } else {
+ if (isAllPartitions) {
+ val (error, allPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable)
+ if (error != Errors.NONE) {
+ (error, allPartitionData)
+ } else {
+ // clients are not allowed to see offsets for topics that are not
authorized for Describe
+ val (authorizedPartitionData, _) =
authHelper.partitionMapByAuthorized(context,
+ DESCRIBE, TOPIC, allPartitionData)(_.topic)
+ (Errors.NONE, authorizedPartitionData)
+ }
+ } else {
+ val (authorizedPartitions, unauthorizedPartitions) =
partitionByAuthorized(
+ partitions.asScala, context)
+ val (error, authorizedPartitionData) =
groupCoordinator.handleFetchOffsets(groupId,
+ requireStable, Some(authorizedPartitions))
+ if (error != Errors.NONE) {
+ (error, authorizedPartitionData)
+ } else {
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ ->
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
+ (Errors.NONE, authorizedPartitionData ++ unauthorizedPartitionData)
+ }
+ }
+ }
+ }
+
+ private def partitionByAuthorized(seq: Seq[TopicPartition], context:
RequestContext):
+ (Seq[TopicPartition], Seq[TopicPartition]) =
+ authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
+
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
if (version < 4) {
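
The broker now routes v0 to ZooKeeper reads, v1-v7 to the single-group path, and v8+ to the batched path, so a caller has to pick the matching accessors when reading a response. A rough sketch of that client-side split, using the accessors exercised earlier in this diff (the version threshold check and the exception handling here are illustrative, not prescribed by the commit):

    // Sketch only -- not part of this commit.
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.OffsetFetchResponse;
    import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;

    public class OffsetFetchResponseReaderSketch {
        static Map<TopicPartition, PartitionData> offsetsFor(
                OffsetFetchResponse response, short version, String groupId) {
            if (version < 8) {
                // pre-v8 responses carry a single top-level error and partition map
                if (response.error() != Errors.NONE) {
                    throw new IllegalStateException("offset fetch failed: " + response.error());
                }
                return response.responseDataV0ToV7();
            }
            // v8+ responses carry one error and one partition map per group
            if (response.groupHasError(groupId)) {
                throw new IllegalStateException(
                    "offset fetch failed for " + groupId + ": " + response.groupLevelError(groupId));
            }
            return response.partitionDataMap(groupId);
        }
    }
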
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bbdb94b..fd5f12c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartiti
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
@@ -62,6 +63,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.util.Collections.singletonList
import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.Buffer
@@ -175,7 +177,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}),
ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) =>
Errors.forCode(
resp.data.topics().get(0).partitions().get(0).errorCode)),
- ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) =>
resp.error),
+ ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) =>
resp.groupLevelError(group)),
ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => {
Errors.forCode(resp.data.coordinators.asScala.find(g => group ==
g.key).head.errorCode)
}),
@@ -378,10 +380,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs).build()
}
- private def createOffsetFetchRequest = {
+ private def createOffsetFetchRequest: OffsetFetchRequest = {
new requests.OffsetFetchRequest.Builder(group, false, List(tp).asJava,
false).build()
}
+ private def createOffsetFetchRequestAllPartitions: OffsetFetchRequest = {
+ new requests.OffsetFetchRequest.Builder(group, false, null, false).build()
+ }
+
+ private def createOffsetFetchRequest(groupToPartitionMap: util.Map[String,
util.List[TopicPartition]]): OffsetFetchRequest = {
+ new requests.OffsetFetchRequest.Builder(groupToPartitionMap, false,
false).build()
+ }
+
private def createFindCoordinatorRequest = {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
@@ -1341,7 +1351,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
- def testFetchAllOffsetsTopicAuthorization(): Unit = {
+ def testOffsetFetchAllTopicPartitionsAuthorization(): Unit = {
createTopic(topic)
val offset = 15L
@@ -1358,17 +1368,204 @@ class AuthorizerIntegrationTest extends
BaseRequestTest {
// note there's only one broker, so no need to lookup the group coordinator
// without describe permission on the topic, we shouldn't be able to fetch
offsets
- val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group,
false, null, false).build()
+ val offsetFetchRequest = createOffsetFetchRequestAllPartitions
var offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
- assertEquals(Errors.NONE, offsetFetchResponse.error)
- assertTrue(offsetFetchResponse.responseData.isEmpty)
+ assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+ assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty)
// now add describe permission on the topic and verify that the offset can
be fetched
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResource)
offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
- assertEquals(Errors.NONE, offsetFetchResponse.error)
- assertTrue(offsetFetchResponse.responseData.containsKey(tp))
- assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
+ assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+ assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp))
+ assertEquals(offset,
offsetFetchResponse.partitionDataMap(group).get(tp).offset)
+ }
+
+ @Test
+ def testOffsetFetchMultipleGroupsAuthorization(): Unit = {
+ val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+ val groupResources = groups.map(group => new ResourcePattern(GROUP, group,
LITERAL))
+ val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
+ val topicResources = topics.map(topic => new ResourcePattern(TOPIC, topic,
LITERAL))
+
+ val topic1List = singletonList(new TopicPartition(topics(0), 0))
+ val topic1And2List = util.Arrays.asList(
+ new TopicPartition(topics(0), 0),
+ new TopicPartition(topics(1), 0),
+ new TopicPartition(topics(1), 1))
+ val allTopicsList = util.Arrays.asList(
+ new TopicPartition(topics(0), 0),
+ new TopicPartition(topics(1), 0),
+ new TopicPartition(topics(1), 1),
+ new TopicPartition(topics(2), 0),
+ new TopicPartition(topics(2), 1),
+ new TopicPartition(topics(2), 2))
+
+ // create group to partition map to build batched offsetFetch request
+ val groupToPartitionMap = new util.HashMap[String,
util.List[TopicPartition]]()
+ groupToPartitionMap.put(groups(0), topic1List)
+ groupToPartitionMap.put(groups(1), topic1And2List)
+ groupToPartitionMap.put(groups(2), allTopicsList)
+ groupToPartitionMap.put(groups(3), null)
+ groupToPartitionMap.put(groups(4), null)
+
+ createTopic(topics(0))
+ createTopic(topics(1), numPartitions = 2)
+ createTopic(topics(2), numPartitions = 3)
+ groupResources.foreach(r => {
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), r)
+ })
+ topicResources.foreach(t => {
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), t)
+ })
+
+ val offset = 15L
+ val leaderEpoch: Optional[Integer] = Optional.of(1)
+ val metadata = "metadata"
+ val topicOneOffsets = topic1List.asScala.map {
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+ val topicOneAndTwoOffsets = topic1And2List.asScala.map {
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+ val allTopicOffsets = allTopicsList.asScala.map {
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+
+ // create 5 consumers to commit offsets so we can fetch them later
+
+ def commitOffsets(tpList: util.List[TopicPartition],
+ offsets: util.Map[TopicPartition, OffsetAndMetadata]):
Unit = {
+ val consumer = createConsumer()
+ consumer.assign(tpList)
+ consumer.commitSync(offsets)
+ consumer.close()
+ }
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+ commitOffsets(topic1List, topicOneOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
+ commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ removeAllClientAcls()
+
+ def verifyPartitionData(partitionData: OffsetFetchResponse.PartitionData):
Unit = {
+ assertTrue(!partitionData.hasError)
+ assertEquals(offset, partitionData.offset)
+ assertEquals(metadata, partitionData.metadata)
+ assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get())
+ }
+
+ def verifyResponse(groupLevelResponse: Errors,
+ partitionData: util.Map[TopicPartition, PartitionData],
+ topicList: util.List[TopicPartition]): Unit = {
+ assertEquals(Errors.NONE, groupLevelResponse)
+ assertTrue(partitionData.size() == topicList.size())
+ topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
+ }
+
+ // test handling partial errors, where one group is fully authorized, some
groups don't have
+ // the right topic authorizations, and some groups have no authorization
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), groupResources(0))
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), groupResources(1))
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), groupResources(3))
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResources(0))
+ val offsetFetchRequest = createOffsetFetchRequest(groupToPartitionMap)
+ var offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+ offsetFetchResponse.data().groups().forEach(g =>
+ g.groupId() match {
+ case "group1" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(0)),
offsetFetchResponse
+ .partitionDataMap(groups(0)), topic1List)
+ case "group2" =>
+ assertEquals(Errors.NONE,
offsetFetchResponse.groupLevelError(groups(1)))
+ val group2Response = offsetFetchResponse.partitionDataMap(groups(1))
+ assertTrue(group2Response.size() == 3)
+ assertTrue(group2Response.keySet().containsAll(topic1And2List))
+ verifyPartitionData(group2Response.get(topic1And2List.get(0)))
+ assertTrue(group2Response.get(topic1And2List.get(1)).hasError)
+ assertTrue(group2Response.get(topic1And2List.get(2)).hasError)
+ assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION,
group2Response.get(topic1And2List.get(1)))
+ assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION,
group2Response.get(topic1And2List.get(2)))
+ case "group3" =>
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED,
offsetFetchResponse.groupLevelError(groups(2)))
+ assertTrue(offsetFetchResponse.partitionDataMap(groups(2)).size() ==
0)
+ case "group4" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(3)),
offsetFetchResponse
+ .partitionDataMap(groups(3)), topic1List)
+ case "group5" =>
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED,
offsetFetchResponse.groupLevelError(groups(4)))
+ assertTrue(offsetFetchResponse.partitionDataMap(groups(4)).size() ==
0)
+ })
+
+ // test that after adding some of the ACLs, we get no group level
authorization errors, but
+ // still get topic level authorization errors for topics we don't have
ACLs for
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), groupResources(2))
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), groupResources(4))
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResources(1))
+ offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+ offsetFetchResponse.data().groups().forEach(g =>
+ g.groupId() match {
+ case "group1" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(0)),
offsetFetchResponse
+ .partitionDataMap(groups(0)), topic1List)
+ case "group2" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(1)),
offsetFetchResponse
+ .partitionDataMap(groups(1)), topic1And2List)
+ case "group3" =>
+ assertEquals(Errors.NONE,
offsetFetchResponse.groupLevelError(groups(2)))
+ val group3Response = offsetFetchResponse.partitionDataMap(groups(2))
+ assertTrue(group3Response.size() == 6)
+ assertTrue(group3Response.keySet().containsAll(allTopicsList))
+ verifyPartitionData(group3Response.get(allTopicsList.get(0)))
+ verifyPartitionData(group3Response.get(allTopicsList.get(1)))
+ verifyPartitionData(group3Response.get(allTopicsList.get(2)))
+ assertTrue(group3Response.get(allTopicsList.get(3)).hasError)
+ assertTrue(group3Response.get(allTopicsList.get(4)).hasError)
+ assertTrue(group3Response.get(allTopicsList.get(5)).hasError)
+ assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION,
group3Response.get(allTopicsList.get(3)))
+ assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION,
group3Response.get(allTopicsList.get(4)))
+ assertEquals(OffsetFetchResponse.UNAUTHORIZED_PARTITION,
group3Response.get(allTopicsList.get(5)))
+ case "group4" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(3)),
offsetFetchResponse
+ .partitionDataMap(groups(3)), topic1And2List)
+ case "group5" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(4)),
offsetFetchResponse
+ .partitionDataMap(groups(4)), topic1And2List)
+ })
+
+ // test that after adding all necessary ACLs, we get no partition level or
group level errors
+ // from the offsetFetch response
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResources(2))
+ offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
+ offsetFetchResponse.data().groups().forEach(g =>
+ g.groupId() match {
+ case "group1" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(0)),
offsetFetchResponse
+ .partitionDataMap(groups(0)), topic1List)
+ case "group2" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(1)),
offsetFetchResponse
+ .partitionDataMap(groups(1)), topic1And2List)
+ case "group3" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(2)),
offsetFetchResponse
+ .partitionDataMap(groups(2)), allTopicsList)
+ case "group4" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(3)),
offsetFetchResponse
+ .partitionDataMap(groups(3)), allTopicsList)
+ case "group5" =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groups(4)),
offsetFetchResponse
+ .partitionDataMap(groups(4)), allTopicsList)
+ })
}
@Test
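
The authorization test above marks topics the principal cannot Describe with the OffsetFetchResponse.UNAUTHORIZED_PARTITION sentinel. Here is a small sketch of filtering those entries out of a fetched group's offsets; the helper and its name are hypothetical, and only the sentinel comparison comes from the test.

    // Sketch only -- not part of this commit.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.OffsetFetchResponse;
    import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;

    public class UnauthorizedPartitionFilterSketch {
        static Map<TopicPartition, PartitionData> authorizedOnly(
                Map<TopicPartition, PartitionData> groupOffsets) {
            Map<TopicPartition, PartitionData> authorized = new HashMap<>();
            for (Map.Entry<TopicPartition, PartitionData> entry : groupOffsets.entrySet()) {
                // drop entries the broker returned as UNAUTHORIZED_PARTITION
                if (!OffsetFetchResponse.UNAUTHORIZED_PARTITION.equals(entry.getValue())) {
                    authorized.put(entry.getKey(), entry.getValue());
                }
            }
            return authorized;
        }
    }
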
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
new file mode 100644
index 0000000..ea5064b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
+import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest,
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util
+import java.util.Collections.singletonList
+import scala.jdk.CollectionConverters._
+import java.util.{Optional, Properties}
+
+class OffsetFetchRequestTest extends BaseRequestTest {
+
+ override def brokerCount: Int = 1
+
+ val brokerId: Integer = 0
+ val offset = 15L
+ val leaderEpoch: Optional[Integer] = Optional.of(3)
+ val metadata = "metadata"
+ val topic = "topic"
+ val groupId = "groupId"
+ val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
+ properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+ properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+ properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
+ properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
+ properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+ }
+
+ @BeforeEach
+ override def setUp(): Unit = {
+ doSetup(createOffsetsTopic = false)
+
+ TestUtils.createOffsetsTopic(zkClient, servers)
+ }
+
+ @Test
+ def testOffsetFetchRequestSingleGroup(): Unit = {
+ createTopic(topic)
+
+ val tpList = singletonList(new TopicPartition(topic, 0))
+ val topicOffsets = tpList.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ commitOffsets(tpList, topicOffsets)
+
+ // testing from version 1 onward since version 0 reads offsets from ZK
+ for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+ if (version < 8) {
+ val request =
+ if (version < 7) {
+ new OffsetFetchRequest.Builder(
+ groupId, false, tpList, false)
+ .build(version.asInstanceOf[Short])
+ } else {
+ new OffsetFetchRequest.Builder(
+ groupId, false, tpList, true)
+ .build(version.asInstanceOf[Short])
+ }
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ val topicData = response.data().topics().get(0)
+ val partitionData = topicData.partitions().get(0)
+ if (version < 3) {
+ assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.throttleTimeMs())
+ }
+ verifySingleGroupResponse(version.asInstanceOf[Short],
+ response.error().code(), partitionData.errorCode(), topicData.name(),
+ partitionData.partitionIndex(), partitionData.committedOffset(),
+ partitionData.committedLeaderEpoch(), partitionData.metadata())
+ } else {
+ val request = new OffsetFetchRequest.Builder(
+ Map(groupId -> tpList).asJava, false, false)
+ .build(version.asInstanceOf[Short])
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ val groupData = response.data().groups().get(0)
+ val topicData = groupData.topics().get(0)
+ val partitionData = topicData.partitions().get(0)
+ verifySingleGroupResponse(version.asInstanceOf[Short],
+ groupData.errorCode(), partitionData.errorCode(), topicData.name(),
+ partitionData.partitionIndex(), partitionData.committedOffset(),
+ partitionData.committedLeaderEpoch(), partitionData.metadata())
+ }
+ }
+ }
+
+ @Test
+ def testOffsetFetchRequestWithMultipleGroups(): Unit = {
+
+ val topic1 = "topic1"
+ val topic1List = singletonList(new TopicPartition(topic1, 0))
+ val topic2 = "topic2"
+ val topic1And2List = util.Arrays.asList(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1))
+ val topic3 = "topic3"
+ val allTopicsList = util.Arrays.asList(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1),
+ new TopicPartition(topic3, 0),
+ new TopicPartition(topic3, 1),
+ new TopicPartition(topic3, 2))
+
+ // create group to partition map to build batched offsetFetch request
+ val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+ new util.HashMap[String, util.List[TopicPartition]]()
+ groupToPartitionMap.put(groups(0), topic1List)
+ groupToPartitionMap.put(groups(1), topic1And2List)
+ groupToPartitionMap.put(groups(2), allTopicsList)
+ groupToPartitionMap.put(groups(3), null)
+ groupToPartitionMap.put(groups(4), null)
+
+ createTopic(topic1)
+ createTopic(topic2, numPartitions = 2)
+ createTopic(topic3, numPartitions = 3)
+
+ val topicOneOffsets = topic1List.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+ val topicOneAndTwoOffsets = topic1And2List.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+ val allTopicOffsets = allTopicsList.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
+
+ // create 5 consumers to commit offsets so we can fetch them later
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
+ commitOffsets(topic1List, topicOneOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
+ commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
+ commitOffsets(allTopicsList, allTopicOffsets)
+
+ for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+ val request = new OffsetFetchRequest.Builder(groupToPartitionMap,
false, false)
+ .build(version.asInstanceOf[Short])
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ response.data().groups().forEach(g =>
+ g.groupId() match {
+ case "group1" =>
+ verifyResponse(response.groupLevelError(groups(0)),
+ response.partitionDataMap(groups(0)), topic1List)
+ case "group2" =>
+ verifyResponse(response.groupLevelError(groups(1)),
+ response.partitionDataMap(groups(1)), topic1And2List)
+ case "group3" =>
+ verifyResponse(response.groupLevelError(groups(2)),
+ response.partitionDataMap(groups(2)), allTopicsList)
+ case "group4" =>
+ verifyResponse(response.groupLevelError(groups(3)),
+ response.partitionDataMap(groups(3)), allTopicsList)
+ case "group5" =>
+ verifyResponse(response.groupLevelError(groups(4)),
+ response.partitionDataMap(groups(4)), allTopicsList)
+ })
+ }
+ }
+
+ private def verifySingleGroupResponse(version: Short,
+ responseError: Short,
+ partitionError: Short,
+ topicName: String,
+ partitionIndex: Integer,
+ committedOffset: Long,
+ committedLeaderEpoch: Integer,
+ partitionMetadata: String): Unit = {
+ assertEquals(Errors.NONE.code(), responseError)
+ assertEquals(topic, topicName)
+ assertEquals(0, partitionIndex)
+ assertEquals(offset, committedOffset)
+ if (version >= 5) {
+ assertEquals(leaderEpoch.get(), committedLeaderEpoch)
+ }
+ assertEquals(metadata, partitionMetadata)
+ assertEquals(Errors.NONE.code(), partitionError)
+ }
+
+ private def verifyPartitionData(partitionData:
OffsetFetchResponse.PartitionData): Unit = {
+ assertTrue(!partitionData.hasError)
+ assertEquals(offset, partitionData.offset)
+ assertEquals(metadata, partitionData.metadata)
+ assertEquals(leaderEpoch.get(), partitionData.leaderEpoch.get())
+ }
+
+ private def verifyResponse(groupLevelResponse: Errors,
+ partitionData: util.Map[TopicPartition,
PartitionData],
+ topicList: util.List[TopicPartition]): Unit = {
+ assertEquals(Errors.NONE, groupLevelResponse)
+ assertTrue(partitionData.size() == topicList.size())
+ topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
+ }
+
+ private def commitOffsets(tpList: util.List[TopicPartition],
+ offsets: util.Map[TopicPartition,
OffsetAndMetadata]): Unit = {
+ val consumer = createConsumer()
+ consumer.assign(tpList)
+ consumer.commitSync(offsets)
+ consumer.close()
+ }
+}